Skip to content

Commit

Permalink
* Add generic topic validation support
Browse files Browse the repository at this point in the history
* Add the first validator, TopicNameValidator, into the validator chain, as a refactor of existing code

Add generic topic validation support

Add OrcSchemaConversionValidator
  • Loading branch information
Tao Qin committed Oct 11, 2023
1 parent 9b254b6 commit 03dc7bc
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
Expand Down Expand Up @@ -218,7 +219,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

List<KafkaTopic> topics = getFilteredTopics(state);
List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state);
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

for (String topic : this.topicsToProcess) {
Expand Down Expand Up @@ -802,6 +803,7 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets
protected List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
// TODO: replace this with TopicNameValidator in the config once TopicValidators is rolled out.
if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) {
blacklist.add(Pattern.compile(".*\\..*"));
}
Expand All @@ -815,6 +817,13 @@ public void shutdown(SourceState state) {
state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
}

/**
 * Filters the given topics through the topic validators configured in the state.
 *
 * @param topics the candidate topics to validate
 * @param state the source state used to construct the {@link TopicValidators}
 * @return the topics accepted by {@link TopicValidators#validate(List)}
 */
protected List<KafkaTopic> getValidTopics(List<KafkaTopic> topics, SourceState state) {
  TopicValidators topicValidators = new TopicValidators(state);
  return topicValidators.validate(topics);
}

/**
* This class contains startOffset, earliestOffset and latestOffset for a Kafka partition.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A topic validator that rejects a topic when converting its latest registered schema to an ORC schema
 * overflows the stack.
 *
 * <p>The maximum recursive depth passed to the converter is read from {@link #MAX_RECURSIVE_DEPTH_KEY} and
 * falls back to {@link #DEFAULT_MAX_RECURSIVE_DEPTH}.
 */
public class OrcSchemaConversionValidator extends TopicValidatorBase {
  private static final Logger LOGGER = LoggerFactory.getLogger(OrcSchemaConversionValidator.class);

  public static final String MAX_RECURSIVE_DEPTH_KEY = "gobblin.kafka.topicValidators.orcSchemaConversionValidator.maxRecursiveDepth";
  public static final int DEFAULT_MAX_RECURSIVE_DEPTH = 200;

  private final KafkaSchemaRegistry schemaRegistry;

  /**
   * Creates the validator and obtains a schema registry from the properties of the given state.
   *
   * @param sourceState the source state providing the schema registry properties and the depth setting
   */
  public OrcSchemaConversionValidator(SourceState sourceState) {
    super(sourceState);
    this.schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(sourceState.getProperties());
  }

  /**
   * Fetches the latest schema of the topic and attempts an Avro-to-ORC schema conversion.
   *
   * @param topic the topic to be validated
   * @return {@code false} only when the conversion attempt raises a {@link StackOverflowError};
   *         {@code true} when the conversion completes, and also when the schema cannot be fetched
   *         (the validation is skipped in that case rather than excluding the topic)
   */
  @Override
  public boolean validate(KafkaTopic topic) {
    LOGGER.debug("Validating ORC schema conversion for topic {}", topic.getName());
    try {
      Schema schema = (Schema) this.schemaRegistry.getLatestSchema(topic.getName());
      // Try converting the avro schema to orc schema to check if any errors.
      int maxRecursiveDepth = this.sourceState.getPropAsInt(MAX_RECURSIVE_DEPTH_KEY, DEFAULT_MAX_RECURSIVE_DEPTH);
      AvroOrcSchemaConverter.tryGetOrcSchema(schema, 0, maxRecursiveDepth);
    } catch (StackOverflowError e) {
      // The error itself is deliberately not logged: its stack trace would be extremely long.
      LOGGER.warn("Failed to convert latest schema to ORC schema for topic: {}", topic.getName());
      return false;
    } catch (IOException | SchemaRegistryException e) {
      // A registry failure says nothing about the schema itself, so the topic is let through.
      LOGGER.warn("Failed to get latest schema for topic: {}, validation is skipped, exception: ", topic.getName(), e);
      return true;
    }
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;


/**
 * A topic validator that decides based on the topic name alone.
 */
public class TopicNameValidator extends TopicValidatorBase {
  private static final String DOT = ".";

  public TopicNameValidator(SourceState sourceState) {
    super(sourceState);
  }

  /**
   * Checks the topic name against the current naming rules:
   * 1. the name must not contain "."
   *
   * @param topic the topic to be validated
   * @return true if the topic name is valid, i.e. it has no "." anywhere in it
   */
  @Override
  public boolean validate(KafkaTopic topic) {
    String topicName = topic.getName();
    // indexOf yields -1 exactly when the name has no occurrence of DOT.
    return topicName.indexOf(DOT) == -1;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;

/**
 * The base class of a topic validator.
 * A concrete validator implements {@link #validate(KafkaTopic)} to accept or reject a single topic.
 * Subclasses are expected to expose a constructor taking a {@link SourceState}, which is forwarded to
 * this class and kept for later use.
 */
public abstract class TopicValidatorBase {
// The state passed at construction time; subclasses read their configuration from it.
protected SourceState sourceState;

/**
 * Stores the given state so that subclasses can read configuration from it.
 * @param sourceState The source state made available to the validator
 */
public TopicValidatorBase(SourceState sourceState) {
this.sourceState = sourceState;
}

/**
 * Validate a KafkaTopic.
 * This method must be thread-safe.
 * @param topic The topic to validate
 * @return Whether the topic passes the validation
 */
public abstract boolean validate(KafkaTopic topic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;

/**
* The TopicValidators contains a list of {@link TopicValidatorBase} that validate topics.
* gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name...
*/
@Slf4j
public class TopicValidators {
public static final String VALIDATOR_CLASSES_KEY = "gobblin.kafka.topicValidators";

public static final String VALIDATOR_CLASS_DELIMITER = ",";

private final List<TopicValidatorBase> validators = new ArrayList<>();

public TopicValidators(SourceState state) {
String validatorClasses = state.getProp(VALIDATOR_CLASSES_KEY);
if (Strings.isNullOrEmpty(validatorClasses)) {
return;
}

String[] validatorClassNames = validatorClasses.split(VALIDATOR_CLASS_DELIMITER);
Arrays.stream(validatorClassNames).forEach(validator -> {
try {
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validator, state));
} catch (Exception e) {
log.error("Failed to create topic validator: {}, due to {}", validator, e);
}
});
}

/**
* Validate topics with all the internal validators.
* Note: the validations for every topic run in parallel.
* @param topics the topics to be validated
* @return the topics that pass all the validators
*/
public List<KafkaTopic> validate(List<KafkaTopic> topics) {
// Validate the topics in parallel
return topics.parallelStream()
.filter(this::validate)
.collect(Collectors.toList());
}

/**
* Validates a single topic with all the internal validators
*/
private boolean validate(KafkaTopic topic) {
log.debug("Validating topic {} in thread: {}", topic, Thread.currentThread().getName());
for (TopicValidatorBase validator : this.validators) {
if (!validator.validate(topic)) {
log.info("Skip KafkaTopic: {}, by validator: {}", topic, validator.getClass().getName());
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -56,6 +59,36 @@ public void testGetFilteredTopics() {
Assert.assertEquals(new TestKafkaSource(testKafkaClient).getFilteredTopics(state), toKafkaTopicList(allTopics.subList(0, 3)));
}

/**
 * Checks that getValidTopics leaves the filtered topics untouched when no validator is configured
 * (property absent or empty) and drops the topics containing a period once TopicNameValidator is configured.
 */
@Test
public void testTopicValidators() {
  List<String> topicNames = Arrays.asList(
      "Topic1", "topic-v2", "topic3", // allowed
      "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", //period topics
      "not-allowed-topic");
  TestKafkaClient kafkaClient = new TestKafkaClient();
  kafkaClient.testTopics = topicNames;
  KafkaSource source = new TestKafkaSource(kafkaClient);

  SourceState sourceState = new SourceState();
  sourceState.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
  sourceState.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
  List<KafkaTopic> filteredTopics = source.getFilteredTopics(sourceState);

  // Expected outcomes: everything that survived the white/blacklist, and the subset without a period.
  List<KafkaTopic> expectedWithoutValidators = toKafkaTopicList(topicNames.subList(0, 6));
  List<KafkaTopic> expectedWithNameValidator = toKafkaTopicList(topicNames.subList(0, 3));

  // Test without TopicValidators in the state
  List<KafkaTopic> validWhenUnset = source.getValidTopics(filteredTopics, sourceState);
  Assert.assertTrue(CollectionUtils.isEqualCollection(validWhenUnset, expectedWithoutValidators));

  // Test empty TopicValidators in the state
  sourceState.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, "");
  List<KafkaTopic> validWhenEmpty = source.getValidTopics(filteredTopics, sourceState);
  Assert.assertTrue(CollectionUtils.isEqualCollection(validWhenEmpty, expectedWithoutValidators));

  // Test TopicValidators with TopicNameValidator in the state
  sourceState.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, TopicNameValidator.class.getName());
  List<KafkaTopic> validWithNameValidator = source.getValidTopics(filteredTopics, sourceState);
  Assert.assertTrue(CollectionUtils.isEqualCollection(validWithNameValidator, expectedWithNameValidator));
}

/**
 * Wraps each topic name in a {@link KafkaTopic} that has an empty partition list, preserving order.
 */
public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
  return topicNames.stream()
      .map(name -> new KafkaTopic(name, Collections.emptyList()))
      .collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit 03dc7bc

Please sign in to comment.