Sns Sink Plugin with junit test cases #2995

Merged
merged 28 commits on Aug 4, 2023
643d06c
Sns Sink Plugin with junit test cases
udaych20 Jul 10, 2023
dce26a7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 12, 2023
23b12c7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 13, 2023
69a0ca0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 17, 2023
caf5d1c
Sns Sink DLQ changes
udaych20 Jul 17, 2023
c459013
Incorporated FIFO Topic related Changes
udaych20 Jul 17, 2023
96d09b0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 18, 2023
1b5b45b
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
eb012af
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
d46b778
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 20, 2023
370d57e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 24, 2023
08b1146
Dlq changes for sns sink
udaych20 Jul 24, 2023
0ca43a0
local date stamp removed from dlq
udaych20 Jul 25, 2023
6a9ef69
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 25, 2023
818a03f
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 26, 2023
1027c78
SNS Sink removed threshold and pushed records in specified batch
udaych20 Jul 26, 2023
536d6f0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 27, 2023
2008138
SNS Sink Integration Tests
udaych20 Jul 27, 2023
7b94e33
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Jul 27, 2023
b7b6e5e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 31, 2023
4849418
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
325bf8a
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
526b521
Sns Sink Review Changes
udaych20 Aug 1, 2023
c779a1f
Sns Sink Test Case Changes
udaych20 Aug 1, 2023
67f10d9
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 2, 2023
3ba60d7
Sns Sink Review changes
udaych20 Aug 2, 2023
3a19a4e
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Aug 2, 2023
7088650
Merge branch 'main' into sns-sink-plugin
udaych20 Aug 4, 2023
79 changes: 79 additions & 0 deletions data-prepper-plugins/sns-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# SNS Sink

This is the Data Prepper SNS Sink plugin that sends records to an SNS Topic.

## Usages

The SNS sink is configured as part of a Data Prepper pipeline YAML file.

## Configuration Options

```yaml
pipeline:
...
sink:
- sns:
topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic
message_group_id: /type
message_deduplication_id: /id
batch_size: 10
aws:
region: ap-south-1
sts_role_arn: arn:aws:iam::524239988922:role/app-test
dlq:
s3:
bucket: test-bucket
key_path_prefix: dlq/
codec:
ndjson:
max_retries: 5
```

## SNS Pipeline Configuration

- `topic_arn` (Optional) : The ARN of the SNS topic to which events are published.

- `batch_size` (Optional) : The maximum number of events to publish to the SNS topic in a single request. Defaults to `10`.

- `message_group_id` (Optional) : The value used as the `message_group_id` for messages published to a FIFO topic. Defaults to an auto-generated random key.

- `message_deduplication_id` (Optional) : The value used as the `message_deduplication_id` for messages published to a FIFO topic. Defaults to an auto-generated random key.

- `dlq_file` (Optional) : The absolute path of a local file to which failed records are written. Defaults to null.
If not provided, failed records are written to the default Data Prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown.

- `dlq` (Optional) : DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown.

- `codec` (Optional) : The sink codec used to serialize events before they are published (the example above uses `ndjson`).

- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. If this option is present, `aws_`-prefixed options must not be present; if any are, an error is thrown.

- `max_retries` (Optional) : The maximum number of times a single request is retried when ingesting data to Amazon SNS and S3. Defaults to `5`.
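The `batch_size` behavior above, where accumulated events are published in fixed-size batches, can be sketched as follows. This is an illustrative partition helper under assumed names (`BatchSketch`, `partition`), not the plugin's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a list of events into batches of at most batchSize,
// mirroring how a sink might group events before each publish request.
public class BatchSketch {
    public static <T> List<List<T>> partition(final List<T> events, final int batchSize) {
        final List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < events.size(); i += batchSize) {
            // Each batch holds up to batchSize events; the final batch may be smaller.
            batches.add(new ArrayList<>(events.subList(i, Math.min(i + batchSize, events.size()))));
        }
        return batches;
    }
}
```

With `batch_size: 10`, 25 events would be published as three requests of 10, 10, and 5 events.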

### <a name="aws_configuration">AWS Configuration</a>

* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SNS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).


### Counters

* `snsSinkObjectsEventsSucceeded` - The number of events that the SNS sink successfully sent to the topic.
* `snsSinkObjectsEventsFailed` - The number of events that the SNS sink failed to send to the topic.

## Developer Guide

This plugin is compatible with Java 11. See the following for more information:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

Note: Before running the integration tests, subscribe the SNS topics to SQS queues; the tests read the published messages back from those queues.

```shell
./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<<aws-region>> -Dtests.sns.sink.sts.role.arn=<<aws-sts-role-arn>> -Dtests.sns.sink.standard.topic=<<standard-topic-arn>> -Dtests.sns.sink.fifo.topic=<<fifo-topic-arn>> -Dtests.sns.sink.dlq.file.path=<<dlq-file-path>> -Dtests.sns.sink.standard.sqs.queue.url=<<sqs-standard-queue>> -Dtests.sns.sink.fifo.sqs.queue.url=<<sqs-fifo-queue>>
```
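The SNS-to-SQS subscription mentioned in the note above can be created with the AWS CLI, for example (the ARNs below are placeholders; the queue's access policy must also allow the topic to send it messages):

```
aws sns subscribe \
  --topic-arn arn:aws:sns:ap-south-1:123456789012:my-topic \
  --protocol sqs \
  --notification-endpoint arn:aws:sqs:ap-south-1:123456789012:my-queue
```

Repeat for the FIFO topic and FIFO queue. Raw message delivery should be left disabled, since the tests parse the SNS envelope and extract its `Message` field.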
64 changes: 64 additions & 0 deletions data-prepper-plugins/sns-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:sns'
implementation 'software.amazon.awssdk:sts'
testImplementation 'software.amazon.awssdk:sqs'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation project(':data-prepper-plugins:failures-common')
testImplementation project(':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.sns.sink.region', System.getProperty('tests.sns.sink.region')
systemProperty 'tests.sns.sink.dlq.file.path', System.getProperty('tests.sns.sink.dlq.file.path')
systemProperty 'tests.sns.sink.sts.role.arn', System.getProperty('tests.sns.sink.sts.role.arn')
systemProperty 'tests.sns.sink.standard.topic', System.getProperty('tests.sns.sink.standard.topic')
systemProperty 'tests.sns.sink.fifo.topic', System.getProperty('tests.sns.sink.fifo.topic')
systemProperty 'tests.sns.sink.standard.sqs.queue.url', System.getProperty('tests.sns.sink.standard.sqs.queue.url')
systemProperty 'tests.sns.sink.fifo.sqs.queue.url', System.getProperty('tests.sns.sink.fifo.sqs.queue.url')

filter {
includeTestsMatching '*IT'
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.sns;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS;

public class SnsSinkServiceIT {

private SnsClient snsClient;

private PluginMetrics pluginMetrics;

private PluginFactory pluginFactory;

private PluginSetting pluginSetting;

private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


private static final String SNS_SINK_CONFIG_YAML = " topic_arn: {0}\n" +
" batch_size: 10\n" +
" aws:\n" +
" region: {1}\n" +
" sts_role_arn: {2}\n" +
" dlq_file: {3}\n" +
" codec:\n" +
" ndjson:\n" +
" max_retries: 5";

private String standardTopic;

private String fifoTopic;

private String region;

private String stsRoleArn;

private String dlqFilePath;

private Counter snsSinkObjectsEventsSucceeded;

private Counter numberOfRecordsFailedCounter;

private String standardSqsQueue;

private SqsClient sqsClient;

private String fifoSqsQueue;

private DlqPushHandler dlqPushHandler;

@BeforeEach
public void setup() {
this.standardTopic = System.getProperty("tests.sns.sink.standard.topic");
this.fifoTopic = System.getProperty("tests.sns.sink.fifo.topic");
this.region = System.getProperty("tests.sns.sink.region");
this.stsRoleArn = System.getProperty("tests.sns.sink.sts.role.arn");
this.dlqFilePath = System.getProperty("tests.sns.sink.dlq.file.path");
this.standardSqsQueue = System.getProperty("tests.sns.sink.standard.sqs.queue.url");
this.fifoSqsQueue = System.getProperty("tests.sns.sink.fifo.sqs.queue.url");

this.dlqPushHandler = mock(DlqPushHandler.class);
this.pluginMetrics = mock(PluginMetrics.class);
this.pluginFactory = mock(PluginFactory.class);
this.pluginSetting = mock(PluginSetting.class);
this.snsSinkObjectsEventsSucceeded = mock(Counter.class);
this.numberOfRecordsFailedCounter = mock(Counter.class);
this.snsClient = SnsClient.builder()
.region(Region.of(region))
.build();
this.sqsClient = SqsClient.builder().region(Region.of(region)).build();
when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(snsSinkObjectsEventsSucceeded);
when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter);
}

private Collection<Record<Event>> setEventQueue(final int records) {
final Collection<Record<Event>> jsonObjects = new LinkedList<>();
for (int i = 0; i < records; i++)
jsonObjects.add(createRecord());
return jsonObjects;
}

private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
event.setEventHandle(mock(EventHandle.class));
return new Record<>(event);
}

public SnsSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException {
String[] values = { topicName,region,stsRoleArn,dlqFilePath };
final String configYaml = MessageFormat.format(SNS_SINK_CONFIG_YAML, values);
final SnsSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SnsSinkConfig.class);
return new SnsSinkService(snsSinkConfig,snsClient,pluginMetrics,pluginFactory,pluginSetting,mock(ExpressionEvaluator.class));
}

@ParameterizedTest
@ValueSource(ints = {5,9,10})
public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
}

@Test
public void sns_sink_service_test_with_standard_queue_with_multiple_batch() throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
final Collection<Record<Event>> records = setEventQueue(11);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded,times(2)).increment(anyDouble());
}

private List<String> readMessagesFromSNSTopicQueue(List<String> inputRecords, final String sqsQueue) {
final List<Message> messages = new ArrayList<>();
long startTime = System.currentTimeMillis();
long endTime = startTime + 60000;
do {
messages.addAll(sqsClient.receiveMessage(builder -> builder.queueUrl(sqsQueue)).messages());
if(messages.size() >= inputRecords.size()){
break;
}
} while (System.currentTimeMillis() < endTime);

List<String> topicData = messages.stream().map(Message::body).map(obj-> {
try {
Map<String,String> map = objectMapper.readValue(obj,Map.class);
return map.get("Message");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
return topicData;
}

private void deleteSqsMessages(String sqsQueue, List<Message> messages) throws InterruptedException {
for (Message message : messages) {
sqsClient.deleteMessage(builder -> builder.queueUrl(sqsQueue).receiptHandle(message.receiptHandle()));
Thread.sleep(Duration.ofSeconds(2).toMillis());
}
}

@ParameterizedTest
@ValueSource(ints = {1, 5, 10})
public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(fifoTopic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,fifoSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
}



@ParameterizedTest
@ValueSource(ints = {1,5,9})
public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException, InterruptedException {
final ObjectMapper mapper = new ObjectMapper();
final String topic = "test";
Files.deleteIfExists(Path.of(dlqFilePath));
final SnsSinkService objectUnderTest = createObjectUnderTest(topic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
verify(numberOfRecordsFailedCounter).increment(recordCount);
final Map<String,String> map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class);
assertThat(map.get("topic"),equalTo(topic));
}
}