From 806f78d574a2d49b71acba07a54b2509e643e5f3 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Tue, 17 Sep 2024 10:17:41 -0700 Subject: [PATCH] Rename function and address comments. Signed-off-by: Souvik Bose --- .../source/KinesisMultiStreamTracker.java | 16 ++---- .../processor/KinesisCheckpointerTracker.java | 2 +- .../processor/KinesisRecordProcessor.java | 48 +++++++--------- .../KinesisShardRecordProcessorFactory.java | 4 +- .../KinesisCheckpointerTrackerTest.java | 8 +-- .../processor/KinesisRecordProcessorTest.java | 55 ++++++++++--------- 6 files changed, 64 insertions(+), 69 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java index 4414d9fd39..638751f17e 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java @@ -10,6 +10,7 @@ package org.opensearch.dataprepper.plugins.kinesis.source; +import com.amazonaws.arn.Arn; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -44,34 +45,29 @@ public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final Kinesis public List streamConfigList() { List streamConfigList = new ArrayList<>(); for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) { - StreamConfig streamConfig; - try { - streamConfig = getStreamConfig(kinesisStreamConfig); - } catch (Exception e) { - throw new RuntimeException(e); - } + StreamConfig streamConfig = getStreamConfig(kinesisStreamConfig); streamConfigList.add(streamConfig); } return streamConfigList; } - private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception { + private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) { StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig); return new StreamConfig(sourceStreamIdentifier, InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())); } - private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception { + private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(kinesisStreamConfig.getName()) .build(); - DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get(); + DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).join(); String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription()); return StreamIdentifier.multiStreamInstance(streamIdentifierString); } private String getStreamIdentifierString(StreamDescription streamDescription) { - String accountId = streamDescription.streamARN().split(COLON)[4]; + String accountId = Arn.fromString(streamDescription.streamARN()).getAccountId(); long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond(); return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond)); } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java index 9f8e5f7625..8fb7c5ec6c 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java @@ -38,7 +38,7 @@ public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceN checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true); } - public synchronized Optional getLatestAvailableCheckpointRecord() { + public synchronized Optional popLatestReadyToCheckpointRecord() { Optional kinesisCheckpointerRecordOptional = Optional.empty(); List toRemoveRecords = new ArrayList<>(); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 7d02ff959a..6df0760ca3 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -49,31 +49,34 @@ public class KinesisRecordProcessor implements ShardRecordProcessor { private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class); + private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; + private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20); + private final StreamIdentifier streamIdentifier; private final KinesisStreamConfig kinesisStreamConfig; private final Duration checkpointInterval; private final KinesisSourceConfig kinesisSourceConfig; private final BufferAccumulator> bufferAccumulator; private final KinesisRecordConverter kinesisRecordConverter; + private final KinesisCheckpointerTracker kinesisCheckpointerTracker; + private final ExecutorService executorService; private String kinesisShardId; private long lastCheckpointTimeInMillis; private final int bufferTimeoutMillis; private final AcknowledgementSetManager acknowledgementSetManager; + private final Counter acknowledgementSetSuccesses; private final Counter acknowledgementSetFailures; private final Counter recordsProcessed; private final Counter recordProcessingErrors; private final Counter checkpointFailures; - private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20); public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses"; public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures"; public static final String KINESIS_RECORD_PROCESSED = "recordProcessed"; public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors"; public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures"; public static final String KINESIS_STREAM_TAG_KEY = "stream"; - private KinesisCheckpointerTracker kinesisCheckpointerTracker; - private final ExecutorService executorService; private AtomicBoolean isStopRequested; public KinesisRecordProcessor(final BufferAccumulator> bufferAccumulator, @@ -81,6 +84,7 @@ public KinesisRecordProcessor(final BufferAccumulator> bufferAccum final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics, final KinesisRecordConverter kinesisRecordConverter, + final KinesisCheckpointerTracker kinesisCheckpointerTracker, final StreamIdentifier streamIdentifier) { this.bufferTimeoutMillis = (int) kinesisSourceConfig.getBufferTimeout().toMillis(); this.streamIdentifier = streamIdentifier; @@ -95,7 +99,7 @@ public KinesisRecordProcessor(final BufferAccumulator> bufferAccum this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval(); this.bufferAccumulator = bufferAccumulator; - this.kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + this.kinesisCheckpointerTracker = kinesisCheckpointerTracker; this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("kinesis-ack-monitor")); this.isStopRequested = new AtomicBoolean(false); } @@ -120,16 +124,7 @@ public void initialize(InitializationInput initializationInput) { private void monitorCheckpoint(final ExecutorService executorService) { while (!isStopRequested.get()) { if (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis()) { - LOG.debug("Regular checkpointing for shard {}", kinesisShardId); - - Optional kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); - if (kinesisCheckpointerRecordOptional.isPresent()) { - RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer(); - String sequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().sequenceNumber(); - Long subSequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().subSequenceNumber(); - checkpoint(recordProcessorCheckpointer, sequenceNumber, subSequenceNumber); - lastCheckpointTimeInMillis = System.currentTimeMillis(); - } + doCheckpoint(); } try { Thread.sleep(DEFAULT_MONITOR_WAIT_TIME_MS); @@ -195,14 +190,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { // Checkpoint for shard if (!acknowledgementsEnabled && (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis())) { - LOG.debug("Regular checkpointing for shard {}", kinesisShardId); - - Optional KinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); - if (KinesisCheckpointerRecordOptional.isPresent()) { - ExtendedSequenceNumber lastExtendedSequenceNumber = KinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber(); - checkpoint(processRecordsInput.checkpointer(), lastExtendedSequenceNumber.sequenceNumber(), lastExtendedSequenceNumber.subSequenceNumber()); - lastCheckpointTimeInMillis = System.currentTimeMillis(); - } + doCheckpoint(); } } catch (Exception ex) { recordProcessingErrors.increment(); @@ -242,6 +230,17 @@ public void checkpoint(RecordProcessorCheckpointer checkpointer, String sequence } } + private void doCheckpoint() { + LOG.debug("Regular checkpointing for shard {}", kinesisShardId); + Optional kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord(); + if (kinesisCheckpointerRecordOptional.isPresent()) { + ExtendedSequenceNumber lastExtendedSequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber(); + RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer(); + checkpoint(recordProcessorCheckpointer, lastExtendedSequenceNumber.sequenceNumber(), lastExtendedSequenceNumber.subSequenceNumber()); + lastCheckpointTimeInMillis = System.currentTimeMillis(); + } + } + private void checkpoint(RecordProcessorCheckpointer checkpointer) { try { String kinesisStream = streamIdentifier.streamName(); @@ -268,9 +267,4 @@ private ExtendedSequenceNumber getLatestSequenceNumberFromInput(final ProcessRec } return largestExtendedSequenceNumber; } - - @VisibleForTesting - public void setKinesisCheckpointerTracker(final KinesisCheckpointerTracker kinesisCheckpointerTracker) { - this.kinesisCheckpointerTracker = kinesisCheckpointerTracker; - } } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java index e326789312..ff9943a41d 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java @@ -52,6 +52,8 @@ public ShardRecordProcessor shardRecordProcessor() { public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, kinesisSourceConfig.getNumberOfRecordsToAccumulate(), kinesisSourceConfig.getBufferTimeout()); - return new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + return new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, + pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); } } \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java index ea76d1f789..fe0ab06877 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java @@ -45,7 +45,7 @@ void testCheckPointerAddAndGet() { ExtendedSequenceNumber last = extendedSequenceNumberList.get(extendedSequenceNumberList.size()-1); kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(last); - Optional checkpointRecord = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + Optional checkpointRecord = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord(); assertTrue(checkpointRecord.isEmpty()); assertEquals(kinesisCheckpointerTracker.size(), numRecords); @@ -53,7 +53,7 @@ void testCheckPointerAddAndGet() { ExtendedSequenceNumber extendedSequenceNumber1 = extendedSequenceNumberList.get(idx); kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber1); - Optional firstcheckpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + Optional firstcheckpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord(); if (idx != 0) { assertTrue(firstcheckpointer.isEmpty()); assertEquals(kinesisCheckpointerTracker.size(), numRecords); @@ -79,7 +79,7 @@ void testGetLastCheckpointerAndStoreIsEmpty() { kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber); } - Optional checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + Optional checkpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord(); assertTrue(checkpointer.isPresent()); assertEquals(0, kinesisCheckpointerTracker.size()); } @@ -92,7 +92,7 @@ public void testMarkCheckpointerReadyForCheckpoint() { ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); assertThrows(IllegalArgumentException.class, () -> kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber)); - Optional checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + Optional checkpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord(); assertTrue(checkpointer.isEmpty()); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index 902a70bfa6..ea002e27e9 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -193,14 +193,15 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() records.add(record); when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); - when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + when(kinesisCheckpointerRecord.getCheckpointer()).thenReturn(checkpointer); + when(kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); kinesisRecordProcessor.initialize(initializationInput); kinesisRecordProcessor.processRecords(processRecordsInput); @@ -238,9 +239,9 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled() records.add(record); when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.empty()); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); + when(kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord()).thenReturn(Optional.empty()); kinesisRecordProcessor.initialize(initializationInput); kinesisRecordProcessor.processRecords(processRecordsInput); @@ -288,14 +289,15 @@ void testProcessRecordsWithAcknowledgementsEnabled() records.add(record); when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); - when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + when(kinesisCheckpointerRecord.getCheckpointer()).thenReturn(checkpointer); + when(kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); kinesisRecordProcessor.initialize(initializationInput); kinesisRecordProcessor.processRecords(processRecordsInput); @@ -341,14 +343,15 @@ void testProcessRecordsWithNDJsonInputCodec() records.add(record); when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); + when(kinesisCheckpointerRecord.getCheckpointer()).thenReturn(checkpointer); when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); - when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); + when(kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); kinesisRecordProcessor.initialize(initializationInput); kinesisRecordProcessor.processRecords(processRecordsInput); @@ -384,8 +387,8 @@ void testProcessRecordsNoThrowException() final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).add(any(Record.class)); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); kinesisRecordProcessor.initialize(initializationInput); assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); @@ -408,8 +411,8 @@ void testProcessRecordsBufferFlushNoThrowException() final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).flush(); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); kinesisRecordProcessor.initialize(initializationInput); assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); @@ -420,10 +423,10 @@ void testProcessRecordsBufferFlushNoThrowException() @Test void testShardEndedLatestCheckpoint() { - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); ShardEndedInput shardEndedInput = mock(ShardEndedInput.class); when(shardEndedInput.checkpointer()).thenReturn(checkpointer); - mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); mockKinesisRecordProcessor.shardEnded(shardEndedInput); @@ -436,8 +439,8 @@ void testShardEndedCheckpointerThrowsNoThrowException(final Class exc checkpointFailures = mock(Counter.class); when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); ShardEndedInput shardEndedInput = mock(ShardEndedInput.class); when(shardEndedInput.checkpointer()).thenReturn(checkpointer); doThrow(exceptionType).when(checkpointer).checkpoint(); @@ -454,8 +457,8 @@ void testShutdownRequestedWithLatestCheckpoint() { checkpointFailures = mock(Counter.class); when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); ShutdownRequestedInput shutdownRequestedInput = mock(ShutdownRequestedInput.class); when(shutdownRequestedInput.checkpointer()).thenReturn(checkpointer); @@ -471,8 +474,8 @@ void testShutdownRequestedCheckpointerThrowsNoThrowException(final Class mockKinesisRecordProcessor.checkpoint(checkpointer, sequence_number, sub_sequence_number)); @@ -487,8 +490,8 @@ void testShutdownRequestedCheckpointerThrowsNoThrowExceptionRegularCheckpoint(fi checkpointFailures = mock(Counter.class); when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); - mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, + acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); ShutdownRequestedInput shutdownRequestedInput = mock(ShutdownRequestedInput.class); when(shutdownRequestedInput.checkpointer()).thenReturn(checkpointer); doThrow(exceptionType).when(checkpointer).checkpoint();