diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle index f5bb41385a..ae380ba0e1 100644 --- a/data-prepper-plugins/kinesis-source/build.gradle +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -10,24 +10,16 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') - implementation project(':data-prepper-plugins:buffer-common') - implementation libs.armeria.core implementation 'com.fasterxml.jackson.core:jackson-core' - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation project(':data-prepper-plugins:blocking-buffer') - implementation 'software.amazon.awssdk:kinesis' - implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' + implementation 'io.micrometer:micrometer-core' implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0' compileOnly 'org.projectlombok:lombok:1.18.20' annotationProcessor 'org.projectlombok:lombok:1.18.20' - implementation("software.amazon.awssdk:dynamodb") - implementation("com.amazonaws:aws-java-sdk:1.12.394") implementation project(path: ':data-prepper-plugins:aws-plugin-api') - testImplementation 'org.yaml:snakeyaml:2.2' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation project(':data-prepper-test-common') - testImplementation platform('org.junit:junit-bom:5.9.1') - testImplementation 'org.junit.jupiter:junit-jupiter' testImplementation project(':data-prepper-test-event') testImplementation project(':data-prepper-core') testImplementation project(':data-prepper-plugin-framework') diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java deleted file mode 100644 index f7e4bd9e87..0000000000 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.opensearch.dataprepper.plugins.kinesis.extension; - -import java.util.Optional; - -public class DefaultKinesisLeaseConfigSupplier implements KinesisLeaseConfigSupplier { - - private KinesisLeaseConfig kinesisLeaseConfig; - - public DefaultKinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) { - this.kinesisLeaseConfig = kinesisLeaseConfig; - } - - @Override - public Optional getKinesisExtensionLeaseConfig() { - return kinesisLeaseConfig != null ? Optional.of(kinesisLeaseConfig) : Optional.empty(); - } -} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java index c5b4b03299..e2cf1ad475 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java @@ -11,7 +11,7 @@ public class KinesisLeaseConfigExtension implements ExtensionPlugin { private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; @DataPrepperPluginConstructor public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) { - this.kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig); + this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig); } @Override diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java index 3db85d3ca5..6d61e3dd78 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java @@ -2,9 +2,15 @@ import java.util.Optional; -public interface KinesisLeaseConfigSupplier { +public class KinesisLeaseConfigSupplier { - default Optional getKinesisExtensionLeaseConfig() { - return Optional.empty(); + private KinesisLeaseConfig kinesisLeaseConfig; + + public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) { + this.kinesisLeaseConfig = kinesisLeaseConfig; + } + + public Optional getKinesisExtensionLeaseConfig() { + return Optional.ofNullable(kinesisLeaseConfig); } } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/HostNameWorkerIdentifierGenerator.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/HostNameWorkerIdentifierGenerator.java new file mode 100644 index 0000000000..962b9f5d82 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/HostNameWorkerIdentifierGenerator.java @@ -0,0 +1,29 @@ +package org.opensearch.dataprepper.plugins.kinesis.source; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Generate a unique ID to represent a consumer application instance. + */ +public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator { + + private static final String hostName; + + static { + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (final UnknownHostException e) { + throw new RuntimeException(e); + } + } + + + /** + * @return Default to use host name. + */ + @Override + public String generate() { + return hostName; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java index e85b66aa37..ed699bae20 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java @@ -4,7 +4,6 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -13,6 +12,7 @@ public class KinesisClientFactory { private final AwsCredentialsProvider awsCredentialsProvider; + private final AwsCredentialsProvider defaultCredentialsProvider; private final AwsAuthenticationConfig awsAuthenticationConfig; public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, @@ -23,27 +23,28 @@ public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId()) .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides()) .build()); + defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions()); this.awsAuthenticationConfig = awsAuthenticationConfig; } public DynamoDbAsyncClient buildDynamoDBClient(Region region) { return DynamoDbAsyncClient.builder() - .credentialsProvider(DefaultCredentialsProvider.create()) + .credentialsProvider(defaultCredentialsProvider) .region(region) .build(); } - public KinesisAsyncClient buildKinesisAsyncClient() { + public KinesisAsyncClient buildKinesisAsyncClient(Region region) { return KinesisClientUtil.createKinesisAsyncClient( KinesisAsyncClient.builder() - .credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration()) - .region(awsAuthenticationConfig.getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .region(region) ); } public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) { return CloudWatchAsyncClient.builder() - .credentialsProvider(DefaultCredentialsProvider.create()) + .credentialsProvider(defaultCredentialsProvider) .region(region) .build(); } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index cf2f96d42f..33cf786884 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -47,6 +47,7 @@ public class KinesisService { private final KinesisAsyncClient kinesisClient; private final DynamoDbAsyncClient dynamoDbClient; private final CloudWatchAsyncClient cloudWatchClient; + private final WorkerIdentifierGenerator workerIdentifierGenerator; @Setter private Scheduler scheduler; @@ -59,7 +60,8 @@ public KinesisService(final KinesisSourceConfig sourceConfig, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription, final AcknowledgementSetManager acknowledgementSetManager, - final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier + final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier, + final WorkerIdentifierGenerator workerIdentifierGenerator ){ this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; @@ -73,10 +75,11 @@ public KinesisService(final KinesisSourceConfig sourceConfig, this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(); this.kclMetricsNamespaceName = this.tableName; this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); - this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(); + this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion()); this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); this.pipelineName = pipelineDescription.getPipelineName(); this.applicationName = pipelineName; + this.workerIdentifierGenerator = workerIdentifierGenerator; this.executorService = Executors.newFixedThreadPool(1); } @@ -122,7 +125,7 @@ public Scheduler createScheduler(final Buffer> buffer) { new ConfigsBuilder( new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName), applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, - new WorkerIdentifierGenerator().generate(), processorFactory + workerIdentifierGenerator.generate(), processorFactory ) .tableName(tableName) .namespace(kclMetricsNamespaceName); @@ -133,7 +136,7 @@ public Scheduler createScheduler(final Buffer> buffer) { new PollingConfig(kinesisClient) .maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords()) .idleTimeBetweenReadsInMillis( - sourceConfig.getPollingConfig().getIdleTimeBetweenReadsInMillis())); + sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); } return new Scheduler( diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java index 16b25d072b..afca88ba3d 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java @@ -38,7 +38,7 @@ public KinesisSource(final KinesisSourceConfig kinesisSourceConfig, this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier; KinesisClientFactory kinesisClientFactory = new KinesisClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig()); this.kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, - pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, new HostNameWorkerIdentifierGenerator()); } @Override public void start(final Buffer> buffer) { diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java index 7686861a87..91baa9a831 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java @@ -1,28 +1,6 @@ package org.opensearch.dataprepper.plugins.kinesis.source; -import java.net.InetAddress; -import java.net.UnknownHostException; +public interface WorkerIdentifierGenerator { -/** - * Generate a unique ID to represent a consumer application instance. - */ -public class WorkerIdentifierGenerator { - - private static final String hostName; - - static { - try { - hostName = InetAddress.getLocalHost().getHostName(); - } catch (final UnknownHostException e) { - throw new RuntimeException(e); - } - } - - - /** - * @return Default to use host name. - */ - public String generate() { - return hostName; - } + String generate(); } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java index e70ec337e7..25a1db6fc5 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java @@ -8,16 +8,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; import lombok.Getter; -import software.amazon.awssdk.arns.Arn; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; -import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import java.util.Map; -import java.util.UUID; public class AwsAuthenticationConfig { private static final String AWS_IAM_ROLE = "role"; @@ -45,35 +38,4 @@ public class AwsAuthenticationConfig { public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } - - public AwsCredentialsProvider authenticateAwsConfiguration() { - - final AwsCredentialsProvider awsCredentialsProvider; - if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { - try { - Arn.fromString(awsStsRoleArn); - } catch (final Exception e) { - throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); - } - - final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build(); - - AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() - .roleSessionName("Kinesis-source-" + UUID.randomUUID()).roleArn(awsStsRoleArn); - - if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { - assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration( - configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); - } - - awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() - .stsClient(stsClient) - .refreshRequest(assumeRoleRequestBuilder.build()) - .build(); - - } else { - awsCredentialsProvider = DefaultCredentialsProvider.create(); - } - return awsCredentialsProvider; - } } \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java index fa507f55d5..59a2fd2522 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java @@ -2,13 +2,15 @@ import com.fasterxml.jackson.annotation.JsonValue; -// Reference: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html +/** + * @see Enhanced Consumers + */ public enum ConsumerStrategy { - POLLING("Polling"), + POLLING("polling"), - ENHANCED_FAN_OUT("Fan-Out"); + ENHANCED_FAN_OUT("fan-out"); private final String value; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfig.java new file mode 100644 index 0000000000..058be8109c --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfig.java @@ -0,0 +1,37 @@ +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; + +import lombok.Getter; +import software.amazon.kinesis.common.InitialPositionInStream; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +@Getter +public enum InitialPositionInStreamConfig { + LATEST("latest", InitialPositionInStream.LATEST), + EARLIEST("earliest", InitialPositionInStream.TRIM_HORIZON); + + private final String position; + + private final InitialPositionInStream positionInStream; + + InitialPositionInStreamConfig(final String position, final InitialPositionInStream positionInStream) { + this.position = position; + this.positionInStream = positionInStream; + } + + private static final Map POSITIONS_MAP = Arrays.stream(InitialPositionInStreamConfig.values()) + .collect(Collectors.toMap( + value -> value.position, + value -> value + )); + + public static InitialPositionInStreamConfig fromPositionValue(final String position) { + return POSITIONS_MAP.get(position.toLowerCase()); + } + + public String toString() { + return this.position; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index 4be5bb7f95..c8aeb01bfd 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -19,7 +19,7 @@ public class KinesisSourceConfig { @JsonProperty("streams") @NotNull @Valid - @Size(min = 1, max = 4, message = "Only support a maximum of 4 streams") + @Size(min = 1, max = 4, message = "Provide 1-4 streams to read from.") private List streams; @Getter @@ -49,6 +49,7 @@ public class KinesisSourceConfig { private KinesisStreamPollingConfig pollingConfig; @Getter + @NotNull @JsonProperty("codec") private PluginModel codec; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java index 3faf55469c..a336acece3 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java @@ -6,10 +6,12 @@ import lombok.Getter; import software.amazon.kinesis.common.InitialPositionInStream; +import java.time.Duration; + @Getter public class KinesisStreamConfig { // Checkpointing interval - private static final int MINIMAL_CHECKPOINT_INTERVAL_MILLIS = 2 * 60 * 1000; // 2 minute + private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute private static final boolean DEFAULT_ENABLE_CHECKPOINT = false; @JsonProperty("stream_name") @@ -17,16 +19,17 @@ public class KinesisStreamConfig { @Valid private String name; - @JsonProperty("stream_arn") - private String arn; - @JsonProperty("initial_position") - private InitialPositionInStream initialPosition = InitialPositionInStream.LATEST; + private InitialPositionInStreamConfig initialPosition = InitialPositionInStreamConfig.LATEST; @JsonProperty("checkpoint_interval") - private int checkPointIntervalInMilliseconds = MINIMAL_CHECKPOINT_INTERVAL_MILLIS; + private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL; @Getter - @JsonProperty("enableCheckpoint") + @JsonProperty("enable_checkpoint") private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT; + + public InitialPositionInStream getInitialPosition() { + return initialPosition.getPositionInStream(); + } } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java index 36dbedbdd6..b205df2b2f 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java @@ -3,15 +3,17 @@ import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; +import java.time.Duration; + public class KinesisStreamPollingConfig { private static final int DEFAULT_MAX_RECORDS = 10000; - private static final int IDLE_TIME_BETWEEN_READS_IN_MILLIS = 250; + private static final Duration IDLE_TIME_BETWEEN_READS = Duration.ofMillis(250); @Getter - @JsonProperty("maxPollingRecords") + @JsonProperty("max_polling_records") private int maxPollingRecords = DEFAULT_MAX_RECORDS; @Getter - @JsonProperty("idleTimeBetweenReadsInMillis") - private int idleTimeBetweenReadsInMillis = IDLE_TIME_BETWEEN_READS_IN_MILLIS; + @JsonProperty("idle_time_between_reads") + private Duration idleTimeBetweenReads = IDLE_TIME_BETWEEN_READS; } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index ada8094908..956ba88f7a 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -41,7 +41,7 @@ public class KinesisRecordProcessor implements ShardRecordProcessor { private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); private final StreamIdentifier streamIdentifier; private final KinesisStreamConfig kinesisStreamConfig; - private final int checkpointIntervalMilliSeconds; + private final Duration checkpointInterval; private final KinesisSourceConfig kinesisSourceConfig; private final Buffer> buffer; private String kinesisShardId; @@ -75,7 +75,7 @@ public KinesisRecordProcessor(Buffer> buffer, this.acknowledgementSetCallbackCounter = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); - this.checkpointIntervalMilliSeconds = kinesisStreamConfig.getCheckPointIntervalInMilliseconds(); + this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval(); this.buffer = buffer; } @@ -87,7 +87,7 @@ private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSou public void initialize(InitializationInput initializationInput) { // Called once when the processor is initialized. kinesisShardId = initializationInput.shardId(); - LOG.info("Initialize Processor for shard: " + kinesisShardId); + LOG.info("Initialize Processor for shard: {}", kinesisShardId); lastCheckpointTimeInMillis = System.currentTimeMillis(); } @@ -126,7 +126,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { acknowledgementSetOpt.ifPresent(AcknowledgementSet::complete); // Checkpoint for shard - if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointIntervalMilliSeconds) { + if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointInterval.toMillis()) { LOG.info("Regular checkpointing for shard " + kinesisShardId); checkpoint(processRecordsInput.checkpointer()); lastCheckpointTimeInMillis = System.currentTimeMillis(); @@ -152,7 +152,7 @@ public void leaseLost(LeaseLostInput leaseLostInput) { @Override public void shardEnded(ShardEndedInput shardEndedInput) { - LOG.info("Reached shard end, checkpointing shard: {}", kinesisShardId); + LOG.debug("Reached shard end, checkpointing shard: {}", kinesisShardId); checkpoint(shardEndedInput.checkpointer()); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java similarity index 84% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java index dc9bbf3fbe..c3eeb28d40 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java @@ -12,7 +12,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class DefaultKinesisLeaseConfigSupplierTest { +public class KinesisLeaseConfigSupplierTest { private static final String LEASE_COORDINATION_TABLE = "lease-table"; @Mock KinesisLeaseConfig kinesisLeaseConfig; @@ -20,8 +20,8 @@ public class DefaultKinesisLeaseConfigSupplierTest { @Mock KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig; - private DefaultKinesisLeaseConfigSupplier createObjectUnderTest() { - return new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig); + private KinesisLeaseConfigSupplier createObjectUnderTest() { + return new KinesisLeaseConfigSupplier(kinesisLeaseConfig); } @Test @@ -37,14 +37,14 @@ void testGetters() { @Test void testGettersWithNullTableConfig() { when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(null); - DefaultKinesisLeaseConfigSupplier defaultKinesisLeaseConfigSupplier = createObjectUnderTest(); + KinesisLeaseConfigSupplier defaultKinesisLeaseConfigSupplier = createObjectUnderTest(); assertThat(defaultKinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable(), equalTo(null)); } @Test void testGettersWithNullConfig() { - KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(null); + KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(null); assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig(), equalTo(Optional.empty())); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java index 5c6e298760..fc81595f82 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java @@ -1,9 +1,11 @@ package org.opensearch.dataprepper.plugins.kinesis.source; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -14,7 +16,9 @@ import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KinesisClientFactoryTest { private Region region = Region.US_EAST_1; @@ -33,12 +37,15 @@ void testCreateClient() throws NoSuchFieldException, IllegalAccessException { ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", roleArn); + AwsCredentialsProvider defaultCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(eq(AwsCredentialsOptions.defaultOptions()))).thenReturn(defaultCredentialsProvider); + KinesisClientFactory clientFactory = new KinesisClientFactory(awsCredentialsSupplier, awsAuthenticationOptionsConfig); final DynamoDbAsyncClient dynamoDbAsyncClient = clientFactory.buildDynamoDBClient(Region.US_EAST_1); assertNotNull(dynamoDbAsyncClient); - final KinesisAsyncClient kinesisAsyncClient = clientFactory.buildKinesisAsyncClient(); + final KinesisAsyncClient kinesisAsyncClient = clientFactory.buildKinesisAsyncClient(Region.US_EAST_1); assertNotNull(kinesisAsyncClient); final CloudWatchAsyncClient cloudWatchAsyncClient = clientFactory.buildCloudWatchAsyncClient(Region.US_EAST_1); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index d5d8a93c4f..541fd10bd5 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -30,6 +30,7 @@ import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.metrics.MetricsLevel; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -53,7 +54,7 @@ public class KinesisServiceTest { private final String PIPELINE_NAME = "kinesis-pipeline-test"; private final String streamId = "stream-1"; - private static final int CHECKPOINT_INTERVAL_MS = 0; + private static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(0); private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; private static final int DEFAULT_MAX_RECORDS = 10000; private static final int IDLE_TIME_BETWEEN_READS_IN_MILLIS = 250; @@ -111,6 +112,9 @@ public class KinesisServiceTest { @Mock KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig; + @Mock + WorkerIdentifierGenerator workerIdentifierGenerator; + @BeforeEach void setup() { awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); @@ -126,6 +130,7 @@ void setup() { buffer = mock(Buffer.class); kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); kinesisLeaseConfig = mock(KinesisLeaseConfig.class); + workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class); kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class); when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("kinesis-lease-table"); @@ -156,12 +161,12 @@ void setup() { when(kinesisSourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); when(kinesisStreamConfig.getName()).thenReturn(streamId); - when(kinesisStreamConfig.getCheckPointIntervalInMilliseconds()).thenReturn(CHECKPOINT_INTERVAL_MS); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL); when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.LATEST); - when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); + when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.ENHANCED_FAN_OUT); when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig); when(kinesisStreamPollingConfig.getMaxPollingRecords()).thenReturn(DEFAULT_MAX_RECORDS); - when(kinesisStreamPollingConfig.getIdleTimeBetweenReadsInMillis()).thenReturn(IDLE_TIME_BETWEEN_READS_IN_MILLIS); + when(kinesisStreamPollingConfig.getIdleTimeBetweenReads()).thenReturn(Duration.ofMillis(IDLE_TIME_BETWEEN_READS_IN_MILLIS)); List streamConfigs = new ArrayList<>(); streamConfigs.add(kinesisStreamConfig); @@ -169,16 +174,17 @@ void setup() { when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(dynamoDbClient); - when(kinesisClientFactory.buildKinesisAsyncClient()).thenReturn(kinesisClient); + when(kinesisClientFactory.buildKinesisAsyncClient(awsAuthenticationConfig.getAwsRegion())).thenReturn(kinesisClient); when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient); when(kinesisClient.serviceClientConfiguration()).thenReturn(KinesisServiceClientConfiguration.builder().region(Region.US_EAST_1).build()); when(scheduler.startGracefulShutdown()).thenReturn(CompletableFuture.completedFuture(true)); when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); + when(workerIdentifierGenerator.generate()).thenReturn(UUID.randomUUID().toString()); } public KinesisService createObjectUnderTest() { return new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, - pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); } @Test @@ -186,19 +192,20 @@ void testServiceStart() { KinesisService kinesisService = createObjectUnderTest(); kinesisService.start(buffer); assertNotNull(kinesisService.getScheduler(buffer)); + verify(workerIdentifierGenerator, times(1)).generate(); } @Test void testServiceThrowsWhenLeaseConfigIsInvalid() { when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.empty()); assertThrows(IllegalStateException.class, () -> new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, - pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier)); + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator)); } @Test void testCreateScheduler() { KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, - pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); assertNotNull(schedulerObjectUnderTest); @@ -210,13 +217,14 @@ void testCreateScheduler() { assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); assertNotNull(schedulerObjectUnderTest.processorConfig()); assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + verify(workerIdentifierGenerator, times(1)).generate(); } @Test void testCreateSchedulerWithPollingStrategy() { when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, - pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); assertNotNull(schedulerObjectUnderTest); @@ -228,6 +236,7 @@ void testCreateSchedulerWithPollingStrategy() { assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); assertNotNull(schedulerObjectUnderTest.processorConfig()); assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + verify(workerIdentifierGenerator, times(1)).generate(); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java index 14ac812138..eeb7f70eee 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java @@ -3,6 +3,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -17,6 +18,7 @@ import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; import java.util.List; @@ -85,12 +87,16 @@ void setup() { kinesisLeaseConfig = mock(KinesisLeaseConfig.class); kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class); when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); - when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("us-east-1"); + when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("table-name"); + when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1"); + when(kinesisLeaseCoordinationTableConfig.getAwsRegion()).thenReturn(Region.US_EAST_1); when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig)); - when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); + when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.US_EAST_1); when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); when(awsAuthenticationConfig.getAwsStsExternalId()).thenReturn(UUID.randomUUID().toString()); final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + AwsCredentialsProvider defaultCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions())).thenReturn(defaultCredentialsProvider); when(awsAuthenticationConfig.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); when(kinesisSourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java index 09c64966f7..00f2b668e0 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java @@ -1,44 +1,21 @@ package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.databind.ObjectMapper; -import org.hamcrest.CoreMatchers; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; -import org.opensearch.dataprepper.test.helper.ReflectivelySetField; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.services.sts.StsClientBuilder; -import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import java.util.Collections; import java.util.Map; import java.util.UUID; -import java.util.function.Consumer; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; public class AwsAuthenticationConfigTest { private ObjectMapper objectMapper = new ObjectMapper(); - private final String TEST_ROLE = "arn:aws:iam::123456789012:role/test-role"; @ParameterizedTest @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) @@ -100,164 +77,4 @@ void getAwsStsHeaderOverridesReturnsNullIfNotInJSON() { final AwsAuthenticationConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationConfig.class); assertThat(objectUnderTest.getAwsStsHeaderOverrides(), nullValue()); } - - @Test - void authenticateAWSConfigurationShouldReturnWithoutStsRoleArn() throws NoSuchFieldException, IllegalAccessException { - AwsAuthenticationConfig awsAuthenticationOptionsConfig = new AwsAuthenticationConfig(); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", null); - - final DefaultCredentialsProvider mockedCredentialsProvider = mock(DefaultCredentialsProvider.class); - final AwsCredentialsProvider actualCredentialsProvider; - try (final MockedStatic defaultCredentialsProviderMockedStatic = mockStatic(DefaultCredentialsProvider.class)) { - defaultCredentialsProviderMockedStatic.when(DefaultCredentialsProvider::create) - .thenReturn(mockedCredentialsProvider); - actualCredentialsProvider = awsAuthenticationOptionsConfig.authenticateAwsConfiguration(); - } - - assertThat(actualCredentialsProvider, sameInstance(mockedCredentialsProvider)); - } - - - @Nested - class WithSts { - private StsClient stsClient; - private StsClientBuilder stsClientBuilder; - - @BeforeEach - void setUp() { - stsClient = mock(StsClient.class); - stsClientBuilder = mock(StsClientBuilder.class); - - when(stsClientBuilder.build()).thenReturn(stsClient); - } - - @Test - void authenticateAWSConfigurationShouldReturnWithStsRoleArn() throws NoSuchFieldException, IllegalAccessException { - AwsAuthenticationConfig awsAuthenticationOptionsConfig = new AwsAuthenticationConfig(); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", TEST_ROLE); - - when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); - final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); - when(assumeRoleRequestBuilder.roleSessionName(anyString())) - .thenReturn(assumeRoleRequestBuilder); - when(assumeRoleRequestBuilder.roleArn(anyString())) - .thenReturn(assumeRoleRequestBuilder); - - final AwsCredentialsProvider actualCredentialsProvider; - try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); - final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { - stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); - assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); - actualCredentialsProvider = awsAuthenticationOptionsConfig.authenticateAwsConfiguration(); - } - - assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); - - verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); - verify(assumeRoleRequestBuilder).roleSessionName(anyString()); - verify(assumeRoleRequestBuilder).build(); - verifyNoMoreInteractions(assumeRoleRequestBuilder); - } - - @Test - void authenticateAWSConfigurationShouldReturnWithStsRoleArnWhenNoRegion() throws NoSuchFieldException, IllegalAccessException { - AwsAuthenticationConfig awsAuthenticationOptionsConfig = new AwsAuthenticationConfig(); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", null); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", TEST_ROLE); - assertThat(awsAuthenticationOptionsConfig.getAwsRegion(), CoreMatchers.equalTo(null)); - - when(stsClientBuilder.region(null)).thenReturn(stsClientBuilder); - - final AwsCredentialsProvider actualCredentialsProvider; - try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class)) { - stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); - actualCredentialsProvider = awsAuthenticationOptionsConfig.authenticateAwsConfiguration(); - } - - assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); - } - - @Test - void authenticateAWSConfigurationShouldOverrideSTSHeadersWhenHeaderOverridesSet() throws NoSuchFieldException, IllegalAccessException { - final String headerName1 = UUID.randomUUID().toString(); - final String headerValue1 = UUID.randomUUID().toString(); - final String headerName2 = UUID.randomUUID().toString(); - final String headerValue2 = UUID.randomUUID().toString(); - final Map overrideHeaders = Map.of(headerName1, headerValue1, headerName2, headerValue2); - - AwsAuthenticationConfig awsAuthenticationOptionsConfig = new AwsAuthenticationConfig(); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", TEST_ROLE); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsHeaderOverrides", overrideHeaders); - - when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); - - final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); - when(assumeRoleRequestBuilder.roleSessionName(anyString())) - .thenReturn(assumeRoleRequestBuilder); - when(assumeRoleRequestBuilder.roleArn(anyString())) - .thenReturn(assumeRoleRequestBuilder); - when(assumeRoleRequestBuilder.overrideConfiguration(any(Consumer.class))) - .thenReturn(assumeRoleRequestBuilder); - - final AwsCredentialsProvider actualCredentialsProvider; - try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); - final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { - stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); - assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); - actualCredentialsProvider = awsAuthenticationOptionsConfig.authenticateAwsConfiguration(); - } - - assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); - - final ArgumentCaptor> configurationCaptor = ArgumentCaptor.forClass(Consumer.class); - - verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); - verify(assumeRoleRequestBuilder).roleSessionName(anyString()); - verify(assumeRoleRequestBuilder).overrideConfiguration(configurationCaptor.capture()); - verify(assumeRoleRequestBuilder).build(); - verifyNoMoreInteractions(assumeRoleRequestBuilder); - - final Consumer actualOverride = configurationCaptor.getValue(); - - final AwsRequestOverrideConfiguration.Builder configurationBuilder = mock(AwsRequestOverrideConfiguration.Builder.class); - actualOverride.accept(configurationBuilder); - verify(configurationBuilder).putHeader(headerName1, headerValue1); - verify(configurationBuilder).putHeader(headerName2, headerValue2); - verifyNoMoreInteractions(configurationBuilder); - } - - @Test - void authenticateAWSConfigurationShouldNotOverrideSTSHeadersWhenHeaderOverridesAreEmpty() throws NoSuchFieldException, IllegalAccessException { - - AwsAuthenticationConfig awsAuthenticationOptionsConfig = new AwsAuthenticationConfig(); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", TEST_ROLE); - ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsHeaderOverrides", Collections.emptyMap()); - - when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); - final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); - when(assumeRoleRequestBuilder.roleSessionName(anyString())) - .thenReturn(assumeRoleRequestBuilder); - when(assumeRoleRequestBuilder.roleArn(anyString())) - .thenReturn(assumeRoleRequestBuilder); - - final AwsCredentialsProvider actualCredentialsProvider; - try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); - final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { - stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); - assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); - actualCredentialsProvider = awsAuthenticationOptionsConfig.authenticateAwsConfiguration(); - } - - assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); - - verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); - verify(assumeRoleRequestBuilder).roleSessionName(anyString()); - verify(assumeRoleRequestBuilder).build(); - verifyNoMoreInteractions(assumeRoleRequestBuilder); - } - } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfigTest.java new file mode 100644 index 0000000000..68d66acf86 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/InitialPositionInStreamConfigTest.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; + +import org.junit.jupiter.api.Test; +import software.amazon.kinesis.common.InitialPositionInStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class InitialPositionInStreamConfigTest { + + @Test + void testInitialPositionGetByNameLATEST() { + final InitialPositionInStreamConfig initialPositionInStreamConfig = InitialPositionInStreamConfig.fromPositionValue("latest"); + assertEquals(initialPositionInStreamConfig, InitialPositionInStreamConfig.LATEST); + assertEquals(initialPositionInStreamConfig.toString(), "latest"); + assertEquals(initialPositionInStreamConfig.getPosition(), "latest"); + assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.LATEST); + } + + @Test + void testInitialPositionGetByNameEarliest() { + final InitialPositionInStreamConfig initialPositionInStreamConfig = InitialPositionInStreamConfig.fromPositionValue("earliest"); + assertEquals(initialPositionInStreamConfig, InitialPositionInStreamConfig.EARLIEST); + assertEquals(initialPositionInStreamConfig.toString(), "earliest"); + assertEquals(initialPositionInStreamConfig.getPosition(), "earliest"); + assertEquals(initialPositionInStreamConfig.getPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + } + +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 863e7fb4b2..0de9e870e8 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -1,19 +1,23 @@ package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.yaml.snakeyaml.Yaml; +import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationDeserializer; import software.amazon.awssdk.regions.Region; import software.amazon.kinesis.common.InitialPositionInStream; -import java.io.FileReader; +import java.io.File; import java.io.IOException; import java.io.Reader; import java.io.StringReader; +import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,35 +30,36 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KinesisSourceConfigTest { - private static final String SIMPLE_PIPELINE_CONFIG = "simple-pipeline.yaml"; - private static final String SIMPLE_PIPELINE_CONFIG_2 = "simple-pipeline-2.yaml"; - private static final int MINIMAL_CHECKPOINT_INTERVAL_MILLIS = 2 * 60 * 1000; // 2 minute - private static final int DEFAULT_MAX_RECORDS = 10000; - private static final int IDLE_TIME_BETWEEN_READS_IN_MILLIS = 250; + private static final String PIPELINE_CONFIG_WITH_ACKS_ENABLED = "pipeline_with_acks_enabled.yaml"; + private static final String PIPELINE_CONFIG_WITH_POLLING_CONFIG_ENABLED = "pipeline_with_polling_config_enabled.yaml"; + private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml"; + private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute KinesisSourceConfig kinesisSourceConfig; + ObjectMapper objectMapper; + @BeforeEach void setUp(TestInfo testInfo) throws IOException { String fileName = testInfo.getTags().stream().findFirst().orElse(""); - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource(fileName).getFile()); - Object data = yaml.load(fileReader); - ObjectMapper mapper = new ObjectMapper(); - if (data instanceof Map) { - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("kinesis-pipeline"); - Map sourceMap = (Map) logPipelineMap.get("source"); - Map kinesisConfigMap = (Map) sourceMap.get("kinesis"); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kinesisConfigMap); - Reader reader = new StringReader(json); - kinesisSourceConfig = mapper.readValue(reader, KinesisSourceConfig.class); - } + final File configurationFile = new File(getClass().getClassLoader().getResource(fileName).getFile()); + objectMapper = new ObjectMapper(new YAMLFactory()); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule(simpleModule); + + final Map pipelineConfig = objectMapper.readValue(configurationFile, Map.class); + final Map sourceMap = (Map) pipelineConfig.get("source"); + final Map kinesisConfigMap = (Map) sourceMap.get("kinesis"); + String json = objectMapper.writeValueAsString(kinesisConfigMap); + final Reader reader = new StringReader(json); + kinesisSourceConfig = objectMapper.readValue(reader, KinesisSourceConfig.class); + } @Test - @Tag(SIMPLE_PIPELINE_CONFIG) + @Tag(PIPELINE_CONFIG_WITH_ACKS_ENABLED) void testSourceConfig() { assertThat(kinesisSourceConfig, notNullValue()); @@ -69,21 +74,20 @@ void testSourceConfig() { assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsHeaderOverrides()); List streamConfigs = kinesisSourceConfig.getStreams(); - assertNull(kinesisSourceConfig.getCodec()); + assertNotNull(kinesisSourceConfig.getCodec()); assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.ENHANCED_FAN_OUT); assertNull(kinesisSourceConfig.getPollingConfig()); for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { assertTrue(kinesisStreamConfig.getName().contains("stream")); - assertTrue(kinesisStreamConfig.getArn().contains("123456789012:stream/stream")); assertFalse(kinesisStreamConfig.isEnableCheckPoint()); assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.LATEST); - assertEquals(kinesisStreamConfig.getCheckPointIntervalInMilliseconds(), MINIMAL_CHECKPOINT_INTERVAL_MILLIS); + assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL); } } @Test - @Tag(SIMPLE_PIPELINE_CONFIG_2) + @Tag(PIPELINE_CONFIG_WITH_POLLING_CONFIG_ENABLED) void testSourceConfigWithStreamCodec() { assertThat(kinesisSourceConfig, notNullValue()); @@ -101,14 +105,44 @@ void testSourceConfigWithStreamCodec() { assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.POLLING); assertNotNull(kinesisSourceConfig.getPollingConfig()); assertEquals(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords(), 10); - assertEquals(kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReadsInMillis(), 10); + assertEquals(kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads(), Duration.ofSeconds(10)); for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { assertTrue(kinesisStreamConfig.getName().contains("stream")); - assertTrue(kinesisStreamConfig.getArn().contains("123456789012:stream/stream")); assertFalse(kinesisStreamConfig.isEnableCheckPoint()); assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.LATEST); - assertEquals(kinesisStreamConfig.getCheckPointIntervalInMilliseconds(), MINIMAL_CHECKPOINT_INTERVAL_MILLIS); + assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL); + } + } + + @Test + @Tag(PIPELINE_CONFIG_CHECKPOINT_ENABLED) + void testSourceConfigWithInitialPosition() { + + assertThat(kinesisSourceConfig, notNullValue()); + assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); + assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertFalse(kinesisSourceConfig.isAcknowledgments()); + assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); + assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn(), "arn:aws:iam::123456789012:role/OSI-PipelineRole"); + assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsExternalId()); + assertNull(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsHeaderOverrides()); + assertNotNull(kinesisSourceConfig.getCodec()); + List streamConfigs = kinesisSourceConfig.getStreams(); + assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.ENHANCED_FAN_OUT); + + Map expectedCheckpointIntervals = new HashMap<>(); + expectedCheckpointIntervals.put("stream-1", Duration.ofSeconds(20)); + expectedCheckpointIntervals.put("stream-2", Duration.ofMinutes(15)); + expectedCheckpointIntervals.put("stream-3", Duration.ofHours(2)); + + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { + assertTrue(kinesisStreamConfig.getName().contains("stream")); + assertTrue(kinesisStreamConfig.isEnableCheckPoint()); + assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.TRIM_HORIZON); + assertEquals(kinesisStreamConfig.getCheckPointInterval(), expectedCheckpointIntervals.get(kinesisStreamConfig.getName())); } } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java index 21e83e4e39..9d5d8aefec 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java @@ -2,6 +2,8 @@ import org.junit.jupiter.api.Test; +import java.time.Duration; + import static org.junit.jupiter.api.Assertions.assertEquals; public class KinesisStreamPollingConfigTest { @@ -12,7 +14,7 @@ public class KinesisStreamPollingConfigTest { void testConfig() { KinesisStreamPollingConfig kinesisStreamPollingConfig = new KinesisStreamPollingConfig(); assertEquals(kinesisStreamPollingConfig.getMaxPollingRecords(), DEFAULT_MAX_RECORDS); - assertEquals(kinesisStreamPollingConfig.getIdleTimeBetweenReadsInMillis(), IDLE_TIME_BETWEEN_READS_IN_MILLIS); + assertEquals(kinesisStreamPollingConfig.getIdleTimeBetweenReads(), Duration.ofMillis(IDLE_TIME_BETWEEN_READS_IN_MILLIS)); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index e226596849..c9a3617b1a 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -62,7 +62,7 @@ public class KinesisRecordProcessorTest { private static final String streamId = "stream-1"; private static final String codec_plugin_name = "json"; - private static final int CHECKPOINT_INTERVAL_MS = 1000; + private static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(1000); private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; @Mock @@ -124,7 +124,7 @@ public void setup() { InputCodec codec = mock(InputCodec.class); when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); - when(kinesisStreamConfig.getCheckPointIntervalInMilliseconds()).thenReturn(CHECKPOINT_INTERVAL_MS); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL); when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig)); when(processRecordsInput.checkpointer()).thenReturn(checkpointer); diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_acks_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_acks_enabled.yaml new file mode 100644 index 0000000000..e5260372f5 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_acks_enabled.yaml @@ -0,0 +1,12 @@ +source: + kinesis: + streams: + - stream_name: "stream-1" + - stream_name: "stream-2" + - stream_name: "stream-3" + codec: + ndjson: + aws: + sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" + region: "us-east-1" + acknowledgments: true \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml new file mode 100644 index 0000000000..e918048529 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml @@ -0,0 +1,20 @@ +source: + kinesis: + streams: + - stream_name: "stream-1" + initial_position: "EARLIEST" + enable_checkpoint: true + checkpoint_interval: "20s" + - stream_name: "stream-2" + initial_position: "EARLIEST" + enable_checkpoint: true + checkpoint_interval: "PT15M" + - stream_name: "stream-3" + initial_position: "EARLIEST" + enable_checkpoint: true + checkpoint_interval: "PT2H" + codec: + ndjson: + aws: + sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" + region: "us-east-1" \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_polling_config_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_polling_config_enabled.yaml new file mode 100644 index 0000000000..4a3156ec2a --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_polling_config_enabled.yaml @@ -0,0 +1,15 @@ +source: + kinesis: + streams: + - stream_name: "stream-1" + - stream_name: "stream-2" + - stream_name: "stream-3" + codec: + ndjson: + aws: + sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" + region: "us-east-1" + consumer_strategy: "polling" + polling: + max_polling_records: 10 + idle_time_between_reads: 10s \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline-2.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline-2.yaml deleted file mode 100644 index 46234d50fb..0000000000 --- a/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline-2.yaml +++ /dev/null @@ -1,21 +0,0 @@ -kinesis-pipeline: - source: - kinesis: - streams: - - stream_name: "stream-1" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-1" - - stream_name: "stream-2" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-2" - - stream_name: "stream-3" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-3" - codec: - ndjson: - aws: - sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" - region: "us-east-1" - consumer_strategy: "Polling" - polling: - maxPollingRecords: 10 - idleTimeBetweenReadsInMillis: 10 - sink: - - stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline.yaml deleted file mode 100644 index 07caf50d02..0000000000 --- a/data-prepper-plugins/kinesis-source/src/test/resources/simple-pipeline.yaml +++ /dev/null @@ -1,16 +0,0 @@ -kinesis-pipeline: - source: - kinesis: - streams: - - stream_name: "stream-1" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-1" - - stream_name: "stream-2" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-2" - - stream_name: "stream-3" - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/stream-3" - aws: - sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" - region: "us-east-1" - acknowledgments: true - sink: - - stdout: \ No newline at end of file