From aeb3496000102f13a2b50d092465c8bbfb4a8a5b Mon Sep 17 00:00:00 2001
From: rajeshLovesToCode
Date: Mon, 10 Jul 2023 12:51:52 +0530
Subject: [PATCH] -Support for kafka-sink.

Signed-off-by: rajeshLovesToCode
---
 .../kafka/producer/KafkaSinkProducer.java     |  2 +-
 .../kafka/producer/ProducerWorker.java        |  6 +--
 .../plugins/kafka/sink/DLQSink.java           | 29 +++++----------
 .../plugins/kafka/sink/KafkaSink.java         |  7 ----
 .../configuration/KafkaSinkConfigTest.java    |  2 -
 .../kafka/producer/KafkaSinkProducerTest.java |  4 --
 .../util/SinkPropertyConfigurerTest.java      | 37 ++++++++++---------
 7 files changed, 33 insertions(+), 54 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
index 5070e8e66c..bb92c8afcf 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
@@ -69,7 +69,7 @@ public void produceRecords(final Record<Event> record) {
         kafkaSinkConfig.getTopics().forEach(topic -> {
             Object dataForDlq = null;
             try {
-                final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
+                final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
                 if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
                     final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
                     dataForDlq = dataNode;
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java
index 3b1eed861b..dbb05e7401 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java
@@ -22,12 +22,12 @@ public class ProducerWorker implements Runnable {
 
     public ProducerWorker(final KafkaSinkProducer producer, final Record<Event> record) {
         this.record = record;
-        this.producer=producer;
+        this.producer = producer;
     }
 
     @Override
     public void run() {
-        producer.produceRecords(record);
+        producer.produceRecords(record);
     }
 
-    }
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
index b99a0d8f6c..5b393d3f19 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
@@ -18,20 +18,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-<<<<<<< HEAD
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.StringJoiner;
-=======
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-
->>>>>>> 03504123 (Fixing import issues)
 
 import static java.util.UUID.randomUUID;
 import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
@@ -48,23 +39,23 @@ public class DLQSink {
     private final DlqProvider dlqProvider;
     private final PluginSetting pluginSetting;
 
-    public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig,final PluginSetting pluginSetting) {
-        this.pluginSetting=pluginSetting;
-        this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
+    public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig, final PluginSetting pluginSetting) {
+        this.pluginSetting = pluginSetting;
+        this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
     }
 
-    public void perform(final Object failedData,final Exception e) {
-        final DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
+    public void perform(final Object failedData, final Exception e) {
+        final DlqWriter dlqWriter = getDlqWriter();
         final DlqObject dlqObject = DlqObject.builder()
                 .withPluginId(randomUUID().toString())
                 .withPluginName(pluginSetting.getName())
                 .withPipelineName(pluginSetting.getPipelineName())
                 .withFailedData(failedData)
                 .build();
-        logFailureForDlqObjects(dlqWriter, List.of(dlqObject),e );
+        logFailureForDlqObjects(dlqWriter, List.of(dlqObject), e);
     }
 
-    private DlqWriter getDlqWriter( final String writerPipelineName) {
+    private DlqWriter getDlqWriter() {
         final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
                 .add(pluginSetting.getPipelineName())
                 .add(pluginSetting.getName()).toString());
@@ -72,11 +63,11 @@ private DlqWriter getDlqWriter( final String writerPipelineName) {
         return dlqWriter;
     }
 
-    private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
+    private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
         final Map<String, Object> props = new HashMap<>();
         kafkaSinkConfig.setDlqConfig(pluginSetting);
         final Optional<PluginModel> dlq = kafkaSinkConfig.getDlq();
-        if(dlq.isPresent()){
+        if (dlq.isPresent()) {
             final PluginModel dlqPluginModel = dlq.get();
             final PluginSetting dlqPluginSetting = new PluginSetting(dlqPluginModel.getPluginName(), dlqPluginModel.getPluginSettings());
             dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
@@ -86,7 +77,7 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final Kaf
         return null;
     }
 
-    private void logFailureForDlqObjects(final DlqWriter dlqWriter,final List<DlqObject> dlqObjects, final Throwable failure) {
+    private void logFailureForDlqObjects(final DlqWriter dlqWriter, final List<DlqObject> dlqObjects, final Throwable failure) {
         try {
             dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
             dlqObjects.forEach((dlqObject) -> {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
index 4f5e5df835..89d5f719b9 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
@@ -92,14 +92,7 @@ public void doOutput(Collection<Record<Event>> records) {
             final KafkaSinkProducer producer = createProducer();
             records.forEach(record -> {
                 producerWorker = new ProducerWorker(producer, record);
-<<<<<<< HEAD
                 executorService.submit(producerWorker);
-=======
-                //TODO: uncomment this line after testing as this is the right way to do things
-                executorService.submit(producerWorker);
-                //TODO: remove this line after testing as it executes the thread immediately
-                //executorService.execute(producerWorker);
->>>>>>> 03504123 (Fixing import issues)
             });
 
         } catch (Exception e) {
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java
index 4aeae91957..cbae5f882e 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java
@@ -20,12 +20,10 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.notNullValue;
-
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
-
 
 class KafkaSinkConfigTest {
 
     KafkaSinkConfig kafkaSinkConfig;
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java
index a77531bd8a..5ee3f1faf4 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java
@@ -35,17 +35,13 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
-
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java
index 9d39d44c19..9bfe6ff06a 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java
@@ -33,7 +33,7 @@ public void setUp() throws IOException {
         Yaml yaml = new Yaml();
         FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile());
         Object data = yaml.load(fileReader);
-        if(data instanceof Map){
+        if (data instanceof Map) {
             Map propertyMap = (Map) data;
             Map logPipelineMap = (Map) propertyMap.get("log-pipeline");
             Map sinkeMap = (Map) logPipelineMap.get("sink");
@@ -47,32 +47,33 @@ public void setUp() throws IOException {
     }
 
     @Test
-    public void testGetProducerPropertiesForJson(){
-        Properties props=SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
-        Assertions.assertEquals("30000",props.getProperty("session.timeout.ms"));
-    }
+    public void testGetProducerPropertiesForJson() {
+        Properties props = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
+        Assertions.assertEquals("30000", props.getProperty("session.timeout.ms"));
+    }
 
     @Test
-    public void testGetProducerPropertiesForPlainText(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat","plaintext");
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+    public void testGetProducerPropertiesForPlainText() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "plaintext");
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
     }
+
     @Test
-    public void testGetProducerPropertiesForAvro(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat","avro");
-        SchemaConfig schemaConfig=kafkaSinkConfig.getSchemaConfig();
-        ReflectionTestUtils.setField(kafkaSinkConfig,"schemaConfig",null);
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
-        ReflectionTestUtils.setField(kafkaSinkConfig,"schemaConfig",schemaConfig);
-        Assertions.assertEquals("30000",SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig).getProperty("session.timeout.ms"));
+    public void testGetProducerPropertiesForAvro() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "avro");
+        SchemaConfig schemaConfig = kafkaSinkConfig.getSchemaConfig();
+        ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null);
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+        ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", schemaConfig);
+        Assertions.assertEquals("30000", SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig).getProperty("session.timeout.ms"));
     }
 
     @Test
-    public void testGetProducerPropertiesForNoSerde(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat",null);
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+    public void testGetProducerPropertiesForNoSerde() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", null);
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
     }