From 1ca007716da972b4146b90abcf236fde09f4aa19 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Tue, 27 Aug 2024 20:25:57 -0700 Subject: [PATCH] Update tests and changes per review comments Signed-off-by: Souvik Bose --- ...Factory.java => KinesisClientFactory.java} | 6 +- .../kinesis/source/KinesisService.java | 8 +-- .../plugins/kinesis/source/KinesisSource.java | 4 +- .../AwsAuthenticationConfig.java | 18 ++--- .../processor/KinesisRecordProcessor.java | 21 ++---- .../extension/KinesisLeaseConfigTest.java | 3 +- ...est.java => KinesisClientFactoryTest.java} | 4 +- .../kinesis/source/KinesisServiceTest.java | 23 +++--- .../processor/KinesisRecordProcessorTest.java | 70 +++++++++++++++++++ 9 files changed, 110 insertions(+), 47 deletions(-) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/{ClientFactory.java => KinesisClientFactory.java} (91%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/{ClientFactoryTest.java => KinesisClientFactoryTest.java} (92%) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java similarity index 91% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java index 64378854a4..e85b66aa37 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java @@ -11,12 +11,12 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; -public class ClientFactory { +public class KinesisClientFactory { private final AwsCredentialsProvider awsCredentialsProvider; private final AwsAuthenticationConfig awsAuthenticationConfig; - public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, - final AwsAuthenticationConfig awsAuthenticationConfig) { + public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, + final AwsAuthenticationConfig awsAuthenticationConfig) { awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() .withRegion(awsAuthenticationConfig.getAwsRegion()) .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn()) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index fa36259a3d..cf2f96d42f 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -54,7 +54,7 @@ public class KinesisService { private final ExecutorService executorService; public KinesisService(final KinesisSourceConfig sourceConfig, - final ClientFactory clientFactory, + final KinesisClientFactory kinesisClientFactory, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription, @@ -72,9 +72,9 @@ public KinesisService(final KinesisSourceConfig sourceConfig, kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get(); this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(); this.kclMetricsNamespaceName = this.tableName; - this.dynamoDbClient = clientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); - this.kinesisClient = clientFactory.buildKinesisAsyncClient(); - this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); + this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); + this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(); + this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); this.pipelineName = pipelineDescription.getPipelineName(); this.applicationName = pipelineName; this.executorService = Executors.newFixedThreadPool(1); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java index a88cff0de0..16b25d072b 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java @@ -36,8 +36,8 @@ public KinesisSource(final KinesisSourceConfig kinesisSourceConfig, final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) { this.kinesisSourceConfig = kinesisSourceConfig; this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier; - ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig()); - this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + KinesisClientFactory kinesisClientFactory = new KinesisClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig()); + this.kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); } @Override diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java index 27774828ef..e70ec337e7 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; +import lombok.Getter; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; @@ -26,14 +27,17 @@ public class AwsAuthenticationConfig { @Size(min = 1, message = "Region cannot be empty string") private String awsRegion; + @Getter @JsonProperty("sts_role_arn") @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") private String awsStsRoleArn; + @Getter @JsonProperty("sts_external_id") @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") private String awsStsExternalId; + @Getter @JsonProperty("sts_header_overrides") @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") private Map awsStsHeaderOverrides; @@ -42,18 +46,6 @@ public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } - public String getAwsStsRoleArn() { - return awsStsRoleArn; - } - - public String getAwsStsExternalId() { - return awsStsExternalId; - } - - public Map getAwsStsHeaderOverrides() { - return awsStsHeaderOverrides; - } - public AwsCredentialsProvider authenticateAwsConfiguration() { final AwsCredentialsProvider awsCredentialsProvider; @@ -67,7 +59,7 @@ public AwsCredentialsProvider authenticateAwsConfiguration() { final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build(); AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() - .roleSessionName("GeoIP-Processor-" + UUID.randomUUID()).roleArn(awsStsRoleArn); + .roleSessionName("Kinesis-source-" + UUID.randomUUID()).roleArn(awsStsRoleArn); if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration( diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index bfa7c2c023..ada8094908 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; public class KinesisRecordProcessor implements ShardRecordProcessor { @@ -52,7 +53,7 @@ public class KinesisRecordProcessor implements ShardRecordProcessor { private final Counter recordProcessingErrors; private final Counter checkpointFailures; private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20); - private static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; + public static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors"; public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures"; public static final String KINESIS_STREAM_TAG_KEY = "stream"; @@ -71,7 +72,7 @@ public KinesisRecordProcessor(Buffer> buffer, final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); this.acknowledgementSetManager = acknowledgementSetManager; - this.acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); + this.acknowledgementSetCallbackCounter = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointIntervalMilliSeconds = kinesisStreamConfig.getCheckPointIntervalInMilliseconds(); @@ -108,29 +109,21 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { List> records = new ArrayList<>(); try { - AcknowledgementSet acknowledgementSet; + Optional acknowledgementSetOpt = Optional.empty(); boolean acknowledgementsEnabled = kinesisSourceConfig.isAcknowledgments(); if (acknowledgementsEnabled) { - acknowledgementSet = createAcknowledgmentSet(processRecordsInput); - } else { - acknowledgementSet = null; + acknowledgementSetOpt = Optional.of(createAcknowledgmentSet(processRecordsInput)); } for (KinesisClientRecord record : processRecordsInput.records()) { processRecord(record, records::add); } - if (acknowledgementSet != null) { - records.forEach(record -> { - acknowledgementSet.add(record.getData()); - }); - } + acknowledgementSetOpt.ifPresent(acknowledgementSet -> records.forEach(record -> acknowledgementSet.add(record.getData()))); buffer.writeAll(records, bufferTimeoutMillis); - if (acknowledgementSet != null) { - acknowledgementSet.complete(); - } + acknowledgementSetOpt.ifPresent(AcknowledgementSet::complete); // Checkpoint for shard if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointIntervalMilliSeconds) { diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java index 1f7b8fbfc4..582bf68f69 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.pipeline.parser.ByteCountDeserializer; import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationDeserializer; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import software.amazon.awssdk.regions.Region; import java.io.File; import java.io.IOException; @@ -48,7 +49,7 @@ void testConfigWithTestExtension() throws IOException { assertNotNull(kinesisLeaseConfig.getLeaseCoordinationTable()); assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(), "kinesis-pipeline-kcl"); assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getRegion(), "us-east-1"); - + assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion(), Region.US_EAST_1); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java similarity index 92% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java index 78faf5382d..5c6e298760 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactoryTest.java @@ -16,7 +16,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.mock; -public class ClientFactoryTest { +public class KinesisClientFactoryTest { private Region region = Region.US_EAST_1; private String roleArn; private Map stsHeader; @@ -33,7 +33,7 @@ void testCreateClient() throws NoSuchFieldException, IllegalAccessException { ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsRegion", "us-east-1"); ReflectivelySetField.setField(AwsAuthenticationConfig.class, awsAuthenticationOptionsConfig, "awsStsRoleArn", roleArn); - ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, awsAuthenticationOptionsConfig); + KinesisClientFactory clientFactory = new KinesisClientFactory(awsCredentialsSupplier, awsAuthenticationOptionsConfig); final DynamoDbAsyncClient dynamoDbAsyncClient = clientFactory.buildDynamoDBClient(Region.US_EAST_1); assertNotNull(dynamoDbAsyncClient); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index dfc4455d26..d5d8a93c4f 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -85,7 +85,7 @@ public class KinesisServiceTest { private PipelineDescription pipelineDescription; @Mock - private ClientFactory clientFactory; + private KinesisClientFactory kinesisClientFactory; @Mock private KinesisAsyncClient kinesisClient; @@ -120,7 +120,7 @@ void setup() { kinesisClient = mock(KinesisAsyncClient.class); dynamoDbClient = mock(DynamoDbAsyncClient.class); cloudWatchClient = mock(CloudWatchAsyncClient.class); - clientFactory = mock(ClientFactory.class); + kinesisClientFactory = mock(KinesisClientFactory.class); scheduler = mock(Scheduler.class); pipelineDescription = mock(PipelineDescription.class); buffer = mock(Buffer.class); @@ -168,16 +168,16 @@ void setup() { when(kinesisSourceConfig.getStreams()).thenReturn(streamConfigs); when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); - when(clientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(dynamoDbClient); - when(clientFactory.buildKinesisAsyncClient()).thenReturn(kinesisClient); - when(clientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient); + when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(dynamoDbClient); + when(kinesisClientFactory.buildKinesisAsyncClient()).thenReturn(kinesisClient); + when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient); when(kinesisClient.serviceClientConfiguration()).thenReturn(KinesisServiceClientConfiguration.builder().region(Region.US_EAST_1).build()); when(scheduler.startGracefulShutdown()).thenReturn(CompletableFuture.completedFuture(true)); when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); } public KinesisService createObjectUnderTest() { - return new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + return new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); } @@ -188,9 +188,16 @@ void testServiceStart() { assertNotNull(kinesisService.getScheduler(buffer)); } + @Test + void testServiceThrowsWhenLeaseConfigIsInvalid() { + when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.empty()); + assertThrows(IllegalStateException.class, () -> new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier)); + } + @Test void testCreateScheduler() { - KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); @@ -208,7 +215,7 @@ void testCreateScheduler() { @Test void testCreateSchedulerWithPollingStrategy() { when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); - KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index bb851837ac..e226596849 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -36,18 +36,22 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY; @@ -100,6 +104,9 @@ public class KinesisRecordProcessorTest { @Mock private Counter checkpointFailures; + @Mock + private Counter acknowledgementSetCallbackCounter; + @BeforeEach public void setup() { MockitoAnnotations.initMocks(this); @@ -121,6 +128,7 @@ public void setup() { when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig)); when(processRecordsInput.checkpointer()).thenReturn(checkpointer); + when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(acknowledgementSetCallbackCounter); } @Test @@ -143,6 +151,68 @@ void testProcessRecordsWithoutAcknowledgementsCheckpointsEnabled() verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); } + @Test + void testProcessRecordsWithAcknowledgementsCheckpointsEnabled() + throws Exception { + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); + when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); + AtomicReference numEventsAdded = new AtomicReference<>(0); + doAnswer(a -> { + numEventsAdded.getAndSet(numEventsAdded.get() + 1); + return null; + }).when(acknowledgementSet).add(any()); + + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + + kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + kinesisRecordProcessor.initialize(initializationInput); + + Thread.sleep(2000); + + kinesisRecordProcessor.processRecords(processRecordsInput); + + verify(checkpointer).checkpoint(); + verify(buffer).writeAll(anyCollection(), anyInt()); + verify(acknowledgementSetManager, times(1)).create(any(), any(Duration.class)); + } + + @Test + void testProcessRecordsWithAcknowledgementsEnabledAndAcksReturnFalse() + throws Exception { + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); + when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); + AtomicReference numEventsAdded = new AtomicReference<>(0); + doAnswer(a -> { + numEventsAdded.getAndSet(numEventsAdded.get() + 1); + return null; + }).when(acknowledgementSet).add(any()); + + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(false); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + + kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + kinesisRecordProcessor.initialize(initializationInput); + + Thread.sleep(2000); + + kinesisRecordProcessor.processRecords(processRecordsInput); + + verify(checkpointer, times(0)).checkpoint(); + verify(buffer).writeAll(anyCollection(), anyInt()); + verify(acknowledgementSetManager, times(1)).create(any(), any(Duration.class)); + } + @Test void testProcessRecordsWithNDJsonInputCodec() throws Exception {