Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sqs-source-integration-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 committed Jun 27, 2023
2 parents cf6ba22 + 51722ba commit adf8738
Show file tree
Hide file tree
Showing 91 changed files with 5,176 additions and 690 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public Boolean hasTags(final List<String> tagsList) {

@Override
public void addTags(final List<String> newTags) {
tags.addAll(newTags);
if (Objects.nonNull(newTags)) {
tags.addAll(newTags);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ void fromEventMetadata_returns_matching_EventMetadata() {
assertThat(copiedMetadata.getAttributes(), not(sameInstance(attributes)));
}

@Test
public void testEventMetadata_withNullTags() {
final String testEventType = UUID.randomUUID().toString();

final EventMetadata eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.build();
assertThat(eventMetadata, notNullValue());
eventMetadata.addTags(null);
assertThat(eventMetadata.getTags(), equalTo(Collections.emptySet()));
}

@Test
public void testBuild_withTags() {
final String testEventType = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ private void doRun() {
processAcknowledgements(inputEvents, records);
}
}
if (!records.isEmpty()) {
postToSink(records);
}

postToSink(records);
// Checkpoint the current batch read from the buffer after being processed by processors and sinks.
readBuffer.checkpoint(checkpointState);
}
Expand Down
13 changes: 13 additions & 0 deletions data-prepper-plugins/aws-sqs-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plugins {
id 'java-library'
}
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation libs.armeria.core
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns:'

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.aws.sqs.common;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

import java.util.Map;

/**
* A Factory for creating and fetching the <code>SqsClient</code>.
*/
public class ClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(ClientFactory.class);

private ClientFactory(){
}


/**
* Create a SqsClient Object for reading the sqs messages
* @param region - aws region should be required
* @param roleArn - aws role arn should be required
* @param stsHeader - aws role header should be required
* @param awsCredentialsSupplier - aws credentials supplier should be required
* @return a SqsClient Object
*/
public static SqsClient createSqsClient(final Region region,
final String roleArn,
final Map<String,String> stsHeader,
final AwsCredentialsSupplier awsCredentialsSupplier) {
LOG.info("Creating SQS client");
return SqsClient.builder()
.region(region)
.credentialsProvider(awsCredentialsSupplier.getProvider(convertToCredentialOptions(region,roleArn,stsHeader)))
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(builder -> builder.numRetries(5).build())
.build())
.build();
}

private static AwsCredentialsOptions convertToCredentialOptions(final Region region,
final String roleArn,
final Map<String,String> stsHeader) {
return AwsCredentialsOptions.builder()
.withRegion(region)
.withStsRoleArn(roleArn)
.withStsHeaderOverrides(stsHeader)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.aws.sqs.common;

import com.linecorp.armeria.client.retry.Backoff;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.aws.sqs.common.exception.SqsRetriesExhaustedException;
import org.opensearch.dataprepper.plugins.aws.sqs.common.metrics.SqsMetrics;
import org.opensearch.dataprepper.plugins.aws.sqs.common.model.SqsOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;
import software.amazon.awssdk.services.sts.model.StsException;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* contains the sqs related common functionality for reading
* the messages and processing
*/
public class SqsService {
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);

static final Duration END_TO_END_ACK_TIME_OUT = Duration.ofSeconds(10);

private final SqsMetrics sqsMetrics;

private final SqsClient sqsClient;

private final Backoff backoff;

private int failedAttemptCount;


public SqsService(final SqsMetrics sqsMetrics,
final SqsClient sqsClient,
final Backoff backoff) {
this.sqsMetrics = sqsMetrics;
this.sqsClient = sqsClient;
this.backoff =backoff;

}

/**
* Create a sqs message request object with the help of queue url and max messages count.
*
* @param sqsOptions - required sqs option object
* @return ReceiveMessageRequest - return the Aws Message Request object.
*/
public ReceiveMessageRequest createReceiveMessageRequest(final SqsOptions sqsOptions) {
return ReceiveMessageRequest.builder()
.queueUrl(sqsOptions.getSqsUrl())
.maxNumberOfMessages(sqsOptions.getMaximumMessages())
.visibilityTimeout(getTimeOutValueByDuration(sqsOptions.getVisibilityTimeout()))
.waitTimeSeconds(getTimeOutValueByDuration(sqsOptions.getWaitTime()))
.messageAttributeNames("All")
.build();
}

private static Integer getTimeOutValueByDuration(final Duration duration) {
if(Objects.nonNull(duration))
return (int) duration.toSeconds();
return null;
}

/**
* fetch the sqs message from provided sqs queue url options
*
* @param sqsOptions - required sqs option object
* @return Messages list - return the list of sqs messages from queue.
*/
public List<Message> getMessagesFromSqs(final SqsOptions sqsOptions) {
try {
final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest(sqsOptions);
final ReceiveMessageResponse receiveMessageResponse = sqsClient.receiveMessage(receiveMessageRequest);
return receiveMessageResponse.messages();
} catch (final SqsException | StsException e) {
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
sqsMetrics.getSqsReceiveMessagesFailedCounter().increment();
applyBackoff();
return Collections.emptyList();
}
}

/**
* contains a back off functionality.
*
*/
public void applyBackoff() {
final long delayMillis = backoff.nextDelayMillis(++failedAttemptCount);
if (delayMillis < 0) {
Thread.currentThread().interrupt();
throw new SqsRetriesExhaustedException("SQS retries exhausted. Make sure that SQS configuration is valid, SQS queue exists, and IAM role has required permissions.");
}
final Duration delayDuration = Duration.ofMillis(delayMillis);
LOG.info("Pausing SQS processing for {}.{} seconds due to an error in processing.",
delayDuration.getSeconds(), delayDuration.toMillisPart());
try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){
LOG.error("Thread is interrupted while polling SQS with retry.", e);
Thread.currentThread().interrupt();
}
}

/**
* helps to delete the sqs messages and update the respective metrics.
*
* @param deleteMsgBatchReqList - required list deleteMsgBatchReqList object
* @param queueUrl - required queue url for deleting messages
*/
public void deleteMessagesFromQueue(final List<DeleteMessageBatchRequestEntry> deleteMsgBatchReqList,
final String queueUrl) {
try{
final DeleteMessageBatchResponse deleteMessageBatchResponse =
sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(deleteMsgBatchReqList)
.build());
updateMetricsForDeletedMessages(deleteMessageBatchResponse);
updateMetricsForUnDeletedMessages(deleteMessageBatchResponse);
}catch(Exception e){
final int failedMessageCount = deleteMsgBatchReqList.size();
sqsMetrics.getSqsMessagesDeleteFailedCounter().increment(failedMessageCount);
LOG.error("Failed to delete {} messages from SQS due to {}.", failedMessageCount, e.getMessage());
if(e instanceof StsException) {
applyBackoff();
}
}
}

/**
* helps to update the metrics for delete succeed messages.
* @param deleteMessageBatchResponse - required list deleteMessageBatchResponse object
*/
private void updateMetricsForDeletedMessages(DeleteMessageBatchResponse deleteMessageBatchResponse) {
if (deleteMessageBatchResponse.hasSuccessful()) {
final int deletedMessagesCount = deleteMessageBatchResponse.successful().size();
if (deletedMessagesCount > 0) {
final String successfullyDeletedMessages = deleteMessageBatchResponse.successful().stream()
.map(DeleteMessageBatchResultEntry::id)
.collect(Collectors.joining(", "));
LOG.info("Deleted {} messages from SQS. [{}]", deletedMessagesCount, successfullyDeletedMessages);
sqsMetrics.getSqsMessagesDeletedCounter().increment(deletedMessagesCount);
}
}
}

/**
* helps to update the metrics for delete failed messages.
* @param deleteMessageBatchResponse - required list deleteMessageBatchResponse object
*/
private void updateMetricsForUnDeletedMessages(DeleteMessageBatchResponse deleteMessageBatchResponse) {
if(deleteMessageBatchResponse.hasFailed()) {
final int failedDeleteCount = deleteMessageBatchResponse.failed().size();
sqsMetrics.getSqsMessagesDeleteFailedCounter().increment();
if(LOG.isErrorEnabled()) {
final String failedMessages = deleteMessageBatchResponse.failed().stream()
.map(failed -> toString())
.collect(Collectors.joining(", "));
LOG.error("Failed to delete {} messages from SQS with errors: [{}].", failedDeleteCount, failedMessages);
}
}
}

/**
* helps to create and fetch to delete message batch request entry list from messages list.
* @param messages - required list deleteMessageBatchResponse object
* @return DeleteMessageBatchRequestEntry list - provide the DeleteMessageBatchRequestEntry list
*/
public List<DeleteMessageBatchRequestEntry> getDeleteMessageBatchRequestEntryList(final List<Message> messages){
final List<DeleteMessageBatchRequestEntry> deleteMsgBatchReqList = new ArrayList<>(messages.size());
messages.forEach(message ->
deleteMsgBatchReqList.add(DeleteMessageBatchRequestEntry.builder()
.id(message.messageId()).receiptHandle(message.receiptHandle()).build()));
return deleteMsgBatchReqList;
}

/**
* helps to send end to end acknowledgements after successful processing.
*
* @param queueUrl - queue url for deleting the messages from the queue
* @param acknowledgementSetManager - required acknowledgementSetManager for creating acknowledgementSet
* @param waitingForAcknowledgements - will pass the processed messages batch in Delete message batch request.
* @return AcknowledgementSet - will generate the AcknowledgementSet if endToEndAcknowledgementsEnabled is true.
*/
public AcknowledgementSet createAcknowledgementSet(final String queueUrl,
final AcknowledgementSetManager acknowledgementSetManager,
final List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements) {
AcknowledgementSet acknowledgementSet = null;
acknowledgementSet = acknowledgementSetManager.create(result -> {
sqsMetrics.getAcknowledgementSetCallbackCounter().increment();
if (result == true) {
deleteMessagesFromQueue(waitingForAcknowledgements,queueUrl);
}
}, END_TO_END_ACK_TIME_OUT);
return acknowledgementSet;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.aws.sqs.common.exception;

/**
* This exception is thrown when SQS retries are exhausted
*
* @since 2.1
*/
public class SqsRetriesExhaustedException extends RuntimeException {

public SqsRetriesExhaustedException(final String errorMessage) {
super(errorMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.aws.sqs.common.handler;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;

public interface SqsMessageHandler {
List<DeleteMessageBatchRequestEntry> handleMessages(final List<Message> messages,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet);
}
Loading

0 comments on commit adf8738

Please sign in to comment.