Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AWS Kinesis Data Streams as a Source #4836

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:newline-codecs')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 1.0
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every class needs a license header. Please add to all files.

See: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Hint: If you use IntelliJ you can configure it to automatically add this. And you can have it add the headers to all the files in this Gradle module (since they are all new).


import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

@Getter
public class KinesisLeaseConfig {
@JsonProperty("lease_coordination")
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis", allowInPipelineConfigurations = true)
public class KinesisLeaseConfigExtension implements ExtensionPlugin {

private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
@DataPrepperPluginConstructor
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Override
public void apply(final ExtensionPoints extensionPoints) {
extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.plugin.ExtensionProvider;

import java.util.Optional;

class KinesisLeaseConfigProvider implements ExtensionProvider<KinesisLeaseConfigSupplier> {
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;

public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
}

@Override
public Optional<KinesisLeaseConfigSupplier> provideInstance(Context context) {
return Optional.of(this.kinesisLeaseConfigSupplier);
}

@Override
public Class<KinesisLeaseConfigSupplier> supportedClass() {
return KinesisLeaseConfigSupplier.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import java.util.Optional;

public class KinesisLeaseConfigSupplier {

private KinesisLeaseConfig kinesisLeaseConfig;

public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfig = kinesisLeaseConfig;
}

public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.ofNullable(kinesisLeaseConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.awssdk.regions.Region;

@Getter
public class KinesisLeaseCoordinationTableConfig {

@JsonProperty("table_name")
@NonNull
private String tableName;
dlvenable marked this conversation as resolved.
Show resolved Hide resolved

@JsonProperty("region")
@NonNull
sb2k16 marked this conversation as resolved.
Show resolved Hide resolved
private String region;

public Region getAwsRegion() {
return Region.of(region);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* Generate a unique ID to represent a consumer application instance.
*/
public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator {

private static final String hostName;

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
}


/**
* @return Default to use host name.
*/
@Override
public String generate() {
return hostName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

public class KinesisClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsCredentialsProvider defaultCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;

public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions());
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it recommended to use the Async client? If so why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0 for your review. This is required to construct ConfigsBuilder class in KCL

return DynamoDbAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}

public KinesisAsyncClient buildKinesisAsyncClient(Region region) {
return KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(region)
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) {
return CloudWatchAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.arn.Arn;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


public class KinesisMultiStreamTracker implements MultiStreamTracker {
private static final String COLON = ":";

private final KinesisAsyncClient kinesisClient;
private final KinesisSourceConfig sourceConfig;
private final String applicationName;

public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
this.kinesisClient = kinesisClient;
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
}

@Override
public List<StreamConfig> streamConfigList() {
List<StreamConfig> streamConfigList = new ArrayList<>();
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
StreamConfig streamConfig = getStreamConfig(kinesisStreamConfig);
streamConfigList.add(streamConfig);
}
return streamConfigList;
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).join();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = Arn.fromString(streamDescription.streamARN()).getAccountId();
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}

/**
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
*/
@Override
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofSeconds(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are trade offs for having this higher or lower?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the strategy to perform cleaning up of the leases for streams that just has been deleted. The wait period is to allow for any pending processing of records before cleaning up the lease from the database.

}
};

}
}
Loading
Loading