Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work to reduce test time by reducing some repeated tests, using Awaitility, and reducing delays #3019

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

import static org.awaitility.Awaitility.await;

public class AbstractSinkTest {
@Test
public void testMetrics() {
Expand Down Expand Up @@ -51,8 +54,8 @@ public void testMetrics() {
Assert.assertEquals(1.0, MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.COUNT).getValue(), 0);
Assert.assertTrue(MetricsTestUtil.isBetween(
MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.TOTAL_TIME).getValue(),
0.5,
0.6));
0.2,
0.3));
Assert.assertEquals(abstractSink.getRetryThreadState(), null);
abstractSink.shutdown();
}
Expand All @@ -71,14 +74,8 @@ public void testSinkNotReady() {
// Do another intialize to make sure the sink is still not ready
abstractSink.initialize();
Assert.assertEquals(abstractSink.isReady(), false);
while (!abstractSink.isReady()) {
try {
Thread.sleep(1000);
} catch (Exception e) {}
}
try {
Thread.sleep(2000);
} catch (Exception e) {}
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
Assert.assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
abstractSink.shutdown();
}
Expand All @@ -92,7 +89,7 @@ public AbstractSinkImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(200);
} catch (InterruptedException e) {

}
Expand Down Expand Up @@ -126,7 +123,7 @@ public AbstractSinkNotReadyImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(100);
} catch (InterruptedException e) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@ExtendWith(MockitoExtension.class)
public class AcknowledgementSetMonitorTests {
private static final int DEFAULT_WAIT_TIME_MS = 2000;
private static final int DEFAULT_WAIT_TIME_MS = 500;
@Mock
DefaultAcknowledgementSet acknowledgementSet1;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -67,7 +68,7 @@
import static org.mockito.Mockito.when;

class PipelineTests {
private static final int TEST_READ_BATCH_TIMEOUT = 3000;
private static final int TEST_READ_BATCH_TIMEOUT = 500;
private static final int TEST_PROCESSOR_THREADS = 1;
private static final String TEST_PIPELINE_NAME = "test-pipeline";

Expand Down Expand Up @@ -168,9 +169,9 @@ void testPipelineStateWithProcessor() {

@Test
void testPipelineDelayedReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofMillis(2000);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand All @@ -182,11 +183,11 @@ void testPipelineDelayedReady() throws InterruptedException {
Instant startTime = Instant.now();
testPipeline.execute();
assertFalse(testPipeline.isReady());
for (int i = 0; i < delayTimeSeconds + 2; i++) {
Thread.sleep(1000);
}
await().atMost(Duration.ofSeconds(2).plus(delayTime))
.pollInterval(Duration.ofMillis(200))
.until(testPipeline::isReady);
assertTrue(testPipeline.isReady());
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(Duration.ofSeconds(delayTimeSeconds)));
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(delayTime));
assertThat("Pipeline isStopRequested is expected to be false", testPipeline.isStopRequested(), is(false));
testPipeline.shutdown();
assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true));
Expand All @@ -196,9 +197,9 @@ void testPipelineDelayedReady() throws InterruptedException {

@Test
void testPipelineDelayedReadyShutdownBeforeReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofSeconds(2);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -29,11 +30,11 @@ public TestSink() {
this.ready = true;
}

public TestSink(int readyAfterSecs) {
public TestSink(Duration readyAfter) {
this.ready = false;
this.failSinkForTest = false;
this.collectedRecords = new ArrayList<>();
this.readyTime = Instant.now().plusSeconds(readyAfterSecs);
this.readyTime = Instant.now().plus(readyAfter);
}

public TestSink(boolean failSinkForTest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private AggregateProcessor createObjectUnderTest() {
return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
}

@RepeatedTest(value = 10)
@RepeatedTest(value = 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see an issue with just removing the @RepeatedTest annotation all together from these tests

void aggregateWithNoConcludingGroupsReturnsExpectedResult() throws InterruptedException {
aggregateAction = new RemoveDuplicatesAggregateAction();
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -260,7 +260,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
}

@ParameterizedTest
@ValueSource(doubles = {5.0, 15.0, 33.0, 55.0, 70.0, 85.0, 92.0, 99.0})
@ValueSource(doubles = {5.0, 15.0, 55.0, 92.0, 99.0})
void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedException, NoSuchFieldException, IllegalAccessException {
PercentSamplerAggregateActionConfig percentSamplerAggregateActionConfig = new PercentSamplerAggregateActionConfig();
setField(PercentSamplerAggregateActionConfig.class, percentSamplerAggregateActionConfig, "percent", testPercent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_noLimit_THEN_keepsWritingToB

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

lenient().when(sourceConfig.getInterval()).thenReturn(Duration.ofSeconds(1)); // interval of 1 second
Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(INFINITE_LOG_COUNT); // no limit to log count

logGeneratorSource.start(spyBuffer);
Thread.sleep(1500);

Thread.sleep((long) (interval.toMillis() * 1.5));
verify(spyBuffer, atLeast(1)).write(any(Record.class), anyInt());
Thread.sleep(700);
Thread.sleep((long) (interval.toMillis() * 0.7));
verify(spyBuffer, atLeast(2)).write(any(Record.class), anyInt());
}

Expand All @@ -102,16 +103,18 @@ void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_reachedLimit_THEN_stopsWriti

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

lenient().when(sourceConfig.getInterval()).thenReturn(Duration.ofSeconds(1)); // interval of 1 second
Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(1); // max log count of 1 in logGeneratorSource

assertEquals(spyBuffer.isEmpty(), true);
logGeneratorSource.start(spyBuffer);
Thread.sleep(1100);
Thread.sleep((long) (interval.toMillis() * 1.1));

verify(spyBuffer, times(1)).write(any(Record.class), anyInt());

Thread.sleep(1000);
Thread.sleep(interval.toMillis());
verify(spyBuffer, times(1)).write(any(Record.class), anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void start(final Buffer<Record<Document>> buffer) {
throw new IllegalStateException("Buffer is null");
}
rssReaderTask = new RssReaderTask(rssReader, rssSourceConfig.getUrl(), buffer);
scheduledExecutorService.scheduleAtFixedRate(rssReaderTask, 0, 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(rssReaderTask, 0,
rssSourceConfig.getPollingFrequency().toMillis(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.time.Duration;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
Expand All @@ -42,12 +44,14 @@ class RSSSourceTest {
private PluginMetrics pluginMetrics;

private RSSSource rssSource;
private Duration pollingFrequency;

@BeforeEach
void setUp() {
pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, PIPELINE_NAME);
pollingFrequency = Duration.ofMillis(1800);
lenient().when(rssSourceConfig.getUrl()).thenReturn(VALID_RSS_URL);
lenient().when(rssSourceConfig.getPollingFrequency()).thenReturn(Duration.ofSeconds(5));
lenient().when(rssSourceConfig.getPollingFrequency()).thenReturn(pollingFrequency);
rssSource = new RSSSource(pluginMetrics, rssSourceConfig);
}

Expand All @@ -59,9 +63,15 @@ public void tearDown() {
@Test
void test_ExecutorService_keep_writing_Events_to_Buffer() throws Exception {
rssSource.start(buffer);
Thread.sleep(5000);
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
Thread.sleep(5000);
await().atMost(pollingFrequency.multipliedBy(2))
.untilAsserted(() -> {
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
});
verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt());
await().atMost(pollingFrequency.multipliedBy(2))
.untilAsserted(() -> {
verify(buffer, atLeast(2)).writeAll(anyCollection(), anyInt());
});
verify(buffer, atLeast(2)).writeAll(anyCollection(), anyInt());
}
}
Loading