KCQL validation proposal (#73)
* KCQL validation proposal

* PR comments pt.1

* PR comments pt.1

---------

Co-authored-by: Stefan Bocutiu <[email protected]>
GoMati-MU and stheppi committed Aug 19, 2024
1 parent 76221de commit 2866a69
Showing 19 changed files with 1,004 additions and 94 deletions.
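In outline, the commit replaces the EventHubs connector's fail-fast KCQL topic mapping with validation built on a composable ConnectorConfigKcqlValidator: callers now receive a cyclops Either carrying either an exception describing the validation failures or the parsed Kcql statements, instead of a ConfigException thrown at the first bad mapping. A condensed before/after of the call-site pattern, assembled from the hunks below (variable names are illustrative; the types and imports are the ones shown in the diffs):

// Before this commit: throws ConfigException at the first invalid or duplicated topic name.
Map<String, String> inputToOutputTopics =
    KcqlConfigTopicMapper.mapInputToOutputsFromConfig(kcqlMappings);

// After this commit: validation returns an Either; the connector turns a Left into a
// ConnectorStartupException, while the task unpacks the Right into the same source -> target map.
Either<StreamReactorException, List<Kcql>> validated =
    EventHubsKcqlMappingsValidator.mapInputToOutputsFromConfig(kcqlMappings);
Map<String, String> topics =
    EitherUtils.unpackOrThrow(validated)
        .stream().collect(Collectors.toUnmodifiableMap(Kcql::getSource, Kcql::getTarget));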
1 change: 1 addition & 0 deletions java-connectors/kafka-connect-azure-eventhubs/build.gradle
@@ -8,6 +8,7 @@ project(':kafka-connect-azure-eventhubs') {
dependencies {
implementation project(':kafka-connect-common')
implementation project(':kafka-connect-query-language')
testImplementation(project(path: ':test-utils', configuration: 'testArtifacts'))

// //azure-specific dependencies in case we want to change from kafka protocol
// implementation group: 'com.azure', name: 'azure-identity', version: '1.11.2'
@@ -18,22 +18,21 @@
import static io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader;
import static io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow;

import io.lenses.streamreactor.common.exception.ConnectorStartupException;
import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
import io.lenses.streamreactor.connect.azure.eventhubs.util.EventHubsKcqlMappingsValidator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
import lombok.extern.slf4j.Slf4j;

/**
* Implementation of {@link SourceConnector} for Microsoft Azure EventHubs.
*/
@@ -90,6 +89,8 @@ public String version() {
private static void parseAndValidateConfigs(Map<String, String> props) {
AzureEventHubsSourceConfig azureEventHubsSourceConfig = new AzureEventHubsSourceConfig(props);
String kcqlMappings = azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG);
KcqlConfigTopicMapper.mapInputToOutputsFromConfig(kcqlMappings);
EventHubsKcqlMappingsValidator.mapInputToOutputsFromConfig(kcqlMappings).mapLeft(e -> {
throw new ConnectorStartupException(e);
});
}
}
@@ -18,23 +18,26 @@
import static io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow;
import static java.util.Optional.ofNullable;

import cyclops.control.Either;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.exception.StreamReactorException;
import io.lenses.streamreactor.common.util.EitherUtils;
import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
import io.lenses.streamreactor.connect.azure.eventhubs.util.EventHubsKcqlMappingsValidator;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;

import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
import lombok.extern.slf4j.Slf4j;

/**
* Implementation of {@link SourceTask} for Microsoft Azure EventHubs.
*/
@@ -73,18 +76,19 @@ public void start(Map<String, String> props) {
TopicPartitionOffsetProvider topicPartitionOffsetProvider = new TopicPartitionOffsetProvider(offsetStorageReader);

ArrayBlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue =
new ArrayBlockingQueue<>(
RECORDS_QUEUE_DEFAULT_SIZE);
Map<String, String> inputToOutputTopics =
KcqlConfigTopicMapper.mapInputToOutputsFromConfig(
new ArrayBlockingQueue<>(RECORDS_QUEUE_DEFAULT_SIZE);
Either<StreamReactorException, List<Kcql>> mappedInputsOutputsEither =
EventHubsKcqlMappingsValidator.mapInputToOutputsFromConfig(
azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG));
Map<String, String> inputToOutputTopics =
EitherUtils.unpackOrThrow(mappedInputsOutputsEither)
.stream().collect(Collectors.toUnmodifiableMap(Kcql::getSource, Kcql::getTarget));

blockingQueueProducerProvider = new BlockingQueueProducerProvider(topicPartitionOffsetProvider);
KafkaByteBlockingQueuedProducer producer =
blockingQueueProducerProvider.createProducer(
azureEventHubsSourceConfig, recordsQueue, inputToOutputTopics);
blockingQueueProducerProvider.createProducer(azureEventHubsSourceConfig, recordsQueue, inputToOutputTopics);
EventHubsKafkaConsumerController kafkaConsumerController =
new EventHubsKafkaConsumerController(
producer, recordsQueue, inputToOutputTopics);
new EventHubsKafkaConsumerController(producer, recordsQueue, inputToOutputTopics);
initialize(kafkaConsumerController, azureEventHubsSourceConfig);
}

@@ -0,0 +1,66 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.azure.eventhubs.util;

import cyclops.control.Either;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.exception.StreamReactorException;
import io.lenses.streamreactor.common.validation.ConnectorConfigKcqlValidator;
import io.lenses.streamreactor.common.validation.validators.DistinctSourceNamesValidator;
import io.lenses.streamreactor.common.validation.validators.DistinctTargetNamesValidator;
import io.lenses.streamreactor.common.validation.validators.PatternMatchingSourceNameValidator;
import io.lenses.streamreactor.common.validation.validators.PatternMatchingTargetNameValidator;
import java.util.List;
import java.util.function.UnaryOperator;

public class EventHubsKcqlMappingsValidator {

private static final String TOPIC_NAME_REGEX = "^[\\w][\\w\\-\\_\\.]*$";

public static final String TOPIC_NAME_ERROR_MESSAGE =
"%s topic %s, name is not correctly specified (It can contain only letters, numbers and hyphens,"
+ " underscores and dots and has to start with number or letter)";

public static final UnaryOperator<String> OUTPUT_TOPIC_ERROR_MESSAGE_FUNCTION =
topicName -> String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", topicName);

public static final UnaryOperator<String> INPUT_TOPIC_ERROR_MESSAGE_FUNCTION =
topicName -> String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", topicName);

private static final ConnectorConfigKcqlValidator EVENT_HUBS_KCQL_VALIDATOR =
ConnectorConfigKcqlValidator.builder()
.singleKcqlValidators(List.of(
new PatternMatchingSourceNameValidator(TOPIC_NAME_REGEX, INPUT_TOPIC_ERROR_MESSAGE_FUNCTION),
new PatternMatchingTargetNameValidator(TOPIC_NAME_REGEX, OUTPUT_TOPIC_ERROR_MESSAGE_FUNCTION)
))
.allKcqlValidators(List.of(
new DistinctSourceNamesValidator(),
new DistinctTargetNamesValidator()
))
.build();

/**
* Parses the given KCQL statements and validates the input and output topic names against the topic-name
* regex, also checking that no source or target topic is mapped more than once.
*
* @param kcqlString string to parse
* @return Either holding a {@link StreamReactorException} if validation fails, or the list of parsed
*     {@link Kcql} statements on success
*/
public static Either<StreamReactorException, List<Kcql>> mapInputToOutputsFromConfig(String kcqlString) {
return EVENT_HUBS_KCQL_VALIDATOR.validateKcqlString(kcqlString);
}

}
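For reference, a minimal caller-side sketch of the method documented above, mirroring what the source task change earlier in this commit does; the wrapper class, main method and the KCQL string are hypothetical, while the calls and types come from the diff:

import cyclops.control.Either;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.exception.StreamReactorException;
import io.lenses.streamreactor.common.util.EitherUtils;
import io.lenses.streamreactor.connect.azure.eventhubs.util.EventHubsKcqlMappingsValidator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the commit: turns a validated KCQL string into the
// source -> target topic map the source task works with.
class KcqlMappingSketch {

  static Map<String, String> inputToOutputTopics(String kcqlString) {
    Either<StreamReactorException, List<Kcql>> validated =
        EventHubsKcqlMappingsValidator.mapInputToOutputsFromConfig(kcqlString);
    // unpackOrThrow surfaces the validation error for a Left; for a Right it returns the parsed statements.
    return EitherUtils.unpackOrThrow(validated)
        .stream()
        .collect(Collectors.toUnmodifiableMap(Kcql::getSource, Kcql::getTarget));
  }

  public static void main(String[] args) {
    // Illustrative KCQL, not taken from the commit.
    System.out.println(inputToOutputTopics("INSERT INTO targetA SELECT * FROM sourceA"));
  }
}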
@@ -15,62 +15,88 @@
*/
package io.lenses.streamreactor.connect.azure.eventhubs.util;

import java.util.ArrayList;
import java.util.HashMap;
import static io.lenses.streamreactor.common.util.StringUtils.getSystemsNewLineChar;

import cyclops.control.Either;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.exception.ConnectorStartupException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import io.lenses.kcql.Kcql;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Class that represents methods around KCQL topic handling.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KcqlConfigTopicMapper {

private static final String TOPIC_NAME_REGEX = "^[\\w][\\w\\-\\_\\.]*$";
private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_REGEX);

private static final String ERROR_DELIMITER = ";" + getSystemsNewLineChar();
public static final String TOPIC_NAME_ERROR_MESSAGE =
"%s topic %s, name is not correctly specified (It can contain only letters, numbers and hyphens,"
+ " underscores and dots and has to start with number or letter";
+ " underscores and dots and has to start with number or letter)";

/**
* This method parses KCQL statements and fetches input and output topics checking against
* regex for invalid topic names in input and output.
*
* This method parses KCQL statements and fetches input and output topics checking against regex for invalid topic
* names in input and output.
*
* @param kcqlString string to parse
* @return Either holding a {@link ConnectorStartupException} if validation fails, or the list of parsed
*     {@link Kcql} statements on success
*/
public static Map<String, String> mapInputToOutputsFromConfig(String kcqlString) {
public static Either<ConnectorStartupException, List<Kcql>> mapInputToOutputsFromConfig(String kcqlString) {
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString);
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size());
List<String> outputTopics = new ArrayList<>(kcqls.size());

for (Kcql kcql : kcqls) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();
List<String> inputTopics = kcqls.stream().map(Kcql::getSource).collect(Collectors.toUnmodifiableList());
List<String> outputTopics = kcqls.stream().map(Kcql::getTarget).collect(Collectors.toUnmodifiableList());

if (!topicNameMatchesAgainstRegex(inputTopic)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
}
if (!topicNameMatchesAgainstRegex(outputTopic)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
}
if (inputToOutputTopics.containsKey(inputTopic)) {
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic));
}
if (outputTopics.contains(outputTopic)) {
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic));
}
Set<String> allErrors =
Stream.of(
validateTopicMappings(inputTopics, "Input"),
validateTopicMappings(outputTopics, "Output"),
validateTopicName(inputTopics, "Input"),
validateTopicName(outputTopics, "Output")
).flatMap(Collection::stream).collect(Collectors.toUnmodifiableSet());

inputToOutputTopics.put(inputTopic, outputTopic);
outputTopics.add(outputTopic);
if (!allErrors.isEmpty()) {
return Either.left(new ConnectorStartupException(
String.format("The following errors occurred during validation: %s", String.join(ERROR_DELIMITER,
allErrors))));
}

return inputToOutputTopics;
return Either.right(List.copyOf(kcqls));
}

private static List<String> validateTopicName(List<String> topicNames, String description) {
return topicNames.stream()
.filter(topicName -> !topicNameMatchesAgainstRegex(topicName))
.map(topicName -> String.format(TOPIC_NAME_ERROR_MESSAGE, description, topicName))
.collect(Collectors.toUnmodifiableList());
}

private static List<String> detectDuplicateTopicMappings(List<String> topics) {
return topics.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
.entrySet().stream()
.filter(entry -> entry.getValue() > 1)
.map(Map.Entry::getKey)
.collect(Collectors.toUnmodifiableList());
}

private static List<String> validateTopicMappings(List<String> topics, String description) {
return detectDuplicateTopicMappings(topics).stream()
.map(
dupe -> String.format("%s '%s' cannot be mapped twice.", description, dupe)
).collect(Collectors.toUnmodifiableList());
}

private static boolean topicNameMatchesAgainstRegex(String topicName) {
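A short behavioural sketch of the reworked mapper above: instead of a ConfigException raised at the first offending mapping, every failing check is collected and returned as a single ConnectorStartupException on the Left. The wrapper class and the KCQL string are illustrative only, and the expected message is paraphrased from the templates in this file:

import cyclops.control.Either;
import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.exception.ConnectorStartupException;
import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
import java.util.List;

class KcqlValidationBehaviourSketch {

  public static void main(String[] args) {
    // 'sourceA' is mapped twice; the duplicate check reports it rather than aborting
    // at the first statement (illustrative KCQL, not taken from the commit).
    String kcql =
        "INSERT INTO targetA SELECT * FROM sourceA; "
            + "INSERT INTO targetB SELECT * FROM sourceA";

    Either<ConnectorStartupException, List<Kcql>> result =
        KcqlConfigTopicMapper.mapInputToOutputsFromConfig(kcql);

    // Expected Left message, roughly:
    // "The following errors occurred during validation: Input 'sourceA' cannot be mapped twice."
    result.mapLeft(e -> {
      System.err.println(e.getMessage());
      return e;
    });
  }
}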