diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java index 022493bb63..19d1dc4595 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java @@ -16,12 +16,11 @@ */ package org.apache.gobblin.source.extractor.extract.kafka.validator; -import com.google.common.base.Strings; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -39,19 +38,14 @@ public class TopicValidators { private final List validators = new ArrayList<>(); public TopicValidators(SourceState state) { - String validatorClasses = state.getProp(VALIDATOR_CLASSES_KEY); - if (Strings.isNullOrEmpty(validatorClasses)) { - return; - } - - String[] validatorClassNames = validatorClasses.split(VALIDATOR_CLASS_DELIMITER); - Arrays.stream(validatorClassNames).forEach(validator -> { + for (String validatorClassName : state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) { try { - this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validator, state)); + this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validatorClassName, + state)); } catch (Exception e) { - log.error("Failed to create topic validator: {}, due to {}", validator, e); + log.error("Failed to create topic validator: {}, due to {}", validatorClassName, e); } - }); + } } /** diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java index d2250f832e..d75fdc6662 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java @@ -31,7 +31,7 @@ public class TopicValidatorsTest { @Test - public void testTopicNameValidator() { + public void testTopicValidators() { List allTopics = Arrays.asList( "topic1", "topic2", // allowed "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", // bad topics @@ -39,12 +39,17 @@ public void testTopicNameValidator() { List topics = allTopics.stream() .map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList()); + SourceState state = new SourceState(); + + // Without any topic validators + List validTopics = new TopicValidators(state).validate(topics); + Assert.assertEquals(validTopics.size(), 7); + + // Use 2 topic validators: TopicNameValidator and DenyListValidator String validatorsToUse = String.join(TopicValidators.VALIDATOR_CLASS_DELIMITER, ImmutableList.of(TopicNameValidator.class.getName(), DenyListValidator.class.getName())); - - SourceState state = new SourceState(); state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse); - List validTopics = new TopicValidators(state).validate(topics); + validTopics = new TopicValidators(state).validate(topics); Assert.assertEquals(validTopics.size(), 2); Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic1")));