diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java index efb0d70352..c5e4837d2a 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java @@ -42,7 +42,7 @@ public OrcSchemaConversionValidator(State sourceState) { } @Override - public boolean validate(KafkaTopic topic) { + public boolean validate(KafkaTopic topic) throws Exception { LOGGER.debug("Validating ORC schema conversion for topic {}", topic.getName()); try { Schema schema = (Schema) this.schemaRegistry.getLatestSchema(topic.getName()); diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java index 8f335b5cad..d8ef567fba 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java @@ -34,7 +34,7 @@ public class OrcSchemaConversionValidatorTest { @Test - public void testOrcSchemaConversionValidator() { + public void testOrcSchemaConversionValidator() throws Exception { KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); @@ -53,7 +53,7 @@ public void testOrcSchemaConversionValidator() { } @Test - public void testGetLatestSchemaFail() { + public void testGetLatestSchemaFail() throws Exception { KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java index 2691ae112c..4390190ec4 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java @@ -67,6 +67,19 @@ public void testValidatorTimeout() { Assert.assertEquals(validTopics.get(0).getName(), "topic2"); } + @Test + public void testValidatorThrowingException() { + List allTopics = Arrays.asList("topic1", "topic2"); + List topics = buildKafkaTopics(allTopics); + State state = new State(); + state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, ValidatorThrowingException.class.getName()); + List validTopics = new TopicValidators(state).validate(topics); + + Assert.assertEquals(validTopics.size(), 2); // validator throws exceptions, so all topics are treated as valid + Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic1"))); + Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic2"))); + } + private List buildKafkaTopics(List topics) { return topics.stream() .map(topicName -> new KafkaTopic(topicName, Collections.emptyList())) @@ -109,4 +122,15 @@ public boolean validate(KafkaTopic topic) { return false; } } + + public static class ValidatorThrowingException extends TopicValidatorBase { + public ValidatorThrowingException(State state) { + super(state); + } + + @Override + public boolean validate(KafkaTopic topic) throws Exception { + throw new Exception("Always throw exception"); + } + } }