diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSink.java new file mode 100644 index 0000000000..14be97d38d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSink.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; +import org.opensearch.dataprepper.plugins.sink.buffer.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsDispatcher; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsMetrics; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsService; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsClientFactory; +import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; + +import java.util.Collection; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class) +public class CloudWatchLogsSink extends AbstractSink> { + private final CloudWatchLogsService cloudWatchLogsService; + private boolean isInitialized; + @DataPrepperPluginConstructor + public CloudWatchLogsSink(final PluginSetting pluginSetting, + final PluginMetrics pluginMetrics, + final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig, + final AwsCredentialsSupplier awsCredentialsSupplier) { + super(pluginSetting); + + AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig(); + ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig(); + + CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics); + CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), + thresholdConfig.getMaxEventSizeBytes(), + thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval()); + + CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier); + BufferFactory bufferFactory = new InMemoryBufferFactory(); + + Executor executor = Executors.newCachedThreadPool(); + + CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(cloudWatchLogsClient) + .cloudWatchLogsMetrics(cloudWatchLogsMetrics) + .logGroup(cloudWatchLogsSinkConfig.getLogGroup()) + .logStream(cloudWatchLogsSinkConfig.getLogStream()) + .backOffTimeBase(thresholdConfig.getBackOffTime()) + .retryCount(thresholdConfig.getRetryCount()) + .executor(executor) + .build(); + + Buffer buffer = bufferFactory.getBuffer(); + + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher); + } + + @Override + public void doInitialize() { + isInitialized = true; + } + + @Override + public void doOutput(Collection> records) { + if (records.isEmpty()) { + return; + } + + cloudWatchLogsService.processLogEvents(records); + } + + @Override + public boolean isReady() { + return isInitialized; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSinkTest.java new file mode 100644 index 0000000000..1ac889a3c2 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/CloudWatchLogsSinkTest.java @@ -0,0 +1,57 @@ +package org.opensearch.dataprepper.plugins.sink; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsMetrics; +import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CloudWatchLogsSinkTest { + private CloudWatchLogsSink testCloudWatchSink; + private PluginSetting mockPluginSetting; + private PluginMetrics mockPluginMetrics; + private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig; + private AwsCredentialsSupplier mockCredentialSupplier; + private AwsConfig mockAwsConfig; + private ThresholdConfig thresholdConfig; + private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; + private static String TEST_LOG_GROUP = "testLogGroup"; + private static String TEST_LOG_STREAM= "testLogStream"; + private static String TEST_PLUGIN_NAME = "testPluginName"; + private static String TEST_PIPELINE_NAME = "testPipelineName"; + @BeforeEach + void setUp() { + mockPluginSetting = mock(PluginSetting.class); + mockPluginMetrics = mock(PluginMetrics.class); + mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); + mockCredentialSupplier = mock(AwsCredentialsSupplier.class); + mockAwsConfig = mock(AwsConfig.class); + thresholdConfig = new ThresholdConfig(); + mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); + + when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig); + when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); + when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP); + when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM); + + when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME); + when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + + testCloudWatchSink = new CloudWatchLogsSink(mockPluginSetting, mockPluginMetrics, mockCloudWatchLogsSinkConfig, + mockCredentialSupplier); + } + + @Test + void WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() { + testCloudWatchSink.doInitialize(); + assertTrue(testCloudWatchSink.isReady()); + } +}