From 5971190eb523a50bf5a79d9e019943853990c572 Mon Sep 17 00:00:00 2001 From: Marcos Gonzalez Mayedo Date: Mon, 24 Jul 2023 14:34:52 -0700 Subject: [PATCH] Added CloudWatchLogsDispatcher builder pattern, fixed tests for Service and Dispatcher and modified backOffTimeBase Signed-off-by: Marcos Gonzalez Mayedo --- .../cloudwatch-logs/build.gradle | 3 + .../plugins/sink/buffer/InMemoryBuffer.java | 1 - .../client/CloudWatchLogsClientFactory.java | 4 +- .../sink/client/CloudWatchLogsDispatcher.java | 36 +-- .../sink/client/CloudWatchLogsService.java | 50 +++- .../CloudWatchLogsLimits.java | 12 +- .../sink/{time => utils}/SinkStopWatch.java | 7 +- .../client/CloudWatchLogsServiceTest.java | 230 ++++-------------- .../CloudWatchLogsLimitsTest.java | 1 + 9 files changed, 125 insertions(+), 219 deletions(-) rename data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/{push_condition => utils}/CloudWatchLogsLimits.java (91%) rename data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/{time => utils}/SinkStopWatch.java (86%) diff --git a/data-prepper-plugins/cloudwatch-logs/build.gradle b/data-prepper-plugins/cloudwatch-logs/build.gradle index bd387d69ef..0a3c815c0b 100644 --- a/data-prepper-plugins/cloudwatch-logs/build.gradle +++ b/data-prepper-plugins/cloudwatch-logs/build.gradle @@ -17,9 +17,12 @@ dependencies { implementation 'software.amazon.awssdk:cloudwatch' implementation 'software.amazon.awssdk:cloudwatchlogs' implementation 'org.apache.commons:commons-lang3:3.12.0' + implementation 'org.projectlombok:lombok:1.18.26' testImplementation project(path: ':data-prepper-test-common') testImplementation testLibs.mockito.inline testImplementation 'org.junit.jupiter:junit-jupiter' + compileOnly 'org.projectlombok:lombok:1.18.24' + annotationProcessor 'org.projectlombok:lombok:1.18.24' } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java index a93cb99ad7..572f1ad9c1 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java @@ -41,7 +41,6 @@ public byte[] popEvent() { @Override public List getBufferedData() { - Collections.unmodifiableList(eventsBuffered); return Collections.unmodifiableList(eventsBuffered); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java index 8d2672e9ab..00ba993060 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java @@ -41,10 +41,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi } private static ClientOverrideConfiguration createOverrideConfiguration() { - final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build(); - return ClientOverrideConfiguration.builder() - .retryPolicy(retryPolicy) + .retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS)) .build(); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java index 1c3137f99e..bd49248288 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java @@ -5,7 +5,9 @@ package org.opensearch.dataprepper.plugins.sink.client; +import lombok.Builder; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,22 +22,23 @@ import java.util.List; import java.util.concurrent.BlockingQueue; +@Builder public class CloudWatchLogsDispatcher implements Runnable { - public static final long UPPER_RETRY_TIME_BOUND = 20000; - public static final float EXP_TIME_SCALER = 1.5F; + private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 5000; + private static final float EXP_TIME_SCALE = 1.5F; private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class); - private final BlockingQueue taskQueue; - private final CloudWatchLogsClient cloudWatchLogsClient; - private final CloudWatchLogsMetrics cloudWatchLogsMetrics; - private final String logGroup; - private final String logStream; - final int retryCount; - final long backOffTimeBase; + private BlockingQueue taskQueue; + private CloudWatchLogsClient cloudWatchLogsClient; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; + private String logGroup; + private String logStream; + private int retryCount; + private long backOffTimeBase; public CloudWatchLogsDispatcher(final BlockingQueue taskQueue, final CloudWatchLogsClient cloudWatchLogsClient, final CloudWatchLogsMetrics cloudWatchLogsMetrics, final String logGroup, final String logStream, - final int retryCount, final int backOffTimeBase) { + final int retryCount, final long backOffTimeBase) { this.taskQueue = taskQueue; this.cloudWatchLogsClient = cloudWatchLogsClient; this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; @@ -64,7 +67,7 @@ private List prepareInputLogEvents(final ThreadTaskEvents eventDa * @param inputLogEvents Collection of inputLogEvents to be flushed * @return true if successful, false otherwise */ - public boolean dispatchLogs(List inputLogEvents, Collection eventHandles) { + public void dispatchLogs(List inputLogEvents, Collection eventHandles) { boolean failedPost = true; int failCounter = 0; @@ -74,7 +77,6 @@ public boolean dispatchLogs(List inputLogEvents, Collection inputLogEvents, Collection= UPPER_RETRY_TIME_BOUND) { - return UPPER_RETRY_TIME_BOUND; + if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) { + return UPPER_RETRY_TIME_BOUND_MILLISECONDS; } return scale * backOffTimeBase; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java index 1de8fc9b72..8480f9b874 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java @@ -10,10 +10,11 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; -import org.opensearch.dataprepper.plugins.sink.push_condition.CloudWatchLogsLimits; -import org.opensearch.dataprepper.plugins.sink.time.SinkStopWatch; +import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; +import org.opensearch.dataprepper.plugins.sink.utils.SinkStopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import java.util.ArrayList; import java.util.Collection; @@ -26,28 +27,43 @@ public class CloudWatchLogsService { private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsService.class); + private final CloudWatchLogsClient cloudWatchLogsClient; + private final CloudWatchLogsMetrics cloudWatchLogsMetrics; private final Buffer buffer; private final CloudWatchLogsLimits cloudWatchLogsLimits; private List bufferedEventHandles; private final BlockingQueue taskQueue; private final SinkStopWatch sinkStopWatch; private final ReentrantLock bufferLock; - private final Executor sinkThreadManager; - private final CloudWatchLogsDispatcher dispatcher; + private final Executor asyncExecutor; + private final String logGroup; + private final String logStream; + private final int retryCount; + private final long backOffTimeBase; public CloudWatchLogsService(final Buffer buffer, + final CloudWatchLogsClient cloudWatchLogsClient, + final CloudWatchLogsMetrics cloudWatchLogsMetrics, final CloudWatchLogsLimits cloudWatchLogsLimits, - final CloudWatchLogsDispatcher dispatcher, BlockingQueue blockingQueue) { + final BlockingQueue blockingQueue, + final String logGroup, final String logStream, + final int retryCount, final long backOffTimeBase) { this.buffer = buffer; - this.dispatcher = dispatcher; - this.bufferedEventHandles = new ArrayList<>(); + this.cloudWatchLogsClient = cloudWatchLogsClient; + this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; this.cloudWatchLogsLimits = cloudWatchLogsLimits; this.taskQueue = blockingQueue; + this.logGroup = logGroup; + this.logStream = logStream; + this.retryCount = retryCount; + this.backOffTimeBase = backOffTimeBase; + + this.bufferedEventHandles = new ArrayList<>(); bufferLock = new ReentrantLock(); sinkStopWatch = new SinkStopWatch(); - sinkThreadManager = newCachedThreadPool(); + asyncExecutor = newCachedThreadPool(); } /** @@ -91,16 +107,26 @@ public void processLogEvents(final Collection> logs) { private void stageLogEvents() { sinkStopWatch.stopAndResetStopWatch(); - ArrayList eventMessageCloneList = new ArrayList<>(); - cloneLists(buffer.getBufferedData(), eventMessageCloneList); + ArrayList eventMessageClone = new ArrayList<>(); + cloneLists(buffer.getBufferedData(), eventMessageClone); - ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageCloneList, bufferedEventHandles); + ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles); taskQueue.add(dataToPush); buffer.clearBuffer(); bufferedEventHandles = new ArrayList<>(); - sinkThreadManager.execute(dispatcher); + CloudWatchLogsDispatcher newTaskDispatcher = CloudWatchLogsDispatcher.builder() + .taskQueue(taskQueue) + .cloudWatchLogsClient(cloudWatchLogsClient) + .cloudWatchLogsMetrics(cloudWatchLogsMetrics) + .logGroup(logGroup) + .logStream(logStream) + .retryCount(retryCount) + .backOffTimeBase(backOffTimeBase) + .build(); + + asyncExecutor.execute(newTaskDispatcher); } private void addToBuffer(final Record log) { diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimits.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java similarity index 91% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimits.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java index d1fc837da9..64fdb4afcc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimits.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.push_condition; +package org.opensearch.dataprepper.plugins.sink.utils; /** * ThresholdCheck receives parameters for which to reference the * limits of a buffer and CloudWatchLogsClient before making a @@ -11,13 +11,13 @@ */ public class CloudWatchLogsLimits { public static final int APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE = 26; //Size of overhead for each log event message. - private final int batchSize; + private final int maxBatchSize; private final int maxEventSizeBytes; private final int maxRequestSizeBytes; private final long logSendInterval; - public CloudWatchLogsLimits(final int batchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) { - this.batchSize = batchSize; + public CloudWatchLogsLimits(final int maxBatchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) { + this.maxBatchSize = maxBatchSize; this.maxEventSizeBytes = maxEventSizeBytes; this.maxRequestSizeBytes = maxRequestSizeBytes; this.logSendInterval = logSendInterval; @@ -82,7 +82,7 @@ private boolean isGreaterThanMaxRequestSize(final int currentRequestSize) { * @return boolean - true if greater, false otherwise. */ private boolean isGreaterThanBatchSize(final int batchSize) { - return batchSize > this.batchSize; + return batchSize > this.maxBatchSize; } /** @@ -95,6 +95,6 @@ private boolean isEqualMaxRequestSize(final int currentRequestSize) { } private boolean isEqualBatchSize(final int batchSize) { - return batchSize == this.batchSize; + return batchSize == this.maxBatchSize; } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/time/SinkStopWatch.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java similarity index 86% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/time/SinkStopWatch.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java index 52352d6cd1..036fa949f5 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/time/SinkStopWatch.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java @@ -3,13 +3,18 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.time; +package org.opensearch.dataprepper.plugins.sink.utils; import org.apache.commons.lang3.time.StopWatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +/** + * A synchronized watch for accessing time + * related data. (Wrapper around StopWatch class + * from "commons.apache.lang3") + */ public class SinkStopWatch { private final StopWatch stopWatch; private final ReentrantLock stopWatchLock; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java index da548a55a1..30980ad10a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java @@ -7,40 +7,53 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsDispatcher.CloudWatchLogsDispatcherBuilder; import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig; import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; -import org.opensearch.dataprepper.plugins.sink.push_condition.CloudWatchLogsLimits; +import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.never; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.when; public class CloudWatchLogsServiceTest { - private static final int NUMBER_THREADS_SMALL = 5; - private static final int NUMBER_THREADS_BIG = 10; - private static final int NUMBER_THREADS_LARGE = 20; - private BlockingQueue mockQueue; + private static final int MAX_QUEUE_SIZE = 100; + private CloudWatchLogsClient mockClient; + private CloudWatchLogsMetrics mockMetrics; + private BlockingQueue testQueue; private CloudWatchLogsService cloudWatchLogsService; + private CloudWatchLogsDispatcherBuilder mockDispatchBuilder; private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; private ThresholdConfig thresholdConfig; private CloudWatchLogsLimits cloudWatchLogsLimits; private InMemoryBufferFactory inMemoryBufferFactory; private Buffer buffer; - private CloudWatchLogsDispatcher dispatcher; - private volatile int testCounter; + private CloudWatchLogsDispatcher testDispatcher; + private final String logGroup = "testGroup"; + private final String logStream = "testStream"; @BeforeEach void setUp() { @@ -50,14 +63,33 @@ void setUp() { cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(), thresholdConfig.getMaxRequestSize(), thresholdConfig.getLogSendInterval()); + mockClient = mock(CloudWatchLogsClient.class); + mockMetrics = mock(CloudWatchLogsMetrics.class); inMemoryBufferFactory = new InMemoryBufferFactory(); buffer = inMemoryBufferFactory.getBuffer(); - dispatcher = mock(CloudWatchLogsDispatcher.class); - mockQueue = mock(BlockingQueue.class); - - cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, dispatcher, mockQueue); - - testCounter = 0; + testDispatcher = mock(CloudWatchLogsDispatcher.class); + testQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); + + mockDispatchBuilder = mock(CloudWatchLogsDispatcherBuilder.class, RETURNS_DEEP_STUBS); + when(mockDispatchBuilder.taskQueue(any(BlockingQueue.class))).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.cloudWatchLogsClient(any(CloudWatchLogsClient.class))).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.cloudWatchLogsMetrics(any(CloudWatchLogsMetrics.class))).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.logGroup(anyString())).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.logStream(anyString())).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.retryCount(anyInt())).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.backOffTimeBase(anyInt())).thenReturn(mockDispatchBuilder); + when(mockDispatchBuilder.taskQueue(any(BlockingQueue.class)) + .cloudWatchLogsClient(any(CloudWatchLogsClient.class)) + .cloudWatchLogsMetrics(any(CloudWatchLogsMetrics.class)).logGroup(logGroup) + .logStream(anyString()) + .retryCount(anyInt()) + .backOffTimeBase(anyLong()) + .build()).thenReturn(testDispatcher); + + cloudWatchLogsService = new CloudWatchLogsService(buffer, mockClient, mockMetrics, + cloudWatchLogsLimits, testQueue, + logGroup, logStream, + thresholdConfig.getRetryCount(), thresholdConfig.getBackOffTime()); } Collection> getSampleRecordsLess() { @@ -99,178 +131,20 @@ Collection> getSampleRecordsLarge() { @Test void check_dispatcher_run_was_not_called() { cloudWatchLogsService.processLogEvents(getSampleRecordsLess()); - verify(dispatcher, never()).run(); + verify(mockClient, never()).putLogEvents(any(PutLogEventsRequest.class)); } @Test - void check_dispatcher_run_was_called_test() { + void check_dispatcher_run_was_called_test() throws InterruptedException { cloudWatchLogsService.processLogEvents(getSampleRecords()); - verify(dispatcher, atLeastOnce()).run(); + Thread.sleep(100); + verify(mockClient, atLeastOnce()).putLogEvents(any(PutLogEventsRequest.class)); } @Test - void check_dispatcher_run_called_heavy_load() { + void check_dispatcher_run_called_heavy_load() throws InterruptedException { cloudWatchLogsService.processLogEvents(getSampleRecordsLarge()); - verify(dispatcher, atLeast(4)).run(); - } - - //TODO: Add multithreaded testing to ensure that the proper methods (run) gets called. - - @Test - void test_less_threads_normal_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_SMALL; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecords(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_SMALL)).run(); - } - - @Test - void test_less_threads_heavy_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_SMALL; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecordsLarge(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_SMALL * 4)).run(); - } - - @Test - void test_more_threads_normal_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_BIG; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecords(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_BIG)).run(); - } - - @Test - void test_more_threads_heavy_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_BIG; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecordsLarge(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_BIG * 4)).run(); - } - - @Test - void test_large_threads_normal_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_LARGE; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecords(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_LARGE)).run(); - } - - @Test - void test_large_threads_heavy_load() { - Collection threadsToRun = new ArrayList<>(); - - for (int i = 0; i < NUMBER_THREADS_LARGE; i++) { - Thread testingThread = new Thread(new CloudWatchLogsServiceTester(getSampleRecordsLarge(), cloudWatchLogsService)); - threadsToRun.add(testingThread); - } - - for (Thread serviceTester: threadsToRun) { - serviceTester.start(); - } - - for (Thread serviceTester: threadsToRun) { - try { - serviceTester.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - verify(dispatcher, atLeast(NUMBER_THREADS_LARGE * 4)).run(); - } - - static class CloudWatchLogsServiceTester implements Runnable { - Collection> testEvents; - CloudWatchLogsService testCloudWatchLogsService; - CloudWatchLogsServiceTester(Collection> events, CloudWatchLogsService cloudWatchLogsService) { - testEvents = events; - testCloudWatchLogsService = cloudWatchLogsService; - } - - @Override - public void run() { - testCloudWatchLogsService.processLogEvents(testEvents); - } + Thread.sleep(100); + verify(mockClient, atLeast(4)).putLogEvents(any(PutLogEventsRequest.class)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimitsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimitsTest.java index 18eff78eef..1ae09ce471 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimitsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/push_condition/CloudWatchLogsLimitsTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue;