Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sbose2k21 committed Aug 29, 2024
1 parent 09212fa commit ba2532d
Show file tree
Hide file tree
Showing 30 changed files with 306 additions and 394 deletions.
14 changes: 3 additions & 11 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,16 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:buffer-common')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'software.amazon.awssdk:kinesis'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation("software.amazon.awssdk:dynamodb")
implementation("com.amazonaws:aws-java-sdk:1.12.394")
implementation project(path: ':data-prepper-plugins:aws-plugin-api')

testImplementation 'org.yaml:snakeyaml:2.2'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class KinesisLeaseConfigExtension implements ExtensionPlugin {
private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
@DataPrepperPluginConstructor
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig);
this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

import java.util.Optional;

public interface KinesisLeaseConfigSupplier {
public class KinesisLeaseConfigSupplier {

default Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.empty();
private KinesisLeaseConfig kinesisLeaseConfig;

public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfig = kinesisLeaseConfig;
}

public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.ofNullable(kinesisLeaseConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* Generate a unique ID to represent a consumer application instance.
*/
public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator {

private static final String hostName;

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
}


/**
* @return Default to use host name.
*/
@Override
public String generate() {
return hostName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
Expand All @@ -13,6 +12,7 @@

public class KinesisClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsCredentialsProvider defaultCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;

public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
Expand All @@ -23,27 +23,28 @@ public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions());
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
return DynamoDbAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}

public KinesisAsyncClient buildKinesisAsyncClient() {
public KinesisAsyncClient buildKinesisAsyncClient(Region region) {
return KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.region(region)
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) {
return CloudWatchAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class KinesisService {
private final KinesisAsyncClient kinesisClient;
private final DynamoDbAsyncClient dynamoDbClient;
private final CloudWatchAsyncClient cloudWatchClient;
private final WorkerIdentifierGenerator workerIdentifierGenerator;

@Setter
private Scheduler scheduler;
Expand All @@ -59,7 +60,8 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final AcknowledgementSetManager acknowledgementSetManager,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier,
final WorkerIdentifierGenerator workerIdentifierGenerator
){
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
Expand All @@ -73,10 +75,11 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName();
this.kclMetricsNamespaceName = this.tableName;
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient();
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
}

Expand Down Expand Up @@ -122,7 +125,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
new WorkerIdentifierGenerator().generate(), processorFactory
workerIdentifierGenerator.generate(), processorFactory
)
.tableName(tableName)
.namespace(kclMetricsNamespaceName);
Expand All @@ -133,7 +136,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
new PollingConfig(kinesisClient)
.maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
sourceConfig.getPollingConfig().getIdleTimeBetweenReadsInMillis()));
sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
}

return new Scheduler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public KinesisSource(final KinesisSourceConfig kinesisSourceConfig,
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
KinesisClientFactory kinesisClientFactory = new KinesisClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig());
this.kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier);
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, new HostNameWorkerIdentifierGenerator());
}
@Override
public void start(final Buffer<Record<Event>> buffer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,6 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;
public interface WorkerIdentifierGenerator {

/**
* Generate a unique ID to represent a consumer application instance.
*/
public class WorkerIdentifierGenerator {

private static final String hostName;

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
}


/**
* @return Default to use host name.
*/
public String generate() {
return hostName;
}
String generate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import lombok.Getter;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.UUID;

public class AwsAuthenticationConfig {
private static final String AWS_IAM_ROLE = "role";
Expand Down Expand Up @@ -45,35 +38,4 @@ public class AwsAuthenticationConfig {
public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {
try {
Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn");
}

final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("Kinesis-source-" + UUID.randomUUID()).roleArn(awsStsRoleArn);

if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration(
configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
}
return awsCredentialsProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.fasterxml.jackson.annotation.JsonValue;

// Reference: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
/**
* @see <a href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Enhanced Consumers</a>
*/

public enum ConsumerStrategy {

POLLING("Polling"),
POLLING("polling"),

ENHANCED_FAN_OUT("Fan-Out");
ENHANCED_FAN_OUT("fan-out");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import lombok.Getter;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

@Getter
public enum InitialPositionInStreamConfig {
LATEST("latest", InitialPositionInStream.LATEST),
EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON);

private final String position;

private final InitialPositionInStream positionInStream;

InitialPositionInStreamConfig(final String position, final InitialPositionInStream positionInStream) {
this.position = position;
this.positionInStream = positionInStream;
}

private static final Map<String, InitialPositionInStreamConfig> POSITIONS_MAP = Arrays.stream(InitialPositionInStreamConfig.values())
.collect(Collectors.toMap(
value -> value.position,
value -> value
));

public static InitialPositionInStreamConfig fromPositionValue(final String position) {
return POSITIONS_MAP.get(position.toLowerCase());
}

public String toString() {
return this.position;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class KinesisSourceConfig {
@JsonProperty("streams")
@NotNull
@Valid
@Size(min = 1, max = 4, message = "Only support a maximum of 4 streams")
@Size(min = 1, max = 4, message = "Provide 1-4 streams to read from.")
private List<KinesisStreamConfig> streams;

@Getter
Expand Down Expand Up @@ -49,6 +49,7 @@ public class KinesisSourceConfig {
private KinesisStreamPollingConfig pollingConfig;

@Getter
@NotNull
@JsonProperty("codec")
private PluginModel codec;

Expand Down
Loading

0 comments on commit ba2532d

Please sign in to comment.