Skip to content

Commit

Permalink
Refine to address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tao Qin committed Oct 13, 2023
1 parent 03dc7bc commit adbc308
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
Expand All @@ -39,19 +38,14 @@ public class TopicValidators {
private final List<TopicValidatorBase> validators = new ArrayList<>();

public TopicValidators(SourceState state) {
String validatorClasses = state.getProp(VALIDATOR_CLASSES_KEY);
if (Strings.isNullOrEmpty(validatorClasses)) {
return;
}

String[] validatorClassNames = validatorClasses.split(VALIDATOR_CLASS_DELIMITER);
Arrays.stream(validatorClassNames).forEach(validator -> {
for (String validatorClassName : state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) {
try {
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validator, state));
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validatorClassName,
state));
} catch (Exception e) {
log.error("Failed to create topic validator: {}, due to {}", validator, e);
log.error("Failed to create topic validator: {}, due to {}", validatorClassName, e);
}
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,25 @@

public class TopicValidatorsTest {
@Test
public void testTopicNameValidator() {
public void testTopicValidators() {
List<String> allTopics = Arrays.asList(
"topic1", "topic2", // allowed
"topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", // bad topics
"topic3", "topic4"); // in deny list
List<KafkaTopic> topics = allTopics.stream()
.map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList());

SourceState state = new SourceState();

// Without any topic validators
List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics);
Assert.assertEquals(validTopics.size(), 7);

// Use 2 topic validators: TopicNameValidator and DenyListValidator
String validatorsToUse = String.join(TopicValidators.VALIDATOR_CLASS_DELIMITER,
ImmutableList.of(TopicNameValidator.class.getName(), DenyListValidator.class.getName()));

SourceState state = new SourceState();
state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse);
List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics);
validTopics = new TopicValidators(state).validate(topics);

Assert.assertEquals(validTopics.size(), 2);
Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic1")));
Expand Down

0 comments on commit adbc308

Please sign in to comment.