diff --git a/data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTaskIT.java b/data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTaskIT.java index 0512ff0cfa..6a79ced3ab 100644 --- a/data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTaskIT.java +++ b/data-prepper-plugins/sqs-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTaskIT.java @@ -27,6 +27,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.equalTo; @@ -70,9 +73,11 @@ public class SqsSourceTaskIT { private SqsMetrics sqsMetrics; + private ScheduledExecutorService executorService; + @ParameterizedTest - @CsvSource({"2,2","10,15","50,40","100,40","200,50","500,55","1000,60"}) - public void process_sqs_messages(int messageLoad,int secs){ + @CsvSource({"2,1","10,2","50,4","100,5","200,7","500,10","1000,15","2000,24"}) + public void process_sqs_messages(int messageLoad,int threadSleepTime){ final SqsRecordsGenerator sqsRecordsGenerator = new SqsRecordsGenerator(sqsClient); final String queueUrl = System.getProperty(AWS_SQS_QUEUE_URL); @@ -80,17 +85,14 @@ public void process_sqs_messages(int messageLoad,int secs){ this.buffer = getBuffer(inputDataList.size()); SqsOptions sqsOptions = new SqsOptions.Builder().setSqsUrl(queueUrl).setMaximumMessages(10).build(); - final Thread thread = createObjectUnderTest(sqsOptions); - thread.start(); - + executorService.scheduleAtFixedRate(createObjectUnderTest(sqsOptions),0,1, TimeUnit.MILLISECONDS); try { - Thread.sleep(Duration.ofSeconds(secs).toMillis()); + Thread.sleep(Duration.ofSeconds(threadSleepTime).toMillis()); } catch (InterruptedException e) { } - + executorService.shutdown(); final List> bufferEvents = new ArrayList<>(buffer.read((int) Duration.ofSeconds(10).toMillis()).getKey()); final List bufferData = bufferEvents.stream().map(obj -> obj.getData().get(MESSAGE, String.class)).collect(Collectors.toList()); - thread.interrupt(); assertThat(bufferData, containsInAnyOrder(inputDataList.toArray())); assertThat(bufferData.size(),equalTo(inputDataList.size())); } @@ -106,12 +108,13 @@ public void setup(){ this.backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE) .withMaxAttempts(Integer.MAX_VALUE); this.sqsClient = SqsClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build(); + executorService = Executors.newSingleThreadScheduledExecutor(); } - public Thread createObjectUnderTest(final SqsOptions sqsOptions){ + public SqsSourceTask createObjectUnderTest(final SqsOptions sqsOptions){ SqsService sqsService = new SqsService(sqsMetrics,sqsClient,backoff); - SqsMessageHandler sqsHandler = new RawSqsMessageHandler(buffer,sqsService); - return new Thread(new SqsSourceTask(sqsService,sqsOptions,sqsMetrics,acknowledgementSetManager,Boolean.FALSE,sqsHandler)); + SqsMessageHandler sqsHandler = new RawSqsMessageHandler(buffer,sqsService,10); + return new SqsSourceTask(sqsService,sqsOptions,sqsMetrics,acknowledgementSetManager,Boolean.FALSE,sqsHandler); } private static List pushMessagesToQueue(SqsRecordsGenerator sqsRecordsGenerator, String queueUrl,final int load) {