Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AWS Kinesis Data Streams as a Source #4836

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation project(path: ':data-prepper-plugins:aws-plugin-api')

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
Expand All @@ -25,6 +26,8 @@ dependencies {
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:newline-codecs')
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
Expand Down Expand Up @@ -53,18 +57,18 @@ public class KinesisService {
private final String kclMetricsNamespaceName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig sourceConfig;
private final KinesisSourceConfig kinesisSourceConfig;
private final KinesisAsyncClient kinesisClient;
private final DynamoDbAsyncClient dynamoDbClient;
private final CloudWatchAsyncClient cloudWatchClient;
private final WorkerIdentifierGenerator workerIdentifierGenerator;
private final InputCodec codec;

@Setter
private Scheduler scheduler;

private final ExecutorService executorService;

public KinesisService(final KinesisSourceConfig sourceConfig,
public KinesisService(final KinesisSourceConfig kinesisSourceConfig,
final KinesisClientFactory kinesisClientFactory,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
Expand All @@ -73,7 +77,7 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier,
final WorkerIdentifierGenerator workerIdentifierGenerator
){
this.sourceConfig = sourceConfig;
this.kinesisSourceConfig = kinesisSourceConfig;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
Expand All @@ -85,21 +89,24 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName();
this.kclMetricsNamespaceName = this.tableName;
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this codec the type of data they send to Kinesis? It's not just JSON records from Kinesis streams? Or is it always provided from Kinesis as just bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we read from Kinesis, it always comes in as bytes. Reference Record

final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
}

public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
throw new IllegalStateException("Buffer provided is null.");
}

if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) {
throw new IllegalStateException("Streams are empty!");
if (kinesisSourceConfig.getStreams() == null || kinesisSourceConfig.getStreams().isEmpty()) {
throw new InvalidPluginConfigurationException("No Kinesis streams provided.");
}

scheduler = getScheduler(buffer);
Expand Down Expand Up @@ -129,31 +136,30 @@ public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {

public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory(
buffer, sourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory);
buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec);

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
new KinesisMultiStreamTracker(kinesisClient, kinesisSourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
workerIdentifierGenerator.generate(), processorFactory
)
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords())
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
}

return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ public class KinesisStreamConfig {
@JsonProperty("checkpoint_interval")
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;

@Getter
@JsonProperty("enable_checkpoint")
private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT;

public InitialPositionInStream getInitialPosition() {
return initialPosition.getPositionInStream();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class KinesisRecordConverter {

private final InputCodec codec;

public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords) throws IOException {
List<Record<Event>> records = new ArrayList<>();
for (KinesisClientRecord record : kinesisClientRecords) {
processRecord(record, records::add);
}
return records;
}

private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
// Read bytebuffer
byte[] arr = new byte[record.data().remaining()];
record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
codec.parse(byteArrayInputStream, eventConsumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@Builder
@Getter
@Setter
public class KinesisCheckpointerRecord {
private RecordProcessorCheckpointer checkpointer;
private ExtendedSequenceNumber extendedSequenceNumber;
private boolean readyToCheckpoint;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class KinesisCheckpointerTracker {
private final Map<ExtendedSequenceNumber, KinesisCheckpointerRecord> checkpointerRecordList = new LinkedHashMap<>();

public synchronized void addRecordForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber,
final RecordProcessorCheckpointer checkpointer) {
checkpointerRecordList.put(extendedSequenceNumber, KinesisCheckpointerRecord.builder()
.extendedSequenceNumber(extendedSequenceNumber)
.checkpointer(checkpointer)
.readyToCheckpoint(false)
.build());
}

public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber) {
if (!checkpointerRecordList.containsKey(extendedSequenceNumber)) {
throw new IllegalArgumentException("checkpointer not available");
}
checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true);
}

public synchronized Optional<KinesisCheckpointerRecord> getLatestAvailableCheckpointRecord() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this to popLatestAvailableCheckpointRecord or something similar. It is mutating the structure and a get method doesn't convey that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed this method to popLatestReadyToCheckpointRecord

Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = Optional.empty();
List<ExtendedSequenceNumber> toRemoveRecords = new ArrayList<>();

for (Map.Entry<ExtendedSequenceNumber, KinesisCheckpointerRecord> entry: checkpointerRecordList.entrySet()) {
KinesisCheckpointerRecord kinesisCheckpointerRecord = entry.getValue();

// Break out of the loop on the first record which is not ready for checkpoint
if (!kinesisCheckpointerRecord.isReadyToCheckpoint()) {
break;
}

kinesisCheckpointerRecordOptional = Optional.of(kinesisCheckpointerRecord);
toRemoveRecords.add(entry.getKey());
}

//Cleanup the ones which are already marked for checkpoint
for (ExtendedSequenceNumber extendedSequenceNumber: toRemoveRecords) {
checkpointerRecordList.remove(extendedSequenceNumber);
}

return kinesisCheckpointerRecordOptional;
}

public synchronized int size() {
return checkpointerRecordList.size();
}
}
Loading
Loading