From 379bc07fcbad2cd66db90ee18be5de8a90c8c2f9 Mon Sep 17 00:00:00 2001 From: rajeshLovesToCode Date: Fri, 7 Jul 2023 22:00:43 +0530 Subject: [PATCH] -Support for kafka-sink Signed-off-by: rajeshLovesToCode --- .../kafka/configuration/AuthConfig.java | 10 +- .../kafka/configuration/AwsDLQConfig.java | 36 ---- .../kafka/configuration/KafkaSinkConfig.java | 83 +++++---- .../kafka/producer/KafkaSinkProducer.java | 51 ++++-- .../plugins/kafka/sink/DLQSink.java | 78 ++++---- .../plugins/kafka/sink/KafkaSink.java | 7 +- .../AuthenticationPropertyConfigurer.java | 30 ++- .../kafka/util/SinkPropertyConfigurer.java | 36 ++-- .../kafka/configuration/AuthConfigTest.java | 2 +- .../kafka/configuration/AwsDlqConfigTest.java | 55 ------ .../configuration/KafkaSinkConfigTest.java | 172 +++++++++--------- .../kafka/configuration/OAuthConfigTest.java | 42 +++-- .../kafka/producer/KafkaSinkProducerTest.java | 58 +++--- .../kafka/producer/ProducerWorkerTest.java | 33 ++-- .../plugins/kafka/sink/DLQSinkTest.java | 18 +- .../plugins/kafka/sink/KafkasinkTest.java | 43 ++--- .../AuthenticationPropertyConfigurerTest.java | 29 ++- .../sample-pipelines-sink-oauth.yaml | 36 ++-- .../test/resources/sample-pipelines-sink.yaml | 21 ++- 19 files changed, 406 insertions(+), 434 deletions(-) delete mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java delete mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java index 0885d9b5f5..fd617f2052 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java @@ -19,18 +19,10 @@ public class AuthConfig { @JsonProperty("sasl_oauth") private OAuthConfig oAuthConfig; - public OAuthConfig getoAuthConfig() { + public OAuthConfig getOAuthConfig() { return oAuthConfig; } - public void setoAuthConfig(OAuthConfig oAuthConfig) { - this.oAuthConfig = oAuthConfig; - } - - public void setPlainTextAuthConfig(PlainTextAuthConfig plainTextAuthConfig) { - this.plainTextAuthConfig = plainTextAuthConfig; - } - public PlainTextAuthConfig getPlainTextAuthConfig() { return plainTextAuthConfig; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java deleted file mode 100644 index 4eeed13931..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.opensearch.dataprepper.plugins.kafka.configuration; - -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import jakarta.validation.constraints.Size; - -public class AwsDLQConfig { - - @JsonProperty("bucket") - @NotNull - @NotEmpty - private String bucket; - - @JsonProperty("sts_role_arn") - @NotNull - @NotEmpty - @Size(min = 20, max 
= 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
-    private String roleArn;
-
-    @JsonProperty("region")
-    @Size(min = 1, message = "Region cannot be empty string")
-    private String region;
-
-    public String getBucket() {
-        return bucket;
-    }
-
-    public String getRoleArn() {
-        return roleArn;
-    }
-
-    public String getRegion() {
-        return region;
-    }
-}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java
index 1e57d34b7e..dc71b45dab 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java
@@ -9,8 +9,13 @@
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.validation.constraints.Size;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * * A helper class that helps to read user configuration values from
@@ -19,29 +24,37 @@ public class KafkaSinkConfig {
 
+    public static final String DLQ = "dlq";
+
     @JsonProperty("bootstrap_servers")
     @NotNull
     @Size(min = 1, message = "Bootstrap servers can't be empty")
     private List<String> bootStrapServers;
 
-    @JsonProperty("aws")
-    @NotNull
-    AwsDLQConfig dlqConfig;
+    private PluginModel dlq;
 
-    @JsonProperty("thread_wait_time")
-    private Long threadWaitTime;
+    public Optional<PluginModel> getDlq() {
+        return Optional.ofNullable(dlq);
+    }
 
-    @JsonProperty("compression_type")
-    private String compressionType;
+    public void setDlqConfig(final PluginSetting pluginSetting) {
+        final LinkedHashMap<String, Map<String, Object>> dlq = (LinkedHashMap) pluginSetting.getAttributeFromSettings(DLQ);
+        if (dlq != null) {
+            if (dlq.size() != 1) {
+                throw new RuntimeException("dlq option must declare exactly one dlq configuration");
+            }
+            final Map.Entry<String, Map<String, Object>> entry = dlq.entrySet().stream()
+                    .findFirst()
+                    .get();
 
-    @JsonProperty("batch_size")
-    private Long batchSize;
+            this.dlq = new PluginModel(entry.getKey(), entry.getValue());
 
-    @JsonProperty("max_request_size")
-    private String maxRequestSize;
+        }
+    }
 
-    @JsonProperty("acks")
-    private String acks;
+
+    @JsonProperty("thread_wait_time")
+    private Long threadWaitTime;
 
     @JsonProperty("topics")
     private List<TopicConfig> topics;
@@ -54,7 +67,7 @@ public class KafkaSinkConfig {
     @Valid
     private SchemaConfig schemaConfig;
 
-    @JsonProperty(value = "serde_format",defaultValue = "plaintext")
+    @JsonProperty(value = "serde_format", defaultValue = "plaintext")
     private String serdeFormat;
 
 
@@ -62,22 +75,6 @@ public SchemaConfig getSchemaConfig() {
         return schemaConfig;
     }
 
-    public String getCompressionType() {
-        return compressionType;
-    }
-
-    public Long getBatchSize() {
-        return batchSize;
-    }
-
-    public String getMaxRequestSize() {
-        return maxRequestSize;
-    }
-
-    public String getAcks() {
-        return acks;
-    }
-
 
     public List<TopicConfig> getTopics() {
         return topics;
@@ -93,10 +90,6 @@ public List<String> getBootStrapServers() {
         return bootStrapServers;
     }
 
-    public AwsDLQConfig getDlqConfig() {
-        return dlqConfig;
-    }
-
     public String getSerdeFormat() {
         return serdeFormat;
     }
@@ -105,5 +98,27 @@ public Long getThreadWaitTime() {
         return threadWaitTime;
     }
 
+    public void setBootStrapServers(List<String> bootStrapServers) {
+        this.bootStrapServers = bootStrapServers;
+    }
+
+    public void setThreadWaitTime(Long threadWaitTime) {
+        this.threadWaitTime = threadWaitTime;
+    }
+
+    public void setAuthConfig(AuthConfig authConfig) {
+        this.authConfig = authConfig;
+    }
+
+    public void setSchemaConfig(SchemaConfig schemaConfig) {
+        this.schemaConfig = schemaConfig;
+    }
+
+    public void setSerdeFormat(String serdeFormat) {
+        this.serdeFormat = serdeFormat;
+    }
+
+    public void setTopics(List<TopicConfig> topics) {
+        this.topics = topics;
+    }
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
index 00c8adfa51..5070e8e66c 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
@@ -14,11 +14,15 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventHandle;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
 import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 
+import java.util.Collection;
+import java.util.LinkedList;
+
 /**
  * * A helper class which helps takes the buffer data
@@ -35,48 +39,66 @@ public class KafkaSinkProducer {
 
     private final CachedSchemaRegistryClient schemaRegistryClient;
 
+    private final Collection<EventHandle> bufferedEventHandles;
 
     public KafkaSinkProducer(final Producer producer, final KafkaSinkConfig kafkaSinkConfig, final DLQSink dlqSink) {
         this.producer = producer;
         this.kafkaSinkConfig = kafkaSinkConfig;
-        this.dlqSink=dlqSink;;
-        schemaRegistryClient=getSchemaRegistryClient();
+        this.dlqSink = dlqSink;
+        schemaRegistryClient = getSchemaRegistryClient();
+        bufferedEventHandles = new LinkedList<>();
     }
 
+    public KafkaSinkProducer(final Producer producer,
+                             final KafkaSinkConfig kafkaSinkConfig,
+                             final DLQSink dlqSink,
+                             final CachedSchemaRegistryClient schemaRegistryClient) {
+        this.producer = producer;
+        this.kafkaSinkConfig = kafkaSinkConfig;
+        this.dlqSink = dlqSink;
+        this.schemaRegistryClient = schemaRegistryClient;
+        bufferedEventHandles = new LinkedList<>();
+    }
 
     public void produceRecords(final Record<Event> record) {
+        if (record.getData().getEventHandle() != null) {
+            bufferedEventHandles.add(record.getData().getEventHandle());
+        }
         kafkaSinkConfig.getTopics().forEach(topic -> {
             Object dataForDlq = null;
             try {
-                String serdeFormat=kafkaSinkConfig.getSerdeFormat();
+                final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
                 if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
-                    JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
+                    final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
                     dataForDlq = dataNode;
                     producer.send(new ProducerRecord(topic.getName(), dataNode));
-                }
-                else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
+                } else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
                     final String valueToParse = schemaRegistryClient.
                             getLatestSchemaMetadata(topic.getName() + "-value").getSchema();
-                    Schema schema =new Schema.Parser().parse(valueToParse);
-                    GenericRecord genericRecord = getGenericRecord(record.getData(),schema);
+                    final Schema schema = new Schema.Parser().parse(valueToParse);
+                    final GenericRecord genericRecord = getGenericRecord(record.getData(), schema);
                     dataForDlq = genericRecord;
                     producer.send(new ProducerRecord(topic.getName(), genericRecord));
                 } else {
                     dataForDlq = record.getData().toJsonString();
                     producer.send(new ProducerRecord(topic.getName(), record.getData().toJsonString()));
                 }
+                releaseEventHandles(true);
             } catch (Exception e) {
-                dlqSink.perform(dataForDlq);
+                dlqSink.perform(dataForDlq, e);
+                releaseEventHandles(false);
             }
         });
 
     }
 
-    private CachedSchemaRegistryClient getSchemaRegistryClient(){
-        return new CachedSchemaRegistryClient(kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
+    private CachedSchemaRegistryClient getSchemaRegistryClient() {
+
+        return new CachedSchemaRegistryClient(
+                kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
                 100);
     }
 
@@ -90,5 +112,12 @@ private GenericRecord getGenericRecord(Event event, Schema schema) {
         return record;
     }
 
+    private void releaseEventHandles(final boolean result) {
+        for (final EventHandle eventHandle : bufferedEventHandles) {
+            eventHandle.release(result);
+        }
+        bufferedEventHandles.clear();
+    }
+
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
index eb21df9a7c..3e780709d4 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
@@ -7,6 +7,7 @@
 package org.opensearch.dataprepper.plugins.kafka.sink;
 
 import org.opensearch.dataprepper.metrics.MetricNames;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.failures.DlqObject;
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
@@ -17,9 +18,14 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
 
 import static java.util.UUID.randomUUID;
+import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
 
 
 /**
@@ -30,51 +36,59 @@ public class DLQSink {
 
     private static final Logger LOG = LoggerFactory.getLogger(DLQSink.class);
 
-    private static final String BUCKET = "bucket";
-    private static final String ROLE_ARN = "sts_role_arn";
-    private static final String REGION = "region";
-    private static final String S3_PLUGIN_NAME = "s3";
     private final DlqProvider dlqProvider;
-    final PluginSetting pluginSetting;
+    private final PluginSetting pluginSetting;
 
     public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig,final PluginSetting pluginSetting) {
+        this.pluginSetting = pluginSetting;
         this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
-        this.pluginSetting=pluginSetting;
-    }
 
-    public void perform(final Object failedData) {
-        DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
-        try {
-            String pluginId = randomUUID().toString();
-            DlqObject dlqObject = DlqObject.builder()
-                    .withPluginId(pluginId)
-                    .withPluginName(pluginSetting.getName())
-                    .withPipelineName(pluginSetting.getPipelineName())
-                    .withFailedData(failedData)
-                    .build();
-
-            dlqWriter.write(Arrays.asList(dlqObject), pluginSetting.getPipelineName(), pluginId);
-        } catch (final IOException io) {
-            LOG.error("Error occured while performing DLQ operation ",io);
-        }
+    }
+
+    public void perform(final Object failedData, final Exception e) {
+        final DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
+        final DlqObject dlqObject = DlqObject.builder()
+                .withPluginId(randomUUID().toString())
+                .withPluginName(pluginSetting.getName())
+                .withPipelineName(pluginSetting.getPipelineName())
+                .withFailedData(failedData)
+                .build();
+        logFailureForDlqObjects(dlqWriter, List.of(dlqObject), e);
     }
 
     private DlqWriter getDlqWriter( final String writerPipelineName) {
-        Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
-                .add(writerPipelineName).toString());
-        DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
+        final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
+                .add(pluginSetting.getPipelineName())
+                .add(pluginSetting.getName()).toString());
+        final DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
         return dlqWriter;
     }
 
     private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
         final Map<String, Object> props = new HashMap<>();
-        props.put(BUCKET, kafkaSinkConfig.getDlqConfig().getBucket());
-        props.put(ROLE_ARN, kafkaSinkConfig.getDlqConfig().getRoleArn());
-        props.put(REGION, kafkaSinkConfig.getDlqConfig().getRegion());
-        final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
-        DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
-        return dlqProvider;
+        kafkaSinkConfig.setDlqConfig(pluginSetting);
+        final Optional<PluginModel> dlq = kafkaSinkConfig.getDlq();
+        if (dlq.isPresent()) {
+            final PluginModel dlqPluginModel = dlq.get();
+            final PluginSetting dlqPluginSetting = new PluginSetting(dlqPluginModel.getPluginName(), dlqPluginModel.getPluginSettings());
+            dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
+            final DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
+            return dlqProvider;
+        }
+        return null;
+    }
+
+    private void logFailureForDlqObjects(final DlqWriter dlqWriter, final List<DlqObject> dlqObjects, final Throwable failure) {
+        try {
+            dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
+            dlqObjects.forEach((dlqObject) -> {
+                dlqObject.releaseEventHandle(true);
+            });
+        } catch (final IOException e) {
+            dlqObjects.forEach(dlqObject -> {
+                LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e);
+                dlqObject.releaseEventHandle(false);
+            });
+        }
     }
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
index 803eaa0e48..89d5f719b9 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
@@ -92,10 +92,7 @@ public void doOutput(Collection<Record<Event>> records) {
             final KafkaSinkProducer producer = createProducer();
            records.forEach(record -> {
                 producerWorker = new ProducerWorker(producer, record);
-                //TODO: uncomment this line after testing as this is the right way to do things
-                //executorService.submit(producerWorker);
-                //TODO: remove this line after testing as it executes the thread immediately
-                executorService.execute(producerWorker);
+                executorService.submit(producerWorker);
             });
 
         } catch (Exception e) {
@@ -108,7 +105,7 @@ public KafkaSinkProducer createProducer() {
         Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
         properties = Objects.requireNonNull(properties);
         return new KafkaSinkProducer(new KafkaProducer<>(properties),
-                kafkaSinkConfig, new DLQSink(pluginFactory, kafkaSinkConfig,pluginSetting));
+                kafkaSinkConfig, new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting));
     }
 
     @Override
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java
index 00fa5bd48a..3c60dfc43c 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java
@@ -15,8 +15,6 @@
 public class AuthenticationPropertyConfigurer {
 
-    private static final String SESSION_TIMEOUT_MS_CONFIG = "30000";
-
     private static final String SASL_MECHANISM = "sasl.mechanism";
 
     private static final String SASL_SECURITY_PROTOCOL = "security.protocol";
@@ -57,20 +55,20 @@ public static void setSaslPlainTextProperties(final KafkaSinkCon
 
     public static void setOauthProperties(final KafkaSinkConfig kafkaSinkConfig,
                                           final Properties properties) {
-        String oauthClientId = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthClientId();
-        String oauthClientSecret = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthClientSecret();
-        String oauthLoginServer = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthLoginServer();
-        String oauthLoginEndpoint = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthLoginEndpoint();
-        String oauthLoginGrantType = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthLoginGrantType();
-        String oauthLoginScope = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthLoginScope();
-        String oauthAuthorizationToken = Base64.getEncoder().encodeToString((oauthClientId + ":" + oauthClientSecret).getBytes());
-        String oauthIntrospectEndpoint = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthIntrospectEndpoint();
-        String tokenEndPointURL = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthTokenEndpointURL();
-        String saslMechanism = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthSaslMechanism();
-        String securityProtocol = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthSecurityProtocol();
-        String loginCallBackHandler = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthSaslLoginCallbackHandlerClass();
-        String oauthJwksEndpointURL = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthJwksEndpointURL();
-        String introspectServer = kafkaSinkConfig.getAuthConfig().getoAuthConfig().getOauthIntrospectServer();
+        final String oauthClientId = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthClientId();
+        final String oauthClientSecret = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthClientSecret();
+        final String oauthLoginServer = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthLoginServer();
+        final String oauthLoginEndpoint = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthLoginEndpoint();
+        final String oauthLoginGrantType = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthLoginGrantType();
+        final String oauthLoginScope = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthLoginScope();
+        final String oauthAuthorizationToken = Base64.getEncoder().encodeToString((oauthClientId + ":" + oauthClientSecret).getBytes());
+        final String oauthIntrospectEndpoint = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthIntrospectEndpoint();
+        final String tokenEndPointURL = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthTokenEndpointURL();
+        final String saslMechanism = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthSaslMechanism();
+        final String securityProtocol = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthSecurityProtocol();
+        final String loginCallBackHandler = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthSaslLoginCallbackHandlerClass();
+        final String oauthJwksEndpointURL = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthJwksEndpointURL();
+        final String introspectServer = kafkaSinkConfig.getAuthConfig().getOAuthConfig().getOauthIntrospectServer();
 
         properties.put(SASL_MECHANISM, saslMechanism);
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
index 938a7f3281..6c9c595bea 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
@@ -1,3 +1,8 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
 package org.opensearch.dataprepper.plugins.kafka.util;
 
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -8,6 +13,9 @@
 
 import java.util.Properties;
 
+/**
+ * * A static property configurer for the producer properties supplied in the pipeline YAML.
+ */
 public class SinkPropertyConfigurer {
 
     private static final String VALUE_SERIALIZER = "value.serializer";
@@ -18,24 +26,24 @@ public class SinkPropertyConfigurer {
 
     private static final String REGISTRY_URL = "schema.registry.url";
 
-    public static Properties getProducerProperties(final KafkaSinkConfig kafkaSinkConfig) {
-        Properties properties = new Properties();
-        setCommonServerProperties(properties,kafkaSinkConfig);
-        setPropertiesForSerializer(properties, kafkaSinkConfig.getSerdeFormat(),kafkaSinkConfig);
+    public static Properties getProducerProperties(final KafkaSinkConfig kafkaSinkConfig) {
+        final Properties properties = new Properties();
+        setCommonServerProperties(properties, kafkaSinkConfig);
+        setPropertiesForSerializer(properties, kafkaSinkConfig.getSerdeFormat(), kafkaSinkConfig);
         if (kafkaSinkConfig.getAuthConfig().getPlainTextAuthConfig() != null) {
             AuthenticationPropertyConfigurer.setSaslPlainTextProperties(kafkaSinkConfig, properties);
-        } else if (kafkaSinkConfig.getAuthConfig().getoAuthConfig() != null) {
+        } else if (kafkaSinkConfig.getAuthConfig().getOAuthConfig() != null) {
AuthenticationPropertyConfigurer.setOauthProperties(kafkaSinkConfig, properties); } return properties; } - private static void setCommonServerProperties(final Properties properties,final KafkaSinkConfig kafkaSinkConfig) { + private static void setCommonServerProperties(final Properties properties, final KafkaSinkConfig kafkaSinkConfig) { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootStrapServers()); properties.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG); } - private static void setPropertiesForSerializer(Properties properties, final String serdeFormat,final KafkaSinkConfig kafkaSinkConfig) { + private static void setPropertiesForSerializer(final Properties properties, final String serdeFormat, final KafkaSinkConfig kafkaSinkConfig) { properties.put(KEY_SERIALIZER, StringSerializer.class.getName()); validateForRegistryURL(kafkaSinkConfig); if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) { @@ -50,16 +58,16 @@ private static void setPropertiesForSerializer(Properties properties, final Stri } private static void validateForRegistryURL(KafkaSinkConfig kafkaSinkConfig) { - final String serdeFormat=kafkaSinkConfig.getSerdeFormat(); - if(serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())){ - if(kafkaSinkConfig.getSchemaConfig()==null ||kafkaSinkConfig.getSchemaConfig().getRegistryURL()==null|| - kafkaSinkConfig.getSchemaConfig().getRegistryURL().isBlank()||kafkaSinkConfig.getSchemaConfig().getRegistryURL().isEmpty()){ + final String serdeFormat = kafkaSinkConfig.getSerdeFormat(); + if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) { + if (kafkaSinkConfig.getSchemaConfig() == null || kafkaSinkConfig.getSchemaConfig().getRegistryURL() == null || + kafkaSinkConfig.getSchemaConfig().getRegistryURL().isBlank() || kafkaSinkConfig.getSchemaConfig().getRegistryURL().isEmpty()) { throw new RuntimeException("Schema registry is mandatory when serde type is avro"); } } - if(serdeFormat.equalsIgnoreCase(MessageFormat.PLAINTEXT.toString())){ - if(kafkaSinkConfig.getSchemaConfig()!=null && - kafkaSinkConfig.getSchemaConfig().getRegistryURL()!=null){ + if (serdeFormat.equalsIgnoreCase(MessageFormat.PLAINTEXT.toString())) { + if (kafkaSinkConfig.getSchemaConfig() != null && + kafkaSinkConfig.getSchemaConfig().getRegistryURL() != null) { throw new RuntimeException("Schema registry is not required for type plaintext"); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java index 1a610cedd4..83d8e9cc3d 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java @@ -49,7 +49,7 @@ void testConfig() { assertThat(authConfig.getPlainTextAuthConfig(), notNullValue()); assertThat(authConfig.getPlainTextAuthConfig(), hasProperty("username")); assertThat(authConfig.getPlainTextAuthConfig(), hasProperty("password")); - assertThat(authConfig.getoAuthConfig(), notNullValue()); + assertThat(authConfig.getOAuthConfig(), notNullValue()); } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java deleted file mode 100644 index 785d9e5520..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.opensearch.dataprepper.plugins.kafka.configuration; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.yaml.snakeyaml.Yaml; - -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - -class AwsDlqConfigTest { - - - AwsDLQConfig awsDLQConfig; - - List bootstrapServers; - - @BeforeEach - void setUp() throws IOException { - //Added to load Yaml file - Start - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); - Object data = yaml.load(fileReader); - if(data instanceof Map){ - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sinkeMap = (Map) logPipelineMap.get("sink"); - Map kafkaConfigMap = (Map) sinkeMap.get("kafka-sink"); - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - awsDLQConfig = mapper.readValue(reader, KafkaSinkConfig.class).getDlqConfig(); - } - } - @Test - void test_aws_props(){ - assertThat(awsDLQConfig.getBucket(), notNullValue()); - assertThat(awsDLQConfig.getRegion(), notNullValue()); - assertThat(awsDLQConfig.getRoleArn(), notNullValue()); - - } - - -} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java index 252f65ec0d..e83235073d 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java @@ -1,99 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.kafka.configuration; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.yaml.snakeyaml.Yaml; +import org.opensearch.dataprepper.model.configuration.PluginSetting; -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; 
+import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + class KafkaSinkConfigTest { - KafkaSinkConfig kafkaSinkConfig; - - List bootstrapServers; - - @BeforeEach - void setUp() throws IOException { - //Added to load Yaml file - Start - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); - Object data = yaml.load(fileReader); - if(data instanceof Map){ - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sinkeMap = (Map) logPipelineMap.get("sink"); - Map kafkaConfigMap = (Map) sinkeMap.get("kafka-sink"); - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - kafkaSinkConfig = mapper.readValue(reader, KafkaSinkConfig.class); - } - } - - @Test - void test_kafka_config_not_null() { - assertThat(kafkaSinkConfig, notNullValue()); - } - - @Test - void test_bootStrapServers_not_null(){ - assertThat(kafkaSinkConfig.getBootStrapServers(), notNullValue()); - List servers = kafkaSinkConfig.getBootStrapServers(); - bootstrapServers = servers.stream(). - flatMap(str -> Arrays.stream(str.split(","))). - map(String::trim). - collect(Collectors.toList()); - assertThat(bootstrapServers.size(), equalTo(1)); - assertThat(bootstrapServers, hasItem("localhost:29092")); - } - - @Test - void test_topics_not_null(){ - assertThat(kafkaSinkConfig.getTopics(), notNullValue()); - } - @Test - void test_schema_not_null(){ - assertThat(kafkaSinkConfig.getSchemaConfig(), notNullValue()); - } - @Test - void test_authentication_not_null(){ - assertThat(kafkaSinkConfig.getAuthConfig(), notNullValue()); - } - @Test - void test_batch_size_not_null(){ - assertThat(kafkaSinkConfig.getBatchSize(), notNullValue()); - } - @Test - void test_compression_type_not_null(){ - assertThat(kafkaSinkConfig.getCompressionType(), notNullValue()); - } - @Test - void test_acks_null(){ - assertThat(kafkaSinkConfig.getAcks(), notNullValue()); - } - - @Test - void test_dlq_config_null(){ - assertThat(kafkaSinkConfig.getDlqConfig(), notNullValue()); - } - - @Test - void test_thread_wait_time_null(){ - assertThat(kafkaSinkConfig.getThreadWaitTime(), notNullValue()); - } + KafkaSinkConfig kafkaSinkConfig; + + List bootstrapServers; + + @BeforeEach + void setUp() { + kafkaSinkConfig = new KafkaSinkConfig(); + kafkaSinkConfig.setBootStrapServers(Arrays.asList("127.0.0.1:9093")); + kafkaSinkConfig.setAuthConfig(mock(AuthConfig.class)); + kafkaSinkConfig.setTopics(Arrays.asList(mock(TopicConfig.class))); + kafkaSinkConfig.setSchemaConfig((mock(SchemaConfig.class))); + kafkaSinkConfig.setThreadWaitTime(10L); + kafkaSinkConfig.setSerdeFormat("JSON"); + + } + + @Test + void test_kafka_config_not_null() { + assertThat(kafkaSinkConfig, notNullValue()); + } + + @Test + void test_bootStrapServers_not_null() { + assertThat(kafkaSinkConfig.getBootStrapServers(), notNullValue()); + List servers = kafkaSinkConfig.getBootStrapServers(); + bootstrapServers = servers.stream(). + flatMap(str -> Arrays.stream(str.split(","))). + map(String::trim). 
+ collect(Collectors.toList()); + assertThat(bootstrapServers.size(), equalTo(1)); + assertThat(bootstrapServers, hasItem("127.0.0.1:9093")); + } + + @Test + void test_topics_not_null() { + assertThat(kafkaSinkConfig.getTopics(), notNullValue()); + } + + @Test + void test_schema_not_null() { + assertThat(kafkaSinkConfig.getSchemaConfig(), notNullValue()); + } + + @Test + void test_authentication_not_null() { + assertThat(kafkaSinkConfig.getAuthConfig(), notNullValue()); + } + + @Test + void test_serde_format_not_null() { + assertThat(kafkaSinkConfig.getSerdeFormat(), notNullValue()); + } + + @Test + void test_thread_wait_time_null() { + assertThat(kafkaSinkConfig.getThreadWaitTime(), notNullValue()); + } + + @Test + public void testDLQConfiguration() { + final Map fakePlugin = new LinkedHashMap<>(); + final Map lowLevelPluginSettings = new HashMap<>(); + lowLevelPluginSettings.put("field1", "value1"); + lowLevelPluginSettings.put("field2", "value2"); + fakePlugin.put("another_dlq", lowLevelPluginSettings); + kafkaSinkConfig.setDlqConfig(generatePluginSetting(fakePlugin)); + assertEquals("another_dlq", kafkaSinkConfig.getDlq().get().getPluginName()); + } + + private PluginSetting generatePluginSetting(final Map pluginSettings) { + final Map metadata = new HashMap<>(); + if (pluginSettings != null) { + metadata.put(KafkaSinkConfig.DLQ, pluginSettings); + } + return new PluginSetting("S3", metadata); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java index fd5dbd3a60..3f10bcb194 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java @@ -35,6 +35,7 @@ public class OAuthConfigTest { private String oauthJwksEndpointURL; private static final String YAML_FILE_WITH_CONSUMER_CONFIG = "sample-pipelines.yaml"; + @BeforeEach void setUp() throws IOException { oAuthConfig = new OAuthConfig(); @@ -51,20 +52,20 @@ void setUp() throws IOException { String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); KafkaSourceConfig kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - oAuthConfig = kafkaSourceConfig.getAuthConfig().getoAuthConfig(); - oauthClientId= oAuthConfig.getOauthClientId(); + oAuthConfig = kafkaSourceConfig.getAuthConfig().getOAuthConfig(); + oauthClientId = oAuthConfig.getOauthClientId(); oauthClientSecret = oAuthConfig.getOauthClientSecret(); - oauthLoginServer= oAuthConfig.getOauthLoginServer(); - oauthLoginEndpoint= oAuthConfig.getOauthLoginEndpoint(); - oauthLoginGrantType= oAuthConfig.getOauthLoginGrantType(); - oauthLoginScope= oAuthConfig.getOauthLoginScope(); - oauthAuthorizationToken= Base64.getEncoder().encodeToString((oauthClientId +":" + oauthClientSecret).getBytes()); - oauthIntrospectEndpoint= oAuthConfig.getOauthIntrospectEndpoint(); - tokenEndPointURL= oAuthConfig.getOauthTokenEndpointURL(); - saslMechanism= oAuthConfig.getOauthSaslMechanism(); + oauthLoginServer = oAuthConfig.getOauthLoginServer(); + oauthLoginEndpoint = oAuthConfig.getOauthLoginEndpoint(); + oauthLoginGrantType = oAuthConfig.getOauthLoginGrantType(); + oauthLoginScope = oAuthConfig.getOauthLoginScope(); + 
oauthAuthorizationToken = Base64.getEncoder().encodeToString((oauthClientId + ":" + oauthClientSecret).getBytes()); + oauthIntrospectEndpoint = oAuthConfig.getOauthIntrospectEndpoint(); + tokenEndPointURL = oAuthConfig.getOauthTokenEndpointURL(); + saslMechanism = oAuthConfig.getOauthSaslMechanism(); securityProtocol = oAuthConfig.getOauthSecurityProtocol(); - loginCallBackHandler= oAuthConfig.getOauthSaslLoginCallbackHandlerClass(); - oauthJwksEndpointURL= oAuthConfig.getOauthJwksEndpointURL(); + loginCallBackHandler = oAuthConfig.getOauthSaslLoginCallbackHandlerClass(); + oauthJwksEndpointURL = oAuthConfig.getOauthJwksEndpointURL(); } } @@ -72,24 +73,25 @@ void setUp() throws IOException { void testConfig() { assertThat(oAuthConfig, notNullValue()); } + @Test - void assertConfigValues(){ + void assertConfigValues() { assertEquals(oAuthConfig.getOauthClientId(), "0oa9wc21447Pc5vsV5d7"); assertEquals(oAuthConfig.getOauthClientSecret(), "aGmOfHqIEvBJGDxXAOOcatiE9PvsPgoEePx8IPPa"); assertEquals(oAuthConfig.getOauthJwksEndpointURL(), "https://dev-13650048.okta.com/oauth2/default/v1/keys"); - assertEquals(oAuthConfig.getOauthSaslLoginCallbackHandlerClass(),"org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"); - assertEquals(oAuthConfig.getOauthSaslMechanism(),"OAUTHBEARER"); + assertEquals(oAuthConfig.getOauthSaslLoginCallbackHandlerClass(), "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"); + assertEquals(oAuthConfig.getOauthSaslMechanism(), "OAUTHBEARER"); //assertEquals(oAuthConfig.getOauthAuthorizationToken(),""); - assertEquals(oAuthConfig.getOauthIntrospectEndpoint(),"/oauth2/default/v1/introspect"); - assertEquals(oAuthConfig.getOauthIntrospectServer(),"https://dev-13650048.okta.com"); - assertEquals(oAuthConfig.getOauthLoginEndpoint(),"/oauth2/default/v1/token"); - assertEquals(oAuthConfig.getOauthLoginGrantType(),"refresh_token"); + assertEquals(oAuthConfig.getOauthIntrospectEndpoint(), "/oauth2/default/v1/introspect"); + assertEquals(oAuthConfig.getOauthIntrospectServer(), "https://dev-13650048.okta.com"); + assertEquals(oAuthConfig.getOauthLoginEndpoint(), "/oauth2/default/v1/token"); + assertEquals(oAuthConfig.getOauthLoginGrantType(), "refresh_token"); } @Test @Tag(YAML_FILE_WITH_CONSUMER_CONFIG) - void assertNotNullForConfigs(){ + void assertNotNullForConfigs() { /*assertNotNull(oAuthConfig.getOauthClientId()); assertNotNull(oAuthConfig.getOauthClientSecret()); assertNotNull(oAuthConfig.getOauthJwksEndpointURL()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java index ddea3fab49..10c2fb073f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java @@ -1,7 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.dataprepper.plugins.kafka.producer; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; 
import org.apache.kafka.clients.producer.MockProducer; @@ -22,19 +26,23 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; -import org.powermock.api.mockito.PowerMockito; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; -import static org.mockito.Mockito.*; -import static org.powermock.api.mockito.PowerMockito.whenNew; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) + public class KafkaSinkProducerTest { private KafkaSinkProducer producer; @@ -42,7 +50,7 @@ public class KafkaSinkProducerTest { @Mock private KafkaSinkConfig kafkaSinkConfig; - List topics = new ArrayList(); + List topics = new ArrayList<>(); private Record record; @@ -53,51 +61,55 @@ public class KafkaSinkProducerTest { private DLQSink dlqSink; private Event event; - @BeforeEach - public void setUp(){ + @Mock + private CachedSchemaRegistryClient cachedSchemaRegistryClient; + + @BeforeEach + public void setUp() { event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - record=new Record<>(event); + record = new Record<>(event); final TopicConfig topicConfig = new TopicConfig(); topicConfig.setName("test-topic"); topics.add(topicConfig); - when(kafkaSinkConfig.getTopics()).thenReturn(topics); when(kafkaSinkConfig.getSchemaConfig()).thenReturn(mock(SchemaConfig.class)); when(kafkaSinkConfig.getSchemaConfig().getRegistryURL()).thenReturn("http://localhost:8085/"); } + @Test - public void producePlainTextRecordsTest() { + public void producePlainTextRecordsTest() throws ExecutionException, InterruptedException { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig,dlqSink); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient); sinkProducer = spy(producer); sinkProducer.produceRecords(record); verify(sinkProducer).produceRecords(record); + assertEquals(1, mockProducer.history().size()); } @Test - public void produceJsonRecordsTest() { + public void produceJsonRecordsTest() { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("json"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); - producer=new KafkaSinkProducer(mockProducer, kafkaSinkConfig,dlqSink); - sinkProducer=spy(producer); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient); + sinkProducer = spy(producer); sinkProducer.produceRecords(record); verify(sinkProducer).produceRecords(record); - + assertEquals(1, mockProducer.history().size()); } @Test public void produceAvroRecordsTest() throws Exception { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("avro"); - CachedSchemaRegistryClient schemaRegistryClient=PowerMockito.mock(CachedSchemaRegistryClient.class); - 
whenNew(CachedSchemaRegistryClient.class).withArguments(kafkaSinkConfig.getSchemaConfig().getRegistryURL(), 100) - .thenReturn(schemaRegistryClient); - - MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new KafkaAvroSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig,dlqSink); + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient); + SchemaMetadata schemaMetadata = mock(SchemaMetadata.class); + String avroSchema = "{\"type\":\"record\",\"name\":\"MyMessage\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}"; + when(schemaMetadata.getSchema()).thenReturn(avroSchema); + when(cachedSchemaRegistryClient.getLatestSchemaMetadata(topics.get(0).getName() + "-value")).thenReturn(schemaMetadata); sinkProducer = spy(producer); sinkProducer.produceRecords(record); verify(sinkProducer).produceRecords(record); @@ -106,11 +118,11 @@ public void produceAvroRecordsTest() throws Exception { @Test public void testGetGenericRecord() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - producer=new KafkaSinkProducer(new MockProducer(), kafkaSinkConfig,dlqSink); + producer = new KafkaSinkProducer(new MockProducer(), kafkaSinkConfig, dlqSink); final Schema schema = createMockSchema(); Method privateMethod = KafkaSinkProducer.class.getDeclaredMethod("getGenericRecord", Event.class, Schema.class); privateMethod.setAccessible(true); - GenericRecord result = (GenericRecord) privateMethod.invoke(producer, event,schema); + GenericRecord result = (GenericRecord) privateMethod.invoke(producer, event, schema); Assertions.assertNotNull(result); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java index d086d9d790..92bc4998e7 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java @@ -1,3 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.dataprepper.plugins.kafka.producer; import org.junit.jupiter.api.BeforeEach; @@ -6,43 +10,38 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; class ProducerWorkerTest { @Mock - ProducerWorker multithreadedProducer; - - @Mock - KafkaSinkConfig kafkaSinkConfig; + ProducerWorker producerWorker; private Record record; @BeforeEach - public void setUp(){ - Event event= JacksonEvent.fromMessage("Testing multithreaded producer"); - record=new Record<>(event); + public void setUp() { + Event event = JacksonEvent.fromMessage("Testing multithreaded producer"); + record = new Record<>(event); } - private ProducerWorker createObjectUnderTest(){ - return new ProducerWorker(mock(KafkaSinkProducer.class),record); + private 
ProducerWorker createObjectUnderTest() { + return new ProducerWorker(mock(KafkaSinkProducer.class), record); } @Test - void testWritingToTopic() { - multithreadedProducer = createObjectUnderTest(); - Thread spySink = spy(new Thread(multithreadedProducer)); + void testWritingToTopic() { + producerWorker = createObjectUnderTest(); + Thread spySink = spy(new Thread(producerWorker)); spySink.start(); verify(spySink).start(); } - - - } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java index 820283ad50..0d8b6a4b42 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java @@ -23,7 +23,14 @@ import java.util.Map; import java.util.Optional; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class DLQSinkTest { @@ -49,14 +56,13 @@ public void setUp() throws IOException { MockitoAnnotations.initMocks(this); when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); - pluginSetting=new PluginSetting("kafka-sink",new HashMap<>()); - + pluginSetting = new PluginSetting("kafka-sink", new HashMap<>()); Yaml yaml = new Yaml(); FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); Object data = yaml.load(fileReader); - if(data instanceof Map){ + if (data instanceof Map) { Map propertyMap = (Map) data; Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); Map sinkeMap = (Map) logPipelineMap.get("sink"); @@ -75,9 +81,9 @@ public void setUp() throws IOException { @Test public void testPerform() throws IOException { Object failedData = new Object(); - ReflectionTestUtils.setField(pluginSetting,"pipelineName","test"); + ReflectionTestUtils.setField(pluginSetting, "pipelineName", "test"); doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); - dlqSink.perform(failedData); + dlqSink.perform(failedData, mock(Exception.class)); verify(dlqWriter).write(anyList(), anyString(), anyString()); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java index 11f05f9bd4..d61aff4fb1 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java @@ -17,16 +17,19 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.*; +import 
org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; -import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer; import org.springframework.test.util.ReflectionTestUtils; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; import java.io.Reader; import java.io.StringReader; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; @@ -34,7 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -43,23 +52,17 @@ public class KafkasinkTest { KafkaSink kafkaSink; - // @Mock KafkaSinkConfig kafkaSinkConfig; ExecutorService executorService; - - private SchemaConfig schemaConfig; - @Mock PluginSetting pluginSetting; @Mock FutureTask futureTask; - @Mock - AuthConfig authConfig; Event event; @@ -68,29 +71,11 @@ public class KafkasinkTest { private static final Integer totalWorkers = 1; MockedStatic executorsMockedStatic; - - MockedStatic propertyConfigurerMockedStatic; - - @Mock - PlainTextAuthConfig plainTextAuthConfig; - - @Mock - OAuthConfig oAuthConfig; - - @Mock - private PluginSetting pluginSettingMock; - - @Mock - private KafkaSinkConfig kafkaSinkConfigMock; - @Mock private PluginFactory pluginFactoryMock; Properties props; - @Mock - AwsDLQConfig dlqConfig; - @BeforeEach void setUp() throws Exception { @@ -153,7 +138,7 @@ public void doOutputEmptyRecordsTest() { } @Test - public void shutdownTest() throws InterruptedException { + public void shutdownTest() { spySink.shutdown(); verify(spySink).shutdown(); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java index 0aa5c2d48e..195fb45104 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java @@ -3,7 +3,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; @@ -21,17 +20,14 @@ public class AuthenticationPropertyConfigurerTest { - KafkaSinkConfig kafkaSinkConfig; - - private KafkaSinkConfig createKafkaSinkConfig(String fileName) throws IOException { Yaml yaml = new Yaml(); FileReader fileReader = new FileReader(getClass().getClassLoader().getResource(fileName).getFile()); Object data = yaml.load(fileReader); - 
if(data instanceof Map){ + if (data instanceof Map) { Map propertyMap = (Map) data; Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); Map sinkeMap = (Map) logPipelineMap.get("sink"); @@ -40,24 +36,25 @@ private KafkaSinkConfig createKafkaSinkConfig(String fileName) throws IOExceptio mapper.registerModule(new JavaTimeModule()); String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); - return mapper.readValue(reader, KafkaSinkConfig.class); + return mapper.readValue(reader, KafkaSinkConfig.class); } return null; } @Test - public void testSetSaslPlainTextProperties() throws IOException { - Properties props=new Properties(); - kafkaSinkConfig=createKafkaSinkConfig("sample-pipelines-sink.yaml"); - AuthenticationPropertyConfigurer.setSaslPlainTextProperties(kafkaSinkConfig,props); - Assertions.assertEquals("PLAIN",props.getProperty("sasl.mechanism")); - } + public void testSetSaslPlainTextProperties() throws IOException { + Properties props = new Properties(); + kafkaSinkConfig = createKafkaSinkConfig("sample-pipelines-sink.yaml"); + AuthenticationPropertyConfigurer.setSaslPlainTextProperties(kafkaSinkConfig, props); + Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism")); + } + @Test public void testSetSaslOauthProperties() throws IOException { - Properties props=new Properties(); - kafkaSinkConfig=createKafkaSinkConfig("sample-pipelines-sink-oauth.yaml"); - AuthenticationPropertyConfigurer.setOauthProperties(kafkaSinkConfig,props); - Assertions.assertEquals("OAUTHBEARER",props.getProperty("sasl.mechanism")); + Properties props = new Properties(); + kafkaSinkConfig = createKafkaSinkConfig("sample-pipelines-sink-oauth.yaml"); + AuthenticationPropertyConfigurer.setOauthProperties(kafkaSinkConfig, props); + Assertions.assertEquals("OAUTHBEARER", props.getProperty("sasl.mechanism")); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml index 7f9273aef7..868b560318 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml @@ -1,26 +1,26 @@ log-pipeline : source : random : - sink : - kafka-sink : - bootstrap_servers : - - "localhost:29092" - batch_size: 100 - acks: "acks" - compression_type: "none" - thread_wait_time: 1000 - aws: + sink: + kafka-sink: + bootstrap_servers: + - "localhost:29092" + thread_wait_time: 1000 + dlq: + s3: bucket: "mydlqtestbucket" - sts_role_arn: "arn:aws:iam::045129910014:role/dataprepper" + key_path_prefix: "dlq-files/" + sts_role_arn: "arn:aws:iam::xxxxx:role/dataprepper" region: "ap-south-1" - serde_format: json - topics: - - name: my-test-topic2 - schema: - registry_url: http://localhost:8085/ - version: 1 - authentication: - sasl_oauth: + + serde_format: json + topics: + - name: my-test-topic2 + schema: + registry_url: http://localhost:8085/ + version: 1 + authentication: + sasl_oauth: oauth_client_id: XXXXXXXXXXXXXXX oauth_client_secret: XXXXXXXXXXXXXXX oauth_login_server: https://dev-XXXXXXXXXXXXXXX.okta.com diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml index c9d7feb4da..9c69580769 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml +++ 
b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml @@ -1,18 +1,18 @@ log-pipeline : source : random : - sink : - kafka-sink : - bootstrap_servers : + sink: + kafka-sink: + bootstrap_servers: - "localhost:29092" - batch_size: 100 - acks: "acks" - compression_type: "none" thread_wait_time: 1000 - aws: - bucket: "mydlqtestbucket" - sts_role_arn: "arn:aws:iam::045129910014:role/dataprepper" - region: "ap-south-1" + dlq: + s3: + bucket: "mydlqtestbucket" + key_path_prefix: "dlq-files/" + sts_role_arn: "arn:aws:iam::xxxxx:role/dataprepper" + region: "ap-south-1" + serde_format: json topics: - name: my-test-topic2 @@ -24,3 +24,4 @@ log-pipeline : username: "broker" password: "broker" +
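
---

Note on the dlq option introduced above (reviewer addition, not part of the patch): the sink now accepts any Data Prepper DLQ plugin under a generic dlq map in place of the removed top-level aws block. Below is a minimal, self-contained sketch of the parsing step that KafkaSinkConfig.setDlqConfig performs, written against plain JDK types so it compiles without Data Prepper on the classpath; the class name DlqOptionParsingDemo is hypothetical, and the s3 settings mirror sample-pipelines-sink.yaml above.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DlqOptionParsingDemo {
        public static void main(String[] args) {
            // The value found under the "dlq" key: plugin name -> plugin settings.
            final LinkedHashMap<String, Map<String, Object>> dlq = new LinkedHashMap<>();
            dlq.put("s3", Map.of(
                    "bucket", "mydlqtestbucket",
                    "key_path_prefix", "dlq-files/",
                    "region", "ap-south-1"));

            // Exactly one DLQ plugin may be declared, mirroring setDlqConfig's check.
            if (dlq.size() != 1) {
                throw new RuntimeException("dlq option must declare exactly one dlq configuration");
            }

            final Map.Entry<String, Map<String, Object>> entry = dlq.entrySet().iterator().next();
            // In the plugin this pair becomes a PluginModel; DLQSink later rewraps it in a
            // PluginSetting so PluginFactory can load the matching DlqProvider.
            System.out.println("dlq plugin: " + entry.getKey() + ", settings: " + entry.getValue());
        }
    }

Restricting the map to a single entry is what lets DLQSink.getDlqProvider hand exactly one PluginSetting to PluginFactory.loadPlugin.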