Skip to content

Commit

Permalink
Incorporated review comment changes
Browse files Browse the repository at this point in the history
Signed-off-by: Uday Kumar Chintala <[email protected]>
  • Loading branch information
udaych20 committed Jun 21, 2023
1 parent e917b7a commit cf6ba22
Showing 1 changed file with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -70,27 +73,26 @@ public class SqsSourceTaskIT {

private SqsMetrics sqsMetrics;

private ScheduledExecutorService executorService;

@ParameterizedTest
@CsvSource({"2,2","10,15","50,40","100,40","200,50","500,55","1000,60"})
public void process_sqs_messages(int messageLoad,int secs){
@CsvSource({"2,1","10,2","50,4","100,5","200,7","500,10","1000,15","2000,24"})
public void process_sqs_messages(int messageLoad,int threadSleepTime){
final SqsRecordsGenerator sqsRecordsGenerator = new SqsRecordsGenerator(sqsClient);
final String queueUrl = System.getProperty(AWS_SQS_QUEUE_URL);

List<String> inputDataList = pushMessagesToQueue(sqsRecordsGenerator, queueUrl,messageLoad);
this.buffer = getBuffer(inputDataList.size());

SqsOptions sqsOptions = new SqsOptions.Builder().setSqsUrl(queueUrl).setMaximumMessages(10).build();
final Thread thread = createObjectUnderTest(sqsOptions);
thread.start();

executorService.scheduleAtFixedRate(createObjectUnderTest(sqsOptions),0,1, TimeUnit.MILLISECONDS);
try {
Thread.sleep(Duration.ofSeconds(secs).toMillis());
Thread.sleep(Duration.ofSeconds(threadSleepTime).toMillis());
} catch (InterruptedException e) {
}

executorService.shutdown();
final List<Record<Event>> bufferEvents = new ArrayList<>(buffer.read((int) Duration.ofSeconds(10).toMillis()).getKey());
final List<String> bufferData = bufferEvents.stream().map(obj -> obj.getData().get(MESSAGE, String.class)).collect(Collectors.toList());
thread.interrupt();
assertThat(bufferData, containsInAnyOrder(inputDataList.toArray()));
assertThat(bufferData.size(),equalTo(inputDataList.size()));
}
Expand All @@ -106,12 +108,13 @@ public void setup(){
this.backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
this.sqsClient = SqsClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build();
executorService = Executors.newSingleThreadScheduledExecutor();
}

public Thread createObjectUnderTest(final SqsOptions sqsOptions){
public SqsSourceTask createObjectUnderTest(final SqsOptions sqsOptions){
SqsService sqsService = new SqsService(sqsMetrics,sqsClient,backoff);
SqsMessageHandler sqsHandler = new RawSqsMessageHandler(buffer,sqsService);
return new Thread(new SqsSourceTask(sqsService,sqsOptions,sqsMetrics,acknowledgementSetManager,Boolean.FALSE,sqsHandler));
SqsMessageHandler sqsHandler = new RawSqsMessageHandler(buffer,sqsService,10);
return new SqsSourceTask(sqsService,sqsOptions,sqsMetrics,acknowledgementSetManager,Boolean.FALSE,sqsHandler);
}

private static List<String> pushMessagesToQueue(SqsRecordsGenerator sqsRecordsGenerator, String queueUrl,final int load) {
Expand Down

0 comments on commit cf6ba22

Please sign in to comment.