Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GitHub-Issue#2778: Added CloudWatchLogsSink #3084

Merged

Conversation

MaGonzalMayedo
Copy link
Contributor

@MaGonzalMayedo MaGonzalMayedo commented Jul 31, 2023

Description

This change implements the main Sink entry point for the CloudWatchLogs Sink. It extends the plugin by adding a single sink class alongside a test.

Issues Resolved

This PR will work towards resolving CWL-Sink for Data-Prepper [Issue https://github.com/https://github.com/https://github.com/https://github.com/https://github.com//issues/2778]
Link: #2778 (comment)

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

graytaylor0 and others added 19 commits July 3, 2023 12:56
…pensearch-project#2910)

Create Elasticsearch client, implement search and pit apis for ElasticsearchAccessor

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Added Config Files for CloudWatchLogs Sink.

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Added default params for back_off and log_send_interval alongside test cases for ThresholdConfig.

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
…ith AwsConfig.

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
…mer and params to AwsConfig

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
…tive mapping to tests CwlSink

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the convention for sink naming is without -sink. For example http and s3. Are you sure about this naming?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please drop the -sink. You can name it cloudwatchlogs or cloudwatch_logs if that matches the naming convention better. Data Prepper does use underscores in between words.

See for example:

@DataPrepperPlugin(name = "otel_traces", deprecatedName = "otel_trace_raw", pluginType = Processor.class, pluginConfigurationType = OtelTraceRawProcessorConfig.class)

Copy link
Contributor Author

@MaGonzalMayedo MaGonzalMayedo Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I will have it change to "cloudwatch_logs".


@Override
public void doInitialize() {
isInitialized = true;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the initialize method for? Should we be creating failable object here or in the class constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it has to with data-prepper's way of initializing sinks. In the Pipeline class, data-prepper checks to see if every sink is active through an initializer thread. Which just waits until each sink is "initialized" or times out trying.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Is there another reason fro this?

@Override
public void doOutput(Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the tests for this method for the next commit.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please drop the -sink. You can name it cloudwatchlogs or cloudwatch_logs if that matches the naming convention better. Data Prepper does use underscores in between words.

See for example:

@DataPrepperPlugin(name = "otel_traces", deprecatedName = "otel_trace_raw", pluginType = Processor.class, pluginConfigurationType = OtelTraceRawProcessorConfig.class)

thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);
BufferFactory bufferFactory = new InMemoryBufferFactory();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there only one buffer available right now? Can you pull this from the config instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only one sink right now. An "InMemoryBuffer". But I will make the change to extract it via string comparison so it can be expanded easily.

return;
}

cloudWatchLogsService.processLogEvents(records);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have unit test for this as well. Just to verify that the records are passed into the service.

* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put this project into a unique package. This way no class in here clobbers any other classes.

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I will try to have them moved for the revision.

…he sink. Also moved plugin within "cloudwatch_logs" package.

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
dlvenable
dlvenable previously approved these changes Aug 1, 2023
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thank you @MaGonzalMayedo !

@dlvenable
Copy link
Member

@MaGonzalMayedo , There are failing tests.

CloudWatchLogsSinkTest > WHEN_given_sample_empty_records_THEN_records_are_processed() FAILED
    software.amazon.awssdk.core.exception.SdkClientException at CloudWatchLogsSinkTest.java:65

CloudWatchLogsSinkTest > WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() FAILED
    software.amazon.awssdk.core.exception.SdkClientException at CloudWatchLogsSinkTest.java:65

CloudWatchLogsSinkTest > WHEN_given_sample_empty_records_THEN_records_are_not_processed() FAILED
    software.amazon.awssdk.core.exception.SdkClientException at CloudWatchLogsSinkTest.java:65

@dlvenable dlvenable merged commit 9cef0d5 into opensearch-project:main Aug 1, 2023
24 checks passed
@MaGonzalMayedo MaGonzalMayedo deleted the cloud_watch_logs_sink branch August 1, 2023 22:08
@MaGonzalMayedo MaGonzalMayedo restored the cloud_watch_logs_sink branch August 1, 2023 22:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants