Skip to content

Commit

Permalink
Added fix to commit count
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
Marcos Gonzalez Mayedo committed Jul 31, 2023
1 parent 7b56a86 commit 93f6fe5
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.buffer.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsDispatcher;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsService;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)
public class CloudWatchLogsSink extends AbstractSink<Record<Event>> {
private final CloudWatchLogsService cloudWatchLogsService;
private boolean isInitialized;
@DataPrepperPluginConstructor
public CloudWatchLogsSink(final PluginSetting pluginSetting,
final PluginMetrics pluginMetrics,
final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);

AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig();
ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig();

CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);
BufferFactory bufferFactory = new InMemoryBufferFactory();

Executor executor = Executors.newCachedThreadPool();

CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
.logStream(cloudWatchLogsSinkConfig.getLogStream())
.backOffTimeBase(thresholdConfig.getBackOffTime())
.retryCount(thresholdConfig.getRetryCount())
.executor(executor)
.build();

Buffer buffer = bufferFactory.getBuffer();

cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher);
}

@Override
public void doInitialize() {
isInitialized = true;
}

@Override
public void doOutput(Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}

cloudWatchLogsService.processLogEvents(records);
}

@Override
public boolean isReady() {
return isInitialized;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.dataprepper.plugins.sink;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class CloudWatchLogsSinkTest {
private CloudWatchLogsSink testCloudWatchSink;
private PluginSetting mockPluginSetting;
private PluginMetrics mockPluginMetrics;
private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig;
private AwsCredentialsSupplier mockCredentialSupplier;
private AwsConfig mockAwsConfig;
private ThresholdConfig thresholdConfig;
private CloudWatchLogsMetrics mockCloudWatchLogsMetrics;
private static String TEST_LOG_GROUP = "testLogGroup";
private static String TEST_LOG_STREAM= "testLogStream";
private static String TEST_PLUGIN_NAME = "testPluginName";
private static String TEST_PIPELINE_NAME = "testPipelineName";
@BeforeEach
void setUp() {
mockPluginSetting = mock(PluginSetting.class);
mockPluginMetrics = mock(PluginMetrics.class);
mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
mockCredentialSupplier = mock(AwsCredentialsSupplier.class);
mockAwsConfig = mock(AwsConfig.class);
thresholdConfig = new ThresholdConfig();
mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class);

when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig);
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP);
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM);

when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME);
when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);

testCloudWatchSink = new CloudWatchLogsSink(mockPluginSetting, mockPluginMetrics, mockCloudWatchLogsSinkConfig,
mockCredentialSupplier);
}

@Test
void WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() {
testCloudWatchSink.doInitialize();
assertTrue(testCloudWatchSink.isReady());
}
}

0 comments on commit 93f6fe5

Please sign in to comment.