Skip to content

Commit

Permalink
Support for kafka-sink
Browse files Browse the repository at this point in the history
Signed-off-by: rajeshLovesToCode <[email protected]>
  • Loading branch information
rajeshLovesToCode committed Jul 7, 2023
1 parent 070b3be commit 379bc07
Show file tree
Hide file tree
Showing 19 changed files with 406 additions and 434 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,10 @@ public class AuthConfig {
@JsonProperty("sasl_oauth")
private OAuthConfig oAuthConfig;

public OAuthConfig getoAuthConfig() {
public OAuthConfig getOAuthConfig() {
return oAuthConfig;
}

/**
 * Sets the OAuth authentication configuration.
 * NOTE(review): method name casing ({@code setoAuthConfig}) differs from the
 * getter's {@code getOAuthConfig}; kept as-is to avoid breaking callers.
 *
 * @param config the OAuth configuration to use
 */
public void setoAuthConfig(OAuthConfig config) {
    this.oAuthConfig = config;
}

/**
 * Sets the plain-text (username/password) authentication configuration.
 *
 * @param config the plain-text auth configuration to use
 */
public void setPlainTextAuthConfig(PlainTextAuthConfig config) {
    this.plainTextAuthConfig = config;
}

/**
 * Returns the plain-text (username/password) authentication configuration.
 *
 * @return the plain-text auth configuration, or {@code null} when not configured
 */
public PlainTextAuthConfig getPlainTextAuthConfig() {
    return this.plainTextAuthConfig;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* A helper class that reads user configuration values from
Expand All @@ -19,29 +24,37 @@

public class KafkaSinkConfig {

public static final String DLQ = "dlq";

@JsonProperty("bootstrap_servers")
@NotNull
@Size(min = 1, message = "Bootstrap servers can't be empty")
private List<String> bootStrapServers;

@JsonProperty("aws")
@NotNull
AwsDLQConfig dlqConfig;
private PluginModel dlq;

@JsonProperty("thread_wait_time")
private Long threadWaitTime;
/**
 * Returns the configured dead-letter-queue plugin, if any.
 *
 * @return an {@link Optional} holding the DLQ plugin model; empty when no
 *         {@code dlq} section was supplied
 */
public Optional<PluginModel> getDlq() {
    return Optional.ofNullable(this.dlq);
}

@JsonProperty("compression_type")
private String compressionType;
/**
 * Reads the {@code dlq} section from the supplied pipeline settings and
 * materializes it as a {@link PluginModel}. Leaves the field {@code null}
 * when no DLQ section is present.
 *
 * @param pluginSetting the sink's pipeline settings
 * @throws IllegalArgumentException when the dlq section declares anything
 *         other than exactly one plugin entry
 */
@SuppressWarnings("unchecked")
public void setDlqConfig(final PluginSetting pluginSetting) {
    // The settings attribute is an untyped map; the cast is unavoidable here.
    final Map<String, Map<String, Object>> dlqSettings =
            (Map<String, Map<String, Object>>) pluginSetting.getAttributeFromSettings(DLQ);
    if (dlqSettings == null) {
        return; // no DLQ configured
    }
    if (dlqSettings.size() != 1) {
        // IllegalArgumentException is a RuntimeException, so callers that
        // caught the previous RuntimeException are unaffected.
        throw new IllegalArgumentException("dlq option must declare exactly one dlq configuration");
    }
    // Exactly one entry is guaranteed above, so iterator().next() is safe
    // (avoids the stream().findFirst().get() round-trip).
    final Map.Entry<String, Map<String, Object>> entry = dlqSettings.entrySet().iterator().next();
    this.dlq = new PluginModel(entry.getKey(), entry.getValue());
}

@JsonProperty("acks")
private String acks;

@JsonProperty("thread_wait_time")
private Long threadWaitTime;

@JsonProperty("topics")
private List<TopicConfig> topics;
Expand All @@ -54,30 +67,14 @@ public class KafkaSinkConfig {
@Valid
private SchemaConfig schemaConfig;

@JsonProperty(value = "serde_format",defaultValue = "plaintext")
@JsonProperty(value = "serde_format", defaultValue = "plaintext")
private String serdeFormat;


/**
 * Returns the schema-registry configuration.
 *
 * @return the schema configuration, or {@code null} when not configured
 */
public SchemaConfig getSchemaConfig() {
    return this.schemaConfig;
}

public String getCompressionType() {
return compressionType;
}

public Long getBatchSize() {
return batchSize;
}

public String getMaxRequestSize() {
return maxRequestSize;
}

public String getAcks() {
return acks;
}


public List<TopicConfig> getTopics() {
return topics;
Expand All @@ -93,10 +90,6 @@ public List<String> getBootStrapServers() {
return bootStrapServers;
}

public AwsDLQConfig getDlqConfig() {
return dlqConfig;
}

/**
 * Returns the serialization format name ({@code serde_format});
 * defaults to {@code plaintext} via the Jackson binding.
 *
 * @return the serde format name
 */
public String getSerdeFormat() {
    return this.serdeFormat;
}
Expand All @@ -105,5 +98,27 @@ public Long getThreadWaitTime() {
return threadWaitTime;
}

/**
 * Sets the Kafka bootstrap server list.
 *
 * @param servers the bootstrap server addresses
 */
public void setBootStrapServers(List<String> servers) {
    this.bootStrapServers = servers;
}

/**
 * Sets the thread wait time.
 *
 * @param waitTime wait time value ({@code thread_wait_time})
 */
public void setThreadWaitTime(Long waitTime) {
    this.threadWaitTime = waitTime;
}

/**
 * Sets the authentication configuration.
 *
 * @param config the auth configuration to use
 */
public void setAuthConfig(AuthConfig config) {
    this.authConfig = config;
}

/**
 * Sets the schema-registry configuration.
 *
 * @param config the schema configuration to use
 */
public void setSchemaConfig(SchemaConfig config) {
    this.schemaConfig = config;
}

/**
 * Sets the serialization format name.
 *
 * @param format the serde format name (e.g. json, avro, plaintext)
 */
public void setSerdeFormat(String format) {
    this.serdeFormat = format;
}

/**
 * Sets the list of destination topic configurations.
 *
 * @param topicList the topics to publish to
 */
public void setTopics(List<TopicConfig> topicList) {
    this.topics = topicList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.Collection;
import java.util.LinkedList;


/**
* A helper class that takes the buffered data
Expand All @@ -35,48 +39,66 @@ public class KafkaSinkProducer<T> {

private final CachedSchemaRegistryClient schemaRegistryClient;

private final Collection<EventHandle> bufferedEventHandles;

/**
 * Creates a producer wrapper that builds its own schema-registry client
 * from the sink configuration.
 *
 * @param producer        the underlying Kafka producer
 * @param kafkaSinkConfig the sink configuration
 * @param dlqSink         destination for records that fail to publish
 */
public KafkaSinkProducer(final Producer producer,
                         final KafkaSinkConfig kafkaSinkConfig,
                         final DLQSink dlqSink) {
    this.producer = producer;
    this.kafkaSinkConfig = kafkaSinkConfig;
    this.dlqSink = dlqSink;
    // NOTE(review): eagerly dereferences the schema config — presumably a
    // schema registry is always configured for this path; confirm with callers.
    this.schemaRegistryClient = getSchemaRegistryClient();
    this.bufferedEventHandles = new LinkedList<>();
}

/**
 * Creates a producer wrapper with an externally supplied schema-registry
 * client (used by tests and callers that manage the client themselves).
 *
 * @param producer             the underlying Kafka producer
 * @param kafkaSinkConfig      the sink configuration
 * @param dlqSink              destination for records that fail to publish
 * @param schemaRegistryClient pre-built schema registry client
 */
public KafkaSinkProducer(final Producer producer,
                         final KafkaSinkConfig kafkaSinkConfig,
                         final DLQSink dlqSink,
                         final CachedSchemaRegistryClient schemaRegistryClient) {
    this.producer = producer;
    this.kafkaSinkConfig = kafkaSinkConfig;
    this.dlqSink = dlqSink;
    this.schemaRegistryClient = schemaRegistryClient;
    this.bufferedEventHandles = new LinkedList<>();
}

/**
 * Publishes one pipeline record to every configured topic, serializing it
 * according to the sink's {@code serde_format} (json, avro, or plaintext
 * fallback). On any failure the already-serialized payload is routed to the
 * DLQ sink together with the cause.
 *
 * @param record the pipeline record to publish
 */
public void produceRecords(final Record<Event> record) {
    // Hoist the event — the original called record.getData() repeatedly.
    final Event event = record.getData();
    if (event.getEventHandle() != null) {
        bufferedEventHandles.add(event.getEventHandle());
    }
    kafkaSinkConfig.getTopics().forEach(topic -> {
        // Holds whichever serialized form we built, so the DLQ receives the
        // same payload that failed to publish.
        Object dataForDlq = null;
        try {
            final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
            if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
                // NOTE(review): a new ObjectMapper per topic per record is
                // expensive; consider caching one in a static final field.
                final JsonNode dataNode = new ObjectMapper().convertValue(event.toJsonString(), JsonNode.class);
                dataForDlq = dataNode;
                producer.send(new ProducerRecord(topic.getName(), dataNode));
            } else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
                // Resolve the topic's latest value schema from the registry
                // (subject naming convention: "<topic>-value").
                final String valueToParse = schemaRegistryClient
                        .getLatestSchemaMetadata(topic.getName() + "-value").getSchema();
                final Schema schema = new Schema.Parser().parse(valueToParse);
                final GenericRecord genericRecord = getGenericRecord(event, schema);
                dataForDlq = genericRecord;
                producer.send(new ProducerRecord(topic.getName(), genericRecord));
            } else {
                // Fallback: send the event as a plaintext JSON string.
                dataForDlq = event.toJsonString();
                producer.send(new ProducerRecord(topic.getName(), event.toJsonString()));
            }
            // NOTE(review): handles are released (and the buffer cleared) once
            // per topic, so with multiple topics later iterations release
            // nothing — confirm this is intended.
            releaseEventHandles(true);
        } catch (Exception e) {
            // Preserve the cause alongside the payload for the DLQ entry.
            dlqSink.perform(dataForDlq, e);
            releaseEventHandles(false);
        }
    });
}

private CachedSchemaRegistryClient getSchemaRegistryClient(){
return new CachedSchemaRegistryClient(kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
private CachedSchemaRegistryClient getSchemaRegistryClient() {

return new CachedSchemaRegistryClient(
kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
100);
}

Expand All @@ -90,5 +112,12 @@ private GenericRecord getGenericRecord(Event event, Schema schema) {
return record;
}

/**
 * Releases every buffered event handle with the given outcome, then
 * empties the buffer.
 *
 * @param result {@code true} when delivery succeeded, {@code false} on failure
 */
private void releaseEventHandles(final boolean result) {
    bufferedEventHandles.forEach(handle -> handle.release(result));
    bufferedEventHandles.clear();
}


}
Loading

0 comments on commit 379bc07

Please sign in to comment.