diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkThreadTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkThreadTest.java index 6f39084a25..1660484a72 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkThreadTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkThreadTest.java @@ -10,7 +10,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -19,14 +18,14 @@ @ExtendWith(MockitoExtension.class) public class SinkThreadTest { @Mock - AbstractSink sink; + private AbstractSink sink; - SinkThread sinkThread; + private SinkThread sinkThread; @Test public void testSinkThread() { when(sink.isReady()).thenReturn(true); - sinkThread = new SinkThread(sink, 5, 1000); + sinkThread = new SinkThread(sink, 5, 100); sinkThread.run(); verify(sink, times(1)).isReady(); } @@ -34,25 +33,18 @@ public void testSinkThread() { @Test public void testSinkThread2() { when(sink.isReady()).thenReturn(false); - sinkThread = new SinkThread(sink, 5, 1000); + sinkThread = new SinkThread(sink, 5, 100); sinkThread.run(); verify(sink, times(6)).isReady(); - try { - doAnswer((i) -> { - return null; - }).when(sink).doInitialize(); - verify(sink, times(5)).doInitialize(); - } catch (Exception e){} + verify(sink, times(5)).doInitialize(); when(sink.isReady()).thenReturn(false).thenReturn(true); sinkThread.run(); verify(sink, times(8)).isReady(); when(sink.isReady()).thenReturn(false).thenReturn(true); - try { - lenient().doAnswer((i) -> { - throw new InterruptedException("Fake interrupt"); - }).when(sink).doInitialize(); - sinkThread.run(); - verify(sink, times(7)).doInitialize(); - } catch (Exception e){} + lenient().doAnswer((i) -> { + throw new InterruptedException("Fake interrupt"); + }).when(sink).doInitialize(); + sinkThread.run(); + verify(sink, times(7)).doInitialize(); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 9b015aea72..486617e9a0 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -14,6 +14,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; + +import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doAnswer; @@ -27,7 +29,7 @@ @ExtendWith(MockitoExtension.class) class DefaultAcknowledgementSetManagerTests { - private static final Duration TEST_TIMEOUT_MS = Duration.ofMillis(1000); + private static final Duration TEST_TIMEOUT = Duration.ofMillis(400); DefaultAcknowledgementSetManager acknowledgementSetManager; private ExecutorService callbackExecutor; @@ -61,21 +63,25 @@ void setup() { lenient().when(event2.getEventHandle()).thenReturn(eventHandle2); acknowledgementSetManager = createObjectUnderTest(); - AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS); + AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet1.add(event1); acknowledgementSet1.add(event2); acknowledgementSet1.complete(); } DefaultAcknowledgementSetManager createObjectUnderTest() { - return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT_MS.toMillis() * 2)); + return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT.toMillis() * 2)); } @Test - void testBasic() throws InterruptedException { + void testBasic() { acknowledgementSetManager.releaseEventReference(eventHandle2, true); acknowledgementSetManager.releaseEventReference(eventHandle1, true); - Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); + assertThat(result, equalTo(true)); + }); assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); assertThat(result, equalTo(true)); } @@ -83,13 +89,13 @@ void testBasic() throws InterruptedException { @Test void testExpirations() throws InterruptedException { acknowledgementSetManager.releaseEventReference(eventHandle2, true); - Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5); + Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis()); assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); assertThat(result, equalTo(null)); } @Test - void testMultipleAcknowledgementSets() throws InterruptedException { + void testMultipleAcknowledgementSets() { event3 = mock(JacksonEvent.class); doAnswer((i) -> { eventHandle3 = i.getArgument(0); @@ -97,13 +103,17 @@ void testMultipleAcknowledgementSets() throws InterruptedException { }).when(event3).setEventHandle(any()); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); - AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS); + AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet2.add(event3); acknowledgementSet2.complete(); acknowledgementSetManager.releaseEventReference(eventHandle2, true); acknowledgementSetManager.releaseEventReference(eventHandle3, true); - Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5); + await().atMost(TEST_TIMEOUT.multipliedBy(5)) + .untilAsserted(() -> { + assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); + assertThat(result, equalTo(true)); + }); assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0)); assertThat(result, equalTo(true)); }