GitHub-Issue#2778: Added CloudWatchLogsService, Tests and RetransmissionException #3023

Merged
Changes from 39 commits
46 commits
f4dc9da
Elasticsearch client implementation with pit and no context search (#…
graytaylor0 Jun 21, 2023
4387b29
GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)
MaGonzalMayedo Jun 21, 2023
f1c25bb
Added fixes from comments to code (including pathing and nomenclature…
Jun 22, 2023
9d640e5
Refactoring config (#5)
MaGonzalMayedo Jun 22, 2023
b100ee3
Fixed deleted AwsConfig file
Jun 22, 2023
35859b0
Removed the s3 dependency from build.gradle, replaced the AwsAuth.. w…
Jun 26, 2023
7040186
Added modifiable back_off_timer, added threshold test for back_off_ti…
Jun 27, 2023
6b13e21
Added fixes to gradle file, added tests to AwsConfig, and used Reflec…
Jun 28, 2023
3bb125a
Added default value test to ThresholdConfig and renamed getter for ma…
Jun 28, 2023
e5ee1e5
Removed unnecessary imports
Jun 28, 2023
e90b05a
Added cloudwatch-logs to settings.gradle
Jul 3, 2023
43a0a40
Merge branch 'main' into main
MaGonzalMayedo Jul 3, 2023
40cb280
Added a quick fix to the back_off_time range
Jul 4, 2023
1aac686
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 5, 2023
903ea26
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 7, 2023
ffe5cbe
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 12, 2023
b56a845
Added Buffer classes, ClientFactory similar to S3, and ThresholdCheck
Jul 5, 2023
b06ed0b
Removed unnecessary default method from ClientFactory
Jul 5, 2023
7f5a432
Added comments in Buffer Interface, change some default values to sui…
Jul 5, 2023
4576899
Removed unused imports
Jul 7, 2023
a539833
Changed the unused imports, made parameters final in the ThresholdCheck
Jul 7, 2023
c89ea17
Made changes to the tests and the method signatures in ThresholdCheck…
Jul 10, 2023
063e1d3
Removed unused methods/comments
Jul 10, 2023
1aad0b5
Added CloudWatchLogsService, CloudWatchLogsServiceTest and Retransmis…
Jul 12, 2023
cf1f8e1
Fixed retransmission logging fixed value
Jul 12, 2023
77f6d0f
Fixed unused imports
Jul 12, 2023
75d90fe
Fixed making ThresholdCheck public
Jul 12, 2023
5f2f511
Added fixes to ThresholdCheck and CloudWatchLogsService to decouple m…
Jul 12, 2023
fdc5b00
Fixed syntax start import in CloudWatchLogsServiceTest
Jul 12, 2023
0399694
Extracted LogPusher and SinkStopWatch classes for code cleanup. Addde…
Jul 14, 2023
3c02e1d
Changed method uses in CloudWatchLogsService and removed logging the …
Jul 14, 2023
c2a02ec
Added Multithreaded CloudWatchLogsDispatcher for handling various asy…
Jul 19, 2023
abec5e3
Added fixes to test and defaulted the parameters in the config to Clou…
Jul 20, 2023
9bbfedd
Added exponential backofftime
Jul 20, 2023
6e28adc
Fixed unused imports
Jul 20, 2023
90418aa
Fixed up deepcopy of arraylist for service workers in CloudWatchLogsS…
Jul 20, 2023
5971190
Added CloudWatchLogsDispatcher builder pattern, fixed tests for Servi…
Jul 24, 2023
53086c4
Removed unused imports
Jul 25, 2023
26f18a1
Added resetBuffer method, removed unnecessary RetransmissionException…
Jul 25, 2023
a5b8be7
Started making changes to the tests to implement the new class struct…
Jul 26, 2023
070beef
Refactored the CloudWatchLogsDispatcher into two classes with the add…
Jul 27, 2023
6130b08
Fixed issues with locking in try block and added final multithreaded …
Jul 27, 2023
69320ec
Added CloudWatchLogsMetricsTest, changed upper back off time bound an…
Jul 28, 2023
993cbd0
Added changes to javadoc
Jul 28, 2023
2b18115
Update data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensea…
MaGonzalMayedo Jul 28, 2023
eee6197
Fixed comment on CloudWatchLogsDispatcher
Jul 28, 2023
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
@@ -17,9 +17,12 @@ dependencies {
     implementation 'software.amazon.awssdk:cloudwatch'
     implementation 'software.amazon.awssdk:cloudwatchlogs'
     implementation 'org.apache.commons:commons-lang3:3.12.0'
+    implementation 'org.projectlombok:lombok:1.18.26'
     testImplementation project(path: ':data-prepper-test-common')
     testImplementation testLibs.mockito.inline
     testImplementation 'org.junit.jupiter:junit-jupiter'
+    compileOnly 'org.projectlombok:lombok:1.18.24'
+    annotationProcessor 'org.projectlombok:lombok:1.18.24'
 }

 jacocoTestCoverageVerification {
@@ -5,7 +5,7 @@

 package org.opensearch.dataprepper.plugins.sink.buffer;

-import java.util.ArrayList;
+import java.util.List;

 /**
  * Buffer that handles the temporary storage of
@@ -31,7 +31,9 @@ public interface Buffer {

     byte[] popEvent();

-    ArrayList<byte[]> getBufferedData();
+    List<byte[]> getBufferedData();

     void clearBuffer();
+
+    void resetBuffer();
 }
@@ -6,9 +6,11 @@
 package org.opensearch.dataprepper.plugins.sink.buffer;

 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;

 public class InMemoryBuffer implements Buffer {
-    private final ArrayList<byte[]> eventsBuffered;
+    private List<byte[]> eventsBuffered;
     private int bufferSize = 0;

     InMemoryBuffer() {
@@ -38,13 +40,19 @@ public byte[] popEvent() {
     }

     @Override
-    public ArrayList<byte[]> getBufferedData() {
-        return eventsBuffered;
+    public List<byte[]> getBufferedData() {
+        return Collections.unmodifiableList(eventsBuffered);
     }

     @Override
     public void clearBuffer() {
         bufferSize = 0;
         eventsBuffered.clear();
     }
+
+    @Override
+    public void resetBuffer() {
+        bufferSize = 0;
+        eventsBuffered = new ArrayList<>();
+    }
 }
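The difference between clearBuffer() and resetBuffer() becomes visible once getBufferedData() has handed out a view of the list. The following is a minimal standalone sketch, not the plugin's code: SketchBuffer and BufferResetDemo are hypothetical names that mirror only the list handling in the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in mirroring the list handling of InMemoryBuffer.
class SketchBuffer {
    private List<byte[]> events = new ArrayList<>();

    void write(final byte[] event) {
        events.add(event);
    }

    // Read-only view backed by the current list; callers cannot add or remove.
    List<byte[]> getBufferedData() {
        return Collections.unmodifiableList(events);
    }

    // Empties the list in place, so views handed out earlier become empty too.
    void clearBuffer() {
        events.clear();
    }

    // Swaps in a fresh list; views handed out earlier keep the old contents.
    void resetBuffer() {
        events = new ArrayList<>();
    }
}

class BufferResetDemo {
    // Size seen through a view obtained before clearBuffer (true) or resetBuffer (false).
    static int sizeSeenByEarlierView(final boolean useClear) {
        final SketchBuffer buffer = new SketchBuffer();
        buffer.write("a".getBytes());
        buffer.write("b".getBytes());
        final List<byte[]> view = buffer.getBufferedData();
        if (useClear) {
            buffer.clearBuffer();
        } else {
            buffer.resetBuffer();
        }
        return view.size();
    }

    public static void main(final String[] args) {
        System.out.println("after clearBuffer: " + sizeSeenByEarlierView(true));
        System.out.println("after resetBuffer: " + sizeSeenByEarlierView(false));
    }
}
```

In this sketch a consumer that still holds the earlier view keeps its data after resetBuffer() but loses it after clearBuffer(). That may be why the PR adds resetBuffer() alongside clearBuffer(), although the source does not state the motivation.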
@@ -10,7 +10,6 @@
 import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.retry.RetryPolicy;
 import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

 /**
@@ -20,6 +19,9 @@
  * CloudWatchLogs services.
  */
 public final class CloudWatchLogsClientFactory {
+    private CloudWatchLogsClientFactory() {
+        throw new IllegalStateException("Static Factory Class!");
Member

You probably don't need this exception.

Contributor Author

I have removed the exception for the next revision.
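
For reference, an empty private constructor is already enough to block ordinary instantiation of a static factory class; throwing from it only guards against reflective access. A minimal sketch with a hypothetical GreetingFactory (not a class from this PR):

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical static factory; `final` prevents subclassing.
final class GreetingFactory {
    // Private and empty: `new GreetingFactory()` does not compile in other classes.
    private GreetingFactory() {
    }

    static String createGreeting(final String name) {
        return "Hello, " + name;
    }
}

class FactoryDemo {
    // True when every declared constructor of the class is private.
    static boolean hasOnlyPrivateConstructors(final Class<?> type) {
        for (final Constructor<?> constructor : type.getDeclaredConstructors()) {
            if (!Modifier.isPrivate(constructor.getModifiers())) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(GreetingFactory.createGreeting("sink"));
        System.out.println(hasOnlyPrivateConstructors(GreetingFactory.class));
    }
}
```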

+    }

     /**
      * Generates a CloudWatchLogs Client based on STS role ARN system credentials.

In the createOverrideConfiguration() method you use a nested builder. It is better practice to use a lambda instead (easier to read).

return ClientOverrideConfiguration.builder()
            .retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
            .build();

Contributor Author

Thank you! I have added the lambda to increase code readability.
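
The lambda form suggested above relies on a builder overload that accepts a Consumer of the nested builder and builds the nested object internally. The sketch below shows that general pattern with hypothetical RetrySettings and ClientSettings classes; they are stand-ins for illustration, not AWS SDK types.

```java
import java.util.function.Consumer;

// Hypothetical nested configuration object with its own builder.
final class RetrySettings {
    final int numRetries;

    private RetrySettings(final int numRetries) {
        this.numRetries = numRetries;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private int numRetries;

        Builder numRetries(final int value) {
            this.numRetries = value;
            return this;
        }

        RetrySettings build() {
            return new RetrySettings(numRetries);
        }
    }
}

// Hypothetical outer configuration that accepts the nested one in two ways.
final class ClientSettings {
    final RetrySettings retrySettings;

    private ClientSettings(final RetrySettings retrySettings) {
        this.retrySettings = retrySettings;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private RetrySettings retrySettings;

        // Classic form: the caller builds the nested object first.
        Builder retrySettings(final RetrySettings value) {
            this.retrySettings = value;
            return this;
        }

        // Consumer form: the caller only configures; this method creates and builds.
        Builder retrySettings(final Consumer<RetrySettings.Builder> configurer) {
            final RetrySettings.Builder nested = RetrySettings.builder();
            configurer.accept(nested);
            return retrySettings(nested.build());
        }

        ClientSettings build() {
            return new ClientSettings(retrySettings);
        }
    }
}

class ConsumerBuilderDemo {
    public static void main(final String[] args) {
        final ClientSettings nestedStyle = ClientSettings.builder()
                .retrySettings(RetrySettings.builder().numRetries(3).build())
                .build();
        final ClientSettings lambdaStyle = ClientSettings.builder()
                .retrySettings(r -> r.numRetries(3))
                .build();
        System.out.println(nestedStyle.retrySettings.numRetries == lambdaStyle.retrySettings.numRetries);
    }
}
```

Both call styles produce the same configuration; the lambda form simply removes the explicit builder() and build() calls on the nested object.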

@@ -38,10 +40,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
     }

     private static ClientOverrideConfiguration createOverrideConfiguration() {
-        final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();
-
         return ClientOverrideConfiguration.builder()
-                .retryPolicy(retryPolicy)
+                .retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
                 .build();
     }

@@ -0,0 +1,144 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.client;
+
+import lombok.Builder;
+import org.opensearch.dataprepper.model.event.EventHandle;
+import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+@Builder
+public class CloudWatchLogsDispatcher implements Runnable {
+    private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 5000;
+    private static final float EXP_TIME_SCALE = 1.5F;
+    private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
+    private BlockingQueue<ThreadTaskEvents> taskQueue;
+    private CloudWatchLogsClient cloudWatchLogsClient;
+    private CloudWatchLogsMetrics cloudWatchLogsMetrics;
+    private String logGroup;
+    private String logStream;
+    private int retryCount;
+    private long backOffTimeBase;
+    public CloudWatchLogsDispatcher(final BlockingQueue<ThreadTaskEvents> taskQueue,
+                                    final CloudWatchLogsClient cloudWatchLogsClient,
+                                    final CloudWatchLogsMetrics cloudWatchLogsMetrics,
+                                    final String logGroup, final String logStream,
+                                    final int retryCount, final long backOffTimeBase) {
+        this.taskQueue = taskQueue;
+        this.cloudWatchLogsClient = cloudWatchLogsClient;
+        this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
+        this.logGroup = logGroup;
+        this.logStream = logStream;
+        this.retryCount = retryCount;
+        this.backOffTimeBase = backOffTimeBase;
+    }
+
+    private List<InputLogEvent> prepareInputLogEvents(final ThreadTaskEvents eventData) {
+        List<InputLogEvent> logEventList = new ArrayList<>();
+
+        for (byte[] data: eventData.getEventMessages()) {
+            InputLogEvent tempLogEvent = InputLogEvent.builder()
+                    .message(new String(data))
+                    .timestamp(System.currentTimeMillis())
+                    .build();
+            logEventList.add(tempLogEvent);
+        }
+
+        return logEventList;
+    }
+
+    /**
+     * Flush function to handle the flushing of logs to CloudWatchLogs services;
+     * @param inputLogEvents Collection of inputLogEvents to be flushed
+     * @param eventHandles Collection of EventHandles for events
+     */
+    public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
+        boolean failedPost = true;
+        int failCounter = 0;
+
+        PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
+                .logEvents(inputLogEvents)
+                .logGroupName(logGroup)
+                .logStreamName(logStream)
+                .build();
+
+        try {
+            while (failedPost && (failCounter < retryCount)) {
+                try {
+                    cloudWatchLogsClient.putLogEvents(putLogEventsRequest);
+
+                    cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
+                    failedPost = false;
+
+                    //TODO: When a log is rejected by the service, we cannot send it, can probably push to a DLQ here.
+
+                } catch (CloudWatchLogsException | SdkClientException e) {
+                    LOG.error("Service-Worker {} Failed to push logs with error: {}", Thread.currentThread().getName(), e.getMessage());
+                    cloudWatchLogsMetrics.increaseRequestFailCounter(1);
+                    Thread.sleep(calculateBackOffTime(backOffTimeBase, failCounter));
+                    LOG.warn("Service-Worker {} Trying to retransmit request... {Attempt: {} }", Thread.currentThread().getName(), (++failCounter));
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Got interrupted while waiting!");
+            //TODO: Push to DLQ.
+            Thread.currentThread().interrupt();
+        }
+
+
+        if (failedPost) {
+            cloudWatchLogsMetrics.increaseLogEventFailCounter(inputLogEvents.size());
+            releaseEventHandles(false, eventHandles);
+        } else {
+            cloudWatchLogsMetrics.increaseLogEventSuccessCounter(inputLogEvents.size());
+            releaseEventHandles(true, eventHandles);
+        }
+    }
+
+    //TODO: Can abstract this if clients want more choice.
+    private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
+        long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);
+        // Compare the computed delay, not the bare multiplier, against the millisecond bound.
+        if (scale * backOffTimeBase >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
+            return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
+        }
+
+        return scale * backOffTimeBase;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ThreadTaskEvents taskData = taskQueue.take();
+            List<InputLogEvent> inputLogEvents = prepareInputLogEvents(taskData);
+            dispatchLogs(inputLogEvents, taskData.getEventHandles());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            //TODO: Implement back up to taskQueue read failure.
+        }
+    }
+
+    private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) {
+        if (eventHandles.isEmpty()) {
+            return;
+        }
+
+        for (EventHandle eventHandle : eventHandles) {
+            eventHandle.release(result);
+        }
+    }
+}
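
The retry loop and back-off calculation in the dispatcher can be exercised without AWS. The following is a minimal sketch, assuming the upper bound is meant to cap the final delay; it reuses the 1.5 scale factor and the 5000 ms bound from the diff. RetrySketch and its Attempt interface are hypothetical, and delays are recorded in a list rather than slept so the schedule is easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;

class RetrySketch {
    static final long UPPER_BOUND_MILLIS = 5000;
    static final double EXP_TIME_SCALE = 1.5;

    // Hypothetical stand-in for one putLogEvents call; throws on failure.
    interface Attempt {
        void run() throws Exception;
    }

    // Delay before the next attempt: base * floor(1.5^failCounter), capped at the bound.
    static long backOffMillis(final long baseMillis, final int failCounter) {
        final long scale = (long) Math.pow(EXP_TIME_SCALE, failCounter);
        return Math.min(scale * baseMillis, UPPER_BOUND_MILLIS);
    }

    // Tries up to retryCount times and returns true on the first success.
    // Each failure appends its back-off delay to `delays` instead of sleeping.
    static boolean dispatch(final Attempt attempt, final int retryCount,
                            final long baseMillis, final List<Long> delays) {
        for (int failCounter = 0; failCounter < retryCount; failCounter++) {
            try {
                attempt.run();
                return true;
            } catch (Exception e) {
                delays.add(backOffMillis(baseMillis, failCounter));
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final List<Long> delays = new ArrayList<>();
        final int[] calls = {0};
        // Simulated request that fails twice, then succeeds on the third call.
        final boolean sent = dispatch(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated failure");
            }
        }, 5, 500, delays);
        System.out.println(sent + " " + delays);
    }
}
```

With a 500 ms base, fail counts 0 through 6 give delays of 500, 500, 1000, 1500, 2500, 3500 and 5000 ms in this sketch. The first two are equal because the multiplier is truncated to a long before it is applied.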
@@ -0,0 +1,48 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.client;
+
+import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+
+/**
+ * Class is meant to abstract the metric book-keeping of
+ * CloudWatchLogs metrics so that multiple instances
+ * may refer to it.
+ */
+public class CloudWatchLogsMetrics {
+    public static final String CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED = "cloudWatchLogsRequestsSucceeded";
+    public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
+    public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
+    public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
+    private final Counter logEventSuccessCounter;
+    private final Counter logEventFailCounter;
+    private final Counter requestSuccessCount;
+    private final Counter requestFailCount;
+
+    public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
+        this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
+        this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
+        this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
+        this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
+    }
+
+    public void increaseLogEventSuccessCounter(int value) {
+        logEventSuccessCounter.increment(value);
+    }
+
+    public void increaseRequestSuccessCounter(int value) {
+        requestSuccessCount.increment(value);
+    }
+
+    public void increaseLogEventFailCounter(int value) {
+        logEventFailCounter.increment(value);
+    }
+
+    public void increaseRequestFailCounter(int value) {
+        requestFailCount.increment(value);
+    }
+}
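
To see how the dispatcher's outcomes map onto these four counters, here is a small sketch that uses AtomicLong in place of Micrometer's Counter, an assumption made so the example runs without dependencies. MetricsSketch and recordBatch are hypothetical names. As in the dispatcher, every failed request bumps the request-failure counter, while the event counters move once per batch by the batch size.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical dependency-free stand-in for CloudWatchLogsMetrics.
class MetricsSketch {
    final AtomicLong requestsSucceeded = new AtomicLong();
    final AtomicLong requestsFailed = new AtomicLong();
    final AtomicLong eventsSucceeded = new AtomicLong();
    final AtomicLong eventsFailed = new AtomicLong();

    // Records one batch: `failedAttempts` failed requests, followed by either
    // one successful request (delivered) or giving up (not delivered).
    void recordBatch(final int batchSize, final int failedAttempts, final boolean delivered) {
        requestsFailed.addAndGet(failedAttempts);
        if (delivered) {
            requestsSucceeded.incrementAndGet();
            eventsSucceeded.addAndGet(batchSize);
        } else {
            eventsFailed.addAndGet(batchSize);
        }
    }

    public static void main(final String[] args) {
        final MetricsSketch metrics = new MetricsSketch();
        metrics.recordBatch(10, 2, true);   // delivered on the third request
        metrics.recordBatch(4, 5, false);   // gave up after five failed requests
        System.out.println("requests ok/failed: " + metrics.requestsSucceeded + "/" + metrics.requestsFailed);
        System.out.println("events ok/failed: " + metrics.eventsSucceeded + "/" + metrics.eventsFailed);
    }
}
```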