Commit
Added Kafka config to support acknowledgments and MSK ARN (#2976)
* Added Kafka config to support acknowledgments and MSK ARN

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified to use data-prepper-core in testImplementation

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed failing test

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
2 people authored and rajeshLovesToCode committed Jul 5, 2023
1 parent 10eee70 commit 657e89c
Showing 11 changed files with 468 additions and 46 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -26,6 +26,7 @@ dependencies {
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:blocking-buffer')
testImplementation project(':data-prepper-core')
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
19 changes: 19 additions & 0 deletions data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;

public class AwsConfig {
@JsonProperty("msk_arn")
@Size(min = 20, max = 2048, message = "mskArn length should be between 20 and 2048 characters")
private String awsMskArn;

public String getAwsMskArn() {
return awsMskArn;
}
}
data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java
@@ -11,13 +11,15 @@
import jakarta.validation.constraints.Size;

import java.util.List;
import java.time.Duration;

/**
* * A helper class that helps to read user configuration values from
* pipelines.yaml
*/

public class KafkaSourceConfig {
public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30);

@JsonProperty("bootstrap_servers")
@NotNull
@@ -36,6 +38,23 @@ public class KafkaSourceConfig {
@JsonProperty("authentication")
private AuthConfig authConfig;

@JsonProperty("aws")
private AwsConfig awsConfig;

@JsonProperty("acknowledgments")
private Boolean acknowledgementsEnabled = false;

@JsonProperty("acknowledgments_timeout")
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;

public Boolean getAcknowledgementsEnabled() {
return acknowledgementsEnabled;
}

public Duration getAcknowledgementsTimeout() {
return acknowledgementsTimeout;
}

public List<TopicConfig> getTopics() {
return topics;
}
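For orientation, a minimal sketch of how the new options bind (editor's illustration, not part of the commit). It uses plain Jackson with the jsr310 module already present in the test dependencies; the class name, the ARN value, and the getAwsConfig() accessor (not visible in this excerpt) are assumptions, and Data Prepper's actual pipeline parsing may differ:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;

import java.time.Duration;

public class KafkaSourceConfigBindingSketch {
    public static void main(String[] args) throws Exception {
        // Equivalent pipelines.yaml fragment (illustrative values):
        //   acknowledgments: true
        //   acknowledgments_timeout: PT45S
        //   aws:
        //     msk_arn: "arn:aws:kafka:us-east-1:123456789012:cluster/demo/uuid"
        final String json = "{\"acknowledgments\": true,"
                + " \"acknowledgments_timeout\": \"PT45S\","
                + " \"aws\": {\"msk_arn\": \"arn:aws:kafka:us-east-1:123456789012:cluster/demo/uuid\"}}";
        final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
        final KafkaSourceConfig config = mapper.readValue(json, KafkaSourceConfig.class);
        System.out.println(config.getAcknowledgementsEnabled());  // true
        System.out.println(config.getAcknowledgementsTimeout().equals(Duration.ofSeconds(45)));  // true
        System.out.println(config.getAwsConfig().getAwsMskArn()); // hypothetical getter; prints the ARN above
    }
}

Note that Bean Validation constraints such as the @Size bound on msk_arn are enforced by a Validator after binding, not by Jackson itself.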
data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,7 +20,10 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +35,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.apache.commons.lang3.Range;

/**
* * A utility class which will handle the core Kafka consumer operation.
@@ -41,6 +47,10 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
static final String POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME = "positiveAcknowledgementSetCounter";
static final String NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME = "negativeAcknowledgementSetCounter";
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
private KafkaConsumer consumer= null;
private AtomicBoolean shutdownInProgress;
@@ -53,33 +63,113 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Map<Integer, TopicPartitionCommitTracker> partitionCommitTrackerMap;
private final Counter positiveAcknowledgementSetCounter;
private final Counter negativeAcknowledgementSetCounter;
private final boolean acknowledgementsEnabled;
private final Duration acknowledgementsTimeout;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
final Buffer<Record<Event>> buffer,
final KafkaSourceConfig sourceConfig,
final TopicConfig topicConfig,
final String schemaType,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics) {
this.topicName = topicConfig.getName();
this.topicConfig = topicConfig;
this.shutdownInProgress = shutdownInProgress;
this.consumer = consumer;
this.buffer = buffer;
this.offsetsToCommit = new HashMap<>();
+ this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout();
+ // If the configured timeout differs from the default, enable acknowledgments automatically.
+ this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || !acknowledgementsTimeout.equals(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT);
+ this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
- schema = MessageFormat.getByMessageFormatByName(schemaType);
+ this.partitionCommitTrackerMap = new HashMap<>();
+ this.schema = MessageFormat.getByMessageFormatByName(schemaType);
Duration bufferTimeout = Duration.ofSeconds(1);
- bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
- lastCommitTime = System.currentTimeMillis();
+ this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
+ this.lastCommitTime = System.currentTimeMillis();
+ this.positiveAcknowledgementSetCounter = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME);
+ this.negativeAcknowledgementSetCounter = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME);
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
if (Objects.isNull(offsetAndMetadata)) {
return;
}
synchronized (this) {
offsetsToCommit.put(partition, offsetAndMetadata);
}
}

private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Long>> offsets) {
AcknowledgementSet acknowledgementSet =
acknowledgementSetManager.create((result) -> {
if (result) {
positiveAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;

partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
});
} else {
negativeAcknowledgementSetCounter.increment();
}
}, acknowledgementsTimeout);
return acknowledgementSet;
}
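As a reading aid, a sketch of the acknowledgment contract this callback relies on (editor's illustration, not from the commit; AcknowledgementSetManager.create and AcknowledgementSet.add appear above, while the sink-side event.getEventHandle().release(...) counterpart is an assumption about the data-prepper-api):

import java.time.Duration;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;

class AcknowledgmentFlowSketch {
    // Registers one event and reports the aggregate outcome through the callback.
    static AcknowledgementSet track(final AcknowledgementSetManager manager, final Event event) {
        final AcknowledgementSet ackSet = manager.create(
                result -> System.out.println("positively acknowledged: " + result),
                Duration.ofSeconds(30));
        ackSet.add(event); // register before handing the event downstream
        // A sink later signals completion, e.g. event.getEventHandle().release(true);
        // release(false) -- or the timeout lapsing -- yields result == false.
        return ackSet;
    }
}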

double getPositiveAcknowledgementsCount() {
return positiveAcknowledgementSetCounter.count();
}

public <T> void consumeRecords() throws Exception {
ConsumerRecords<String, T> records =
consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
if (!records.isEmpty() && records.count() > 0) {
- Map<TopicPartition, OffsetAndMetadata> offsets = iterateRecordPartitions(records);
- offsets.forEach((partition, offset) ->
- offsetsToCommit.put(partition, offset));
+ Map<TopicPartition, Range<Long>> offsets = new HashMap<>();
+ AcknowledgementSet acknowledgementSet = null;
+ if (acknowledgementsEnabled) {
+ acknowledgementSet = createAcknowledgementSet(offsets);
+ }
+ iterateRecordPartitions(records, acknowledgementSet, offsets);
+ if (!acknowledgementsEnabled) {
+ offsets.forEach((partition, offsetRange) ->
+ updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1)));
+ }
}
}

private void commitOffsets() {
if (topicConfig.getAutoCommit()) {
return;
}
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < COMMIT_OFFSET_INTERVAL_MS) {
return;
}
synchronized (this) {
if (offsetsToCommit.isEmpty()) {
return;
}
try {
consumer.commitSync();
offsetsToCommit.clear();
lastCommitTime = currentTimeMillis;
} catch (CommitFailedException e) {
LOG.error("Failed to commit offsets in topic "+topicName);
}
}
}

@@ -93,17 +183,7 @@ public void run() {
consumer.subscribe(Arrays.asList(topicName));
while (!shutdownInProgress.get()) {
consumeRecords();
- long currentTimeMillis = System.currentTimeMillis();
- if (!topicConfig.getAutoCommit() && !offsetsToCommit.isEmpty() &&
- (currentTimeMillis - lastCommitTime) >= COMMIT_OFFSET_INTERVAL_MS) {
- try {
- consumer.commitSync(offsetsToCommit);
- offsetsToCommit.clear();
- lastCommitTime = currentTimeMillis;
- } catch (CommitFailedException e) {
- LOG.error("Failed to commit offsets in topic "+topicName);
- }
- }
+ commitOffsets();
}
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic...", exp);
@@ -113,38 +193,46 @@
private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord) {
Map<String, Object> data = new HashMap<>();
Event event;
+ Object value;
+ String key = (String)consumerRecord.key();
+ if (Objects.isNull(key)) {
+ key = DEFAULT_KEY;
+ }
if (schema == MessageFormat.JSON || schema == MessageFormat.AVRO) {
- Map<String, Object> message = new HashMap<>();
+ value = new HashMap<>();
try {
final JsonParser jsonParser = jsonFactory.createParser((String)consumerRecord.value().toString());
- message = objectMapper.readValue(jsonParser, Map.class);
+ value = objectMapper.readValue(jsonParser, Map.class);
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record");
return null;
}
- data.put(consumerRecord.key(), message);
} else {
- data.put(consumerRecord.key(), (String)consumerRecord.value());
+ value = (String)consumerRecord.value();
}
+ data.put(key, value);
event = JacksonLog.builder().withData(data).build();
return new Record<Event>(event);
}
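To make the new key handling above concrete, a few hypothetical record-to-event mappings (editor's note; values invented):

// JSON record,      key "k1", value {"a": 1}  ->  event data {"k1": {"a": 1}}
// JSON record,      null key, value {"a": 1}  ->  event data {"message": {"a": 1}}  (DEFAULT_KEY)
// plaintext record, null key, value "hello"   ->  event data {"message": "hello"}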

- private <T> Map<TopicPartition, OffsetAndMetadata> iterateRecordPartitions(ConsumerRecords<String, T> records) throws Exception {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, final AcknowledgementSet acknowledgementSet, Map<TopicPartition, Range<Long>> offsets) throws Exception {
for (TopicPartition topicPartition : records.partitions()) {
List<Record<Event>> kafkaRecords = new ArrayList<>();
List<ConsumerRecord<String, T>> partitionRecords = records.records(topicPartition);
for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
Record<Event> record = getRecord(consumerRecord);
if (record != null) {
bufferAccumulator.add(record);
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
}
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- offsets.put(topicPartition, new OffsetAndMetadata(lastOffset + 1));
+ long firstOffset = partitionRecords.get(0).offset();
+ Range<Long> offsetRange = Range.between(firstOffset, lastOffset);
+ offsets.put(topicPartition, offsetRange);
}
- return offsets;
}

public void closeConsumer(){
79 changes: 79 additions & 0 deletions data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TopicPartitionCommitTracker {
private long committedOffset;
private final TopicPartition topicPartition;
private final Map<Long, Range<Long>> offsetMaxMap;
private final Map<Long, Range<Long>> offsetMinMap;

public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) {
this.topicPartition = topicPartition;
this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset : -1L;
this.offsetMaxMap = new HashMap<>();
this.offsetMinMap = new HashMap<>();
this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset));
}

public TopicPartitionCommitTracker(final String topic, final int partition, Long committedOffset) {
this(new TopicPartition(topic, partition), committedOffset);
}

public OffsetAndMetadata addCompletedOffsets(final Range<Long> offsetRange) {
Long min = offsetRange.getMinimum();
Long max = offsetRange.getMaximum();
boolean merged = false;
if (offsetMaxMap.containsKey(min - 1)) {
Range<Long> entry = offsetMaxMap.get(min - 1);
offsetMaxMap.remove(min - 1);
offsetMinMap.remove(entry.getMinimum());
min = entry.getMinimum();
Range<Long> newEntry = Range.between(min, max);
offsetMaxMap.put(max, newEntry);
offsetMinMap.put(min, newEntry);
merged = true;
}
if (offsetMinMap.containsKey(max + 1)) {
Range<Long> entry = offsetMinMap.get(max + 1);
offsetMinMap.remove(max + 1);
if (merged) {
offsetMinMap.remove(min);
offsetMaxMap.remove(max);
}
max = entry.getMaximum();
offsetMaxMap.remove(max);
Range<Long> newEntry = Range.between(min, max);
offsetMaxMap.put(max, newEntry);
offsetMinMap.put(min, newEntry);
merged = true;
}
if (!merged) {
offsetMaxMap.put(max, offsetRange);
offsetMinMap.put(min, offsetRange);
return null;
}
if (offsetMinMap.containsKey(committedOffset)) {
Long maxValue = offsetMinMap.get(committedOffset).getMaximum();
if (maxValue != committedOffset) {
offsetMinMap.remove(committedOffset);
committedOffset = maxValue;
offsetMaxMap.put(committedOffset, Range.between(committedOffset, committedOffset));
return new OffsetAndMetadata(committedOffset + 1);
}
}
return null;
}

}
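A worked example of the merge logic (editor's sketch; the driver class and values are hypothetical, and it assumes placement in the same package as the tracker): acknowledged ranges advance the committable offset only once they are contiguous with what was already committed.

import org.apache.commons.lang3.Range;

class CommitTrackerExample {
    public static void main(String[] args) {
        // Partition 0 of "my-topic", nothing committed yet (committedOffset becomes -1).
        final TopicPartitionCommitTracker tracker =
                new TopicPartitionCommitTracker("my-topic", 0, null);

        // 0-5 is contiguous with the start: the commit frontier advances to offset 6.
        System.out.println(tracker.addCompletedOffsets(Range.between(0L, 5L)));   // offset = 6
        // 11-20 arrives out of order; 6-10 is still missing, so nothing to commit.
        System.out.println(tracker.addCompletedOffsets(Range.between(11L, 20L))); // null
        // 6-10 fills the gap; the ranges merge and the frontier jumps to 21.
        System.out.println(tracker.addCompletedOffsets(Range.between(6L, 10L)));  // offset = 21
    }
}

The returned OffsetAndMetadata is always one past the highest contiguous acknowledged offset, matching Kafka's commit convention.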
