diff --git a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java index ddd0907c9a..c26ade3420 100644 --- a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java +++ b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java @@ -5,12 +5,6 @@ package org.opensearch.dataprepper.plugins.buffer.blockingbuffer; -import org.junit.jupiter.params.provider.ValueSource; -import org.opensearch.dataprepper.model.CheckpointState; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.metrics.MetricNames; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -21,6 +15,12 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; import java.util.Collection; @@ -51,8 +51,8 @@ public class BlockingBufferTests { private static final String PLUGIN_NAME = "BlockingBuffer"; private static final int TEST_BATCH_SIZE = 3; private static final int TEST_BUFFER_SIZE = 13; - private static final int TEST_WRITE_TIMEOUT = 1_00; - private static final int TEST_BATCH_READ_TIMEOUT = 5_000; + private static final int TEST_WRITE_TIMEOUT = 10; + private static final int TEST_BATCH_READ_TIMEOUT = 500; private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); @BeforeEach @@ -263,7 +263,7 @@ void testNonZeroBatchDelayReturnsAllRecords() throws Exception { final Collection> testRecords2 = generateBatchRecords(1); EXECUTOR.submit(() -> { try { - Thread.sleep(1000); + Thread.sleep(TEST_BATCH_READ_TIMEOUT / 2); buffer.writeAll(testRecords2, TEST_WRITE_TIMEOUT); } catch (final Exception e) { throw new RuntimeException(e); @@ -273,8 +273,8 @@ void testNonZeroBatchDelayReturnsAllRecords() throws Exception { final Map.Entry>, CheckpointState> readResult = buffer.read(TEST_BATCH_READ_TIMEOUT); final Collection> records = readResult.getKey(); final CheckpointState checkpointState = readResult.getValue(); - assertThat(records.size(), is(2)); - assertThat(checkpointState.getNumRecordsToBeChecked(), is(2)); + assertThat(records.size(), is(testRecords.size() + testRecords2.size())); + assertThat(checkpointState.getNumRecordsToBeChecked(), is(testRecords.size() + testRecords2.size())); } @Test