Commit

-Support for kafka-sink.
Signed-off-by: rajeshLovesToCode <[email protected]>
rajeshLovesToCode committed Jul 10, 2023
1 parent 1139aec commit aeb3496
Showing 7 changed files with 33 additions and 54 deletions.
@@ -69,7 +69,7 @@ public void produceRecords(final Record<Event> record) {
         kafkaSinkConfig.getTopics().forEach(topic -> {
             Object dataForDlq = null;
             try {
-                final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
+                final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
                 if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
                     final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
                     dataForDlq = dataNode;
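
Note on the hunk above: when the sink's serde format is JSON, the failed event is converted to a Jackson JsonNode before being handed off, so the dead-letter payload stays structured JSON rather than an escaped string. Below is a minimal, self-contained sketch of that branch; the eventJson literal and the non-JSON fallback are illustrative assumptions, and it parses with ObjectMapper#readTree where the commit calls convertValue on the same string.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class DlqPayloadSketch {
        public static void main(String[] args) throws Exception {
            final String serdeFormat = "json";                  // kafkaSinkConfig.getSerdeFormat() in the commit
            final String eventJson = "{\"message\":\"hello\"}"; // stands in for record.getData().toJsonString()

            Object dataForDlq;
            if ("json".equalsIgnoreCase(serdeFormat)) {
                // Parse the event's JSON text into a tree for the DLQ writer.
                final JsonNode dataNode = new ObjectMapper().readTree(eventJson);
                dataForDlq = dataNode;
            } else {
                dataForDlq = eventJson;                         // assumed fallback: keep the raw string
            }
            System.out.println(dataForDlq);
        }
    }
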
@@ -22,12 +22,12 @@ public class ProducerWorker implements Runnable {
     public ProducerWorker(final KafkaSinkProducer producer,
                           final Record<Event> record) {
         this.record = record;
-        this.producer=producer;
+        this.producer = producer;
     }
 
     @Override
     public void run() {
-        producer.produceRecords(record);
+        producer.produceRecords(record);
     }
 
-}
+}
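
For context on this file: ProducerWorker pairs one producer with one record and defers the produceRecords call to run(), so the sink can fan records out to a thread pool. A hedged usage sketch follows; the pool size and the Runnable body are placeholders rather than values from this commit.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class WorkerUsageSketch {
        public static void main(String[] args) {
            final ExecutorService executorService = Executors.newFixedThreadPool(4); // size is an assumption

            // Any Runnable behaves like ProducerWorker here: run() just delegates
            // to producer.produceRecords(record).
            final Runnable producerWorker =
                    () -> System.out.println("produceRecords on " + Thread.currentThread().getName());

            executorService.submit(producerWorker); // queue the task for a pool thread
            executorService.shutdown();             // finish queued work, accept no more
        }
    }
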
@@ -18,20 +18,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-<<<<<<< HEAD
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.StringJoiner;
-=======
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-
->>>>>>> 03504123 (Fixing import issues)
 
 import static java.util.UUID.randomUUID;
 import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
@@ -48,35 +39,35 @@ public class DLQSink {
     private final DlqProvider dlqProvider;
     private final PluginSetting pluginSetting;
 
-    public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig,final PluginSetting pluginSetting) {
-        this.pluginSetting=pluginSetting;
-        this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
+    public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig, final PluginSetting pluginSetting) {
+        this.pluginSetting = pluginSetting;
+        this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
     }
 
-    public void perform(final Object failedData,final Exception e) {
-        final DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
+    public void perform(final Object failedData, final Exception e) {
+        final DlqWriter dlqWriter = getDlqWriter();
         final DlqObject dlqObject = DlqObject.builder()
                 .withPluginId(randomUUID().toString())
                 .withPluginName(pluginSetting.getName())
                 .withPipelineName(pluginSetting.getPipelineName())
                 .withFailedData(failedData)
                 .build();
-        logFailureForDlqObjects(dlqWriter, List.of(dlqObject),e );
+        logFailureForDlqObjects(dlqWriter, List.of(dlqObject), e);
     }
 
-    private DlqWriter getDlqWriter( final String writerPipelineName) {
+    private DlqWriter getDlqWriter() {
         final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
                 .add(pluginSetting.getPipelineName())
                 .add(pluginSetting.getName()).toString());
         final DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
         return dlqWriter;
     }
 
-    private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
+    private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
         final Map<String, Object> props = new HashMap<>();
         kafkaSinkConfig.setDlqConfig(pluginSetting);
         final Optional<PluginModel> dlq = kafkaSinkConfig.getDlq();
-        if(dlq.isPresent()){
+        if (dlq.isPresent()) {
             final PluginModel dlqPluginModel = dlq.get();
             final PluginSetting dlqPluginSetting = new PluginSetting(dlqPluginModel.getPluginName(), dlqPluginModel.getPluginSettings());
             dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
@@ -86,7 +77,7 @@ private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
         return null;
     }
 
-    private void logFailureForDlqObjects(final DlqWriter dlqWriter,final List<DlqObject> dlqObjects, final Throwable failure) {
+    private void logFailureForDlqObjects(final DlqWriter dlqWriter, final List<DlqObject> dlqObjects, final Throwable failure) {
         try {
             dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
             dlqObjects.forEach((dlqObject) -> {
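
An aside on getDlqWriter in the hunk above: the ternary potentialDlq.isPresent() ? potentialDlq.get() : null is behaviorally identical to Optional#orElse(null). The snippet below only demonstrates that equivalence; it is not code from this commit.

    import java.util.Optional;

    public class OptionalSketch {
        public static void main(String[] args) {
            final Optional<String> potentialDlq = Optional.empty(); // no DLQ writer configured

            final String viaTernary = potentialDlq.isPresent() ? potentialDlq.get() : null;
            final String viaOrElse = potentialDlq.orElse(null);     // equivalent, one call

            System.out.println(viaTernary == viaOrElse);            // true: both are null
        }
    }
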
@@ -92,14 +92,7 @@ public void doOutput(Collection<Record<Event>> records) {
             final KafkaSinkProducer producer = createProducer();
             records.forEach(record -> {
                 producerWorker = new ProducerWorker(producer, record);
-<<<<<<< HEAD
                 executorService.submit(producerWorker);
-=======
-                //TODO: uncomment this line after testing as this is the right way to do things
-                executorService.submit(producerWorker);
-                //TODO: remove this line after testing as it executes the thread immediately
-                //executorService.execute(producerWorker);
->>>>>>> 03504123 (Fixing import issues)
             });
 
         } catch (Exception e) {
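
The conflict block removed above contained a TODO contrasting executorService.submit with executorService.execute. For reference: both hand the Runnable to a pool thread (neither runs it synchronously in the caller), and the practical difference is exception handling. submit returns a Future that holds any thrown exception until get() is called, while execute forwards the exception to the worker thread's uncaught-exception handler. A standalone demonstration:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class SubmitVsExecute {
        public static void main(String[] args) throws InterruptedException {
            final ExecutorService pool = Executors.newSingleThreadExecutor();
            final Runnable failing = () -> { throw new RuntimeException("produce failed"); };

            final Future<?> future = pool.submit(failing);   // exception is captured in the Future
            try {
                future.get();
            } catch (ExecutionException e) {
                System.out.println("submit surfaced: " + e.getCause().getMessage());
            }

            pool.execute(failing);                           // exception goes to the thread's
                                                             // uncaught-exception handler instead
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
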
@@ -20,12 +20,10 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.notNullValue;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-
-
 class KafkaSinkConfigTest {
 
     KafkaSinkConfig kafkaSinkConfig;
@@ -35,17 +35,13 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
-
-
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 
@@ -33,7 +33,7 @@ public void setUp() throws IOException {
         Yaml yaml = new Yaml();
         FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile());
         Object data = yaml.load(fileReader);
-        if(data instanceof Map){
+        if (data instanceof Map) {
             Map<String, Object> propertyMap = (Map<String, Object>) data;
             Map<String, Object> logPipelineMap = (Map<String, Object>) propertyMap.get("log-pipeline");
             Map<String, Object> sinkeMap = (Map<String, Object>) logPipelineMap.get("sink");
@@ -47,32 +47,33 @@ public void setUp() throws IOException {
     }
 
     @Test
-    public void testGetProducerPropertiesForJson(){
-        Properties props=SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
-        Assertions.assertEquals("30000",props.getProperty("session.timeout.ms"));
-    }
+    public void testGetProducerPropertiesForJson() {
+        Properties props = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
+        Assertions.assertEquals("30000", props.getProperty("session.timeout.ms"));
+    }
 
     @Test
-    public void testGetProducerPropertiesForPlainText(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat","plaintext");
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+    public void testGetProducerPropertiesForPlainText() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "plaintext");
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
 
     }
 
     @Test
-    public void testGetProducerPropertiesForAvro(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat","avro");
-        SchemaConfig schemaConfig=kafkaSinkConfig.getSchemaConfig();
-        ReflectionTestUtils.setField(kafkaSinkConfig,"schemaConfig",null);
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
-        ReflectionTestUtils.setField(kafkaSinkConfig,"schemaConfig",schemaConfig);
-        Assertions.assertEquals("30000",SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig).getProperty("session.timeout.ms"));
+    public void testGetProducerPropertiesForAvro() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "avro");
+        SchemaConfig schemaConfig = kafkaSinkConfig.getSchemaConfig();
+        ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null);
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+        ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", schemaConfig);
+        Assertions.assertEquals("30000", SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig).getProperty("session.timeout.ms"));
 
     }
 
     @Test
-    public void testGetProducerPropertiesForNoSerde(){
-        ReflectionTestUtils.setField(kafkaSinkConfig,"serdeFormat",null);
-        Assertions.assertThrows(RuntimeException.class,()->SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
+    public void testGetProducerPropertiesForNoSerde() {
+        ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", null);
+        Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig));
 
     }
 
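
A note on the test changes above: the tests reach private fields such as serdeFormat and schemaConfig through Spring's ReflectionTestUtils.setField, which is how they force the error paths when the config class presumably exposes no setters. A minimal sketch of the pattern; ConfigHolder is a hypothetical stand-in for the real config class.

    import org.springframework.test.util.ReflectionTestUtils;

    public class SetFieldSketch {
        static class ConfigHolder {
            private String serdeFormat = "json"; // no setter, like the config under test

            String getSerdeFormat() {
                return serdeFormat;
            }
        }

        public static void main(String[] args) {
            final ConfigHolder config = new ConfigHolder();
            // Overwrite the private field directly, as the tests do for
            // kafkaSinkConfig's serdeFormat and schemaConfig.
            ReflectionTestUtils.setField(config, "serdeFormat", "avro");
            System.out.println(config.getSerdeFormat()); // prints "avro"
        }
    }
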
