From 03dc7bc21287511e739568b48c66483c15a3ad05 Mon Sep 17 00:00:00 2001 From: Tao Qin Date: Tue, 3 Oct 2023 17:48:59 -0700 Subject: [PATCH] * Add generic topic validation support * Add the first validator TopicNameValidator into the validator chain, as a refactor of existing codes Add generic topic validation support Add OrcSchemaConversionValidator --- .../extractor/extract/kafka/KafkaSource.java | 11 +- .../OrcSchemaConversionValidator.java | 61 ++++++ .../kafka/validator/TopicNameValidator.java | 43 +++++ .../kafka/validator/TopicValidatorBase.java | 39 ++++ .../kafka/validator/TopicValidators.java | 83 +++++++++ .../extract/kafka/KafkaSourceTest.java | 33 ++++ .../OrcSchemaConversionValidatorTest.java | 176 ++++++++++++++++++ .../kafka/validator/TopicValidatorsTest.java | 67 +++++++ .../util/orc/AvroOrcSchemaConverter.java | 27 ++- 9 files changed, 534 insertions(+), 6 deletions(-) create mode 100644 gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java create mode 100644 gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java create mode 100644 gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java create mode 100644 gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java create mode 100644 gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java create mode 100644 gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 0bc4c948eb..0bca916a70 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -61,6 +61,7 @@ import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.extract.EventBasedSource; import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker; +import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators; import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.MultiWorkUnit; @@ -218,7 +219,7 @@ public List getWorkunits(SourceState state) { this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config)); - List topics = getFilteredTopics(state); + List topics = getValidTopics(getFilteredTopics(state), state); this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet()); for (String topic : this.topicsToProcess) { @@ -802,6 +803,7 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets protected List getFilteredTopics(SourceState state) { List blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST); List whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST); + // TODO: replace this with TopicNameValidator in the config once TopicValidators is rolled out. if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) { blacklist.add(Pattern.compile(".*\\..*")); } @@ -815,6 +817,13 @@ public void shutdown(SourceState state) { state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount); } + /** + * Return topics that pass all the topic validators. + */ + protected List getValidTopics(List topics, SourceState state) { + return new TopicValidators(state).validate(topics); + } + /** * This class contains startOffset, earliestOffset and latestOffset for a Kafka partition. */ diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java new file mode 100644 index 0000000000..0ffd3762bf --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry; +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory; +import org.apache.gobblin.kafka.schemareg.SchemaRegistryException; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class OrcSchemaConversionValidator extends TopicValidatorBase { + private static final Logger LOGGER = LoggerFactory.getLogger(OrcSchemaConversionValidator.class); + + public static final String MAX_RECURSIVE_DEPTH_KEY = "gobblin.kafka.topicValidators.orcSchemaConversionValidator.maxRecursiveDepth"; + public static final int DEFAULT_MAX_RECURSIVE_DEPTH = 200; + + private final KafkaSchemaRegistry schemaRegistry; + + public OrcSchemaConversionValidator(SourceState sourceState) { + super(sourceState); + this.schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(sourceState.getProperties()); + } + + @Override + public boolean validate(KafkaTopic topic) { + LOGGER.debug("Validating ORC schema conversion for topic {}", topic.getName()); + try { + Schema schema = (Schema) this.schemaRegistry.getLatestSchema(topic.getName()); + // Try converting the avro schema to orc schema to check if any errors. + int maxRecursiveDepth = this.sourceState.getPropAsInt(MAX_RECURSIVE_DEPTH_KEY, DEFAULT_MAX_RECURSIVE_DEPTH); + AvroOrcSchemaConverter.tryGetOrcSchema(schema, 0, maxRecursiveDepth); + } catch (StackOverflowError e) { + LOGGER.warn("Failed to covert latest schema to ORC schema for topic: {}", topic.getName()); + return false; + } catch (IOException | SchemaRegistryException e) { + LOGGER.warn("Failed to get latest schema for topic: {}, validation is skipped, exception: ", topic.getName(), e); + return true; + } + return true; + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java new file mode 100644 index 0000000000..05b8e3beeb --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; + + +/** + * A topic validator that validates the topic name + */ +public class TopicNameValidator extends TopicValidatorBase { + private static final String DOT = "."; + + public TopicNameValidator(SourceState sourceState) { + super(sourceState); + } + + /** + * Check if a topic name is valid, current rules are: + * 1. must not contain "." + * @param topic the topic to be validated + * @return true if the topic name is valid (aka. doesn't contain ".") + */ + @Override + public boolean validate(KafkaTopic topic) { + return !topic.getName().contains(DOT); + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java new file mode 100644 index 0000000000..9a1834df55 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; + +/** + * The base class of a topic validator + */ +public abstract class TopicValidatorBase { + protected SourceState sourceState; + + public TopicValidatorBase(SourceState sourceState) { + this.sourceState = sourceState; + } + + /** + * Validate a KafkaTopic. + * This method must be thread-safe. + * @param topic The topic to validate + * @return Whether the topic passes the validation + */ + public abstract boolean validate(KafkaTopic topic); +} diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java new file mode 100644 index 0000000000..022493bb63 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import com.google.common.base.Strings; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + +/** + * The TopicValidators contains a list of {@link TopicValidatorBase} that validate topics. + * gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name... + */ +@Slf4j +public class TopicValidators { + public static final String VALIDATOR_CLASSES_KEY = "gobblin.kafka.topicValidators"; + + public static final String VALIDATOR_CLASS_DELIMITER = ","; + + private final List validators = new ArrayList<>(); + + public TopicValidators(SourceState state) { + String validatorClasses = state.getProp(VALIDATOR_CLASSES_KEY); + if (Strings.isNullOrEmpty(validatorClasses)) { + return; + } + + String[] validatorClassNames = validatorClasses.split(VALIDATOR_CLASS_DELIMITER); + Arrays.stream(validatorClassNames).forEach(validator -> { + try { + this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validator, state)); + } catch (Exception e) { + log.error("Failed to create topic validator: {}, due to {}", validator, e); + } + }); + } + + /** + * Validate topics with all the internal validators. + * Note: the validations for every topic run in parallel. + * @param topics the topics to be validated + * @return the topics that pass all the validators + */ + public List validate(List topics) { + // Validate the topics in parallel + return topics.parallelStream() + .filter(this::validate) + .collect(Collectors.toList()); + } + + /** + * Validates a single topic with all the internal validators + */ + private boolean validate(KafkaTopic topic) { + log.debug("Validating topic {} in thread: {}", topic, Thread.currentThread().getName()); + for (TopicValidatorBase validator : this.validators) { + if (!validator.validate(topic)) { + log.info("Skip KafkaTopic: {}, by validator: {}", topic, validator.getClass().getName()); + return false; + } + } + return true; + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java index 9992d4442a..c26872e1ce 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java @@ -25,6 +25,9 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; +import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator; +import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators; import org.testng.Assert; import org.testng.annotations.Test; @@ -56,6 +59,36 @@ public void testGetFilteredTopics() { Assert.assertEquals(new TestKafkaSource(testKafkaClient).getFilteredTopics(state), toKafkaTopicList(allTopics.subList(0, 3))); } + @Test + public void testTopicValidators() { + TestKafkaClient testKafkaClient = new TestKafkaClient(); + List allTopics = Arrays.asList( + "Topic1", "topic-v2", "topic3", // allowed + "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", //period topics + "not-allowed-topic"); + testKafkaClient.testTopics = allTopics; + KafkaSource kafkaSource = new TestKafkaSource(testKafkaClient); + + SourceState state = new SourceState(); + state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*"); + state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*"); + List topicsToValidate = kafkaSource.getFilteredTopics(state); + + // Test without TopicValidators in the state + Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state), + toKafkaTopicList(allTopics.subList(0, 6)))); + + // Test empty TopicValidators in the state + state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, ""); + Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state), + toKafkaTopicList(allTopics.subList(0, 6)))); + + // Test TopicValidators with TopicNameValidator in the state + state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, TopicNameValidator.class.getName()); + Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state), + toKafkaTopicList(allTopics.subList(0, 3)))); + } + public List toKafkaTopicList(List topicNames) { return topicNames.stream().map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList()); } diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java new file mode 100644 index 0000000000..93495defff --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/OrcSchemaConversionValidatorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry; +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys; +import org.apache.gobblin.kafka.schemareg.SchemaRegistryException; +import org.apache.gobblin.kafka.serialize.MD5Digest; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class OrcSchemaConversionValidatorTest { + @Test + public void testOrcSchemaConversionValidator() { + KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); + KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); + KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); + SourceState state = new SourceState(); + state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS, TestKafkaSchemaRegistry.class.getName()); + + OrcSchemaConversionValidator validator = new OrcSchemaConversionValidator(state); + Assert.assertTrue(validator.validate(topic1)); // Pass validation + Assert.assertTrue(validator.validate(topic2)); // Pass validation + Assert.assertFalse(validator.validate(topic3)); // Fail validation, default max_recursive_depth = 200, the validation returns early + + state.setProp(OrcSchemaConversionValidator.MAX_RECURSIVE_DEPTH_KEY, 1); + Assert.assertTrue(validator.validate(topic1)); // Pass validation + Assert.assertFalse(validator.validate(topic2)); // Fail validation, because max_recursive_depth is set to 1, the validation returns early + Assert.assertFalse(validator.validate(topic3)); // Fail validation, because max_recursive_depth is set to 1, the validation returns early + } + + @Test + public void testGetLatestSchemaFail() { + KafkaTopic topic1 = new KafkaTopic("topic1", ImmutableList.of()); + KafkaTopic topic2 = new KafkaTopic("topic2", ImmutableList.of()); + KafkaTopic topic3 = new KafkaTopic("topic3", ImmutableList.of()); + SourceState state = new SourceState(); + state.setProp(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS, BadKafkaSchemaRegistry.class.getName()); + + OrcSchemaConversionValidator validator = new OrcSchemaConversionValidator(state); + // Validator should always return PASS when it fails to get latest schema. + Assert.assertTrue(validator.validate(topic1)); + Assert.assertTrue(validator.validate(topic2)); + Assert.assertTrue(validator.validate(topic3)); + } + + // A KafkaSchemaRegistry class that returns the hardcoded schemas for the test topics. + public static class TestKafkaSchemaRegistry implements KafkaSchemaRegistry { + private final String schemaMaxInnerFieldDepthIs1 = "{" + + "\"type\": \"record\"," + + " \"name\": \"test\"," + + " \"fields\": [" + + " {\n" + + " \"name\": \"id\"," + + " \"type\": \"int\"" + + " }," + + " {" + + " \"name\": \"timestamp\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + "}"; + + private final String schemaMaxInnerFieldDepthIs2 = "{" + + " \"type\": \"record\"," + + " \"name\": \"nested\"," + + " \"fields\": [" + + " {" + + " \"name\": \"nestedId\"," + + " \"type\": {\n" + + " \"type\": \"array\"," + + " \"items\": \"string\"" + + " }" + + " }," + + " {" + + " \"name\": \"timestamp\"," + + " \"type\": \"string\"" + + " }" + + " ]" + + "}"; + + private final String schemaWithRecursiveRef = "{" + + " \"type\": \"record\"," + + " \"name\": \"TreeNode\"," + + " \"fields\": [" + + " {" + + " \"name\": \"value\"," + + " \"type\": \"int\"" + + " }," + + " {" + + " \"name\": \"children\"," + + " \"type\": {" + + " \"type\": \"array\"," + + " \"items\": \"TreeNode\"" + + " }" + + " }" + + " ]" + + "}"; + private final Map topicToSchema; + + public TestKafkaSchemaRegistry(Properties props) { + topicToSchema = ImmutableMap.of( + "topic1", new Schema.Parser().parse(schemaMaxInnerFieldDepthIs1), + "topic2", new Schema.Parser().parse(schemaMaxInnerFieldDepthIs2), + "topic3", new Schema.Parser().parse(schemaWithRecursiveRef)); + } + @Override + public Schema getLatestSchema(String topicName) { + return topicToSchema.get(topicName); + } + + @Override + public MD5Digest register(String name, Schema schema) { + return null; + } + + @Override + public Schema getById(MD5Digest id) { + return null; + } + + @Override + public boolean hasInternalCache() { + return false; + } + } + + // A KafkaSchemaRegistry class that always fail to get latest schema. + public static class BadKafkaSchemaRegistry implements KafkaSchemaRegistry { + public BadKafkaSchemaRegistry(Properties props) { + } + + @Override + public Schema getLatestSchema(String name) throws IOException, SchemaRegistryException { + throw new SchemaRegistryException("Exception in getLatestSchema()"); + } + + @Override + public MD5Digest register(String name, Schema schema) throws IOException, SchemaRegistryException { + return null; + } + + @Override + public Schema getById(MD5Digest id) throws IOException, SchemaRegistryException { + return null; + } + + @Override + public boolean hasInternalCache() { + return false; + } + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java new file mode 100644 index 0000000000..d2250f832e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka.validator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TopicValidatorsTest { + @Test + public void testTopicNameValidator() { + List allTopics = Arrays.asList( + "topic1", "topic2", // allowed + "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", // bad topics + "topic3", "topic4"); // in deny list + List topics = allTopics.stream() + .map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList()); + + String validatorsToUse = String.join(TopicValidators.VALIDATOR_CLASS_DELIMITER, + ImmutableList.of(TopicNameValidator.class.getName(), DenyListValidator.class.getName())); + + SourceState state = new SourceState(); + state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse); + List validTopics = new TopicValidators(state).validate(topics); + + Assert.assertEquals(validTopics.size(), 2); + Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic1"))); + Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic2"))); + } + + // A TopicValidator class to mimic a deny list + public static class DenyListValidator extends TopicValidatorBase { + Set denyList = ImmutableSet.of("topic3", "topic4"); + + public DenyListValidator(SourceState sourceState) { + super(sourceState); + } + + @Override + public boolean validate(KafkaTopic topic) { + return !this.denyList.contains(topic.getName()); + } + } +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java index 9f227ca5d5..cc885a987f 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java @@ -27,7 +27,24 @@ * A utility class that provides a method to convert {@link Schema} into {@link TypeDescription}. */ public class AvroOrcSchemaConverter { + // Convert avro schema to orc schema, calling tryGetOrcSchema without recursive depth limit for backward compatibility public static TypeDescription getOrcSchema(Schema avroSchema) { + return tryGetOrcSchema(avroSchema, 0, Integer.MAX_VALUE - 1); + } + + /** + * Try converting the avro schema into {@link TypeDescription}, but with max recursive depth to avoid stack overflow. + * A typical use case is the topic validation during work unit creation. + * @param avroSchema The avro schema to convert + * @param currentDepth Current depth of the recursive call + * @param maxDepth Max depth of the recursive call + * @return the converted {@link TypeDescription} + */ + public static TypeDescription tryGetOrcSchema(Schema avroSchema, int currentDepth, int maxDepth) + throws StackOverflowError { + if (currentDepth == maxDepth + 1) { + throw new StackOverflowError("Recursive call of tryGetOrcSchema() reaches max depth " + maxDepth); + } final Schema.Type type = avroSchema.getType(); switch (type) { @@ -43,12 +60,12 @@ public static TypeDescription getOrcSchema(Schema avroSchema) { case FIXED: return getTypeDescriptionForBinarySchema(avroSchema); case ARRAY: - return TypeDescription.createList(getOrcSchema(avroSchema.getElementType())); + return TypeDescription.createList(tryGetOrcSchema(avroSchema.getElementType(), currentDepth + 1, maxDepth)); case RECORD: final TypeDescription recordStruct = TypeDescription.createStruct(); for (Schema.Field field2 : avroSchema.getFields()) { final Schema fieldSchema = field2.schema(); - final TypeDescription fieldType = getOrcSchema(fieldSchema); + final TypeDescription fieldType = tryGetOrcSchema(fieldSchema, currentDepth + 1, maxDepth); if (fieldType != null) { recordStruct.addField(field2.name(), fieldType); } else { @@ -59,19 +76,19 @@ public static TypeDescription getOrcSchema(Schema avroSchema) { case MAP: return TypeDescription.createMap( // in Avro maps, keys are always strings - TypeDescription.createString(), getOrcSchema(avroSchema.getValueType())); + TypeDescription.createString(), tryGetOrcSchema(avroSchema.getValueType(), currentDepth + 1, maxDepth)); case UNION: final List nonNullMembers = getNonNullMembersOfUnion(avroSchema); if (isNullableUnion(avroSchema, nonNullMembers)) { // a single non-null union member // this is how Avro represents "nullable" types; as a union of the NULL type with another // since ORC already supports nullability of all types, just use the child type directly - return getOrcSchema(nonNullMembers.get(0)); + return tryGetOrcSchema(nonNullMembers.get(0), currentDepth + 1, maxDepth); } else { // not a nullable union type; represent as an actual ORC union of them final TypeDescription union = TypeDescription.createUnion(); for (final Schema childSchema : nonNullMembers) { - union.addUnionChild(getOrcSchema(childSchema)); + union.addUnionChild(tryGetOrcSchema(childSchema, currentDepth + 1, maxDepth)); } return union; }