Skip to content

Commit

Permalink
Added CloudWatchLogsDispatcher builder pattern, fixed tests for Service and Dispatcher and modified backOffTimeBase
Browse files Browse the repository at this point in the history

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
Marcos Gonzalez Mayedo committed Jul 25, 2023
1 parent 90418aa commit 5971190
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 219 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
compileOnly 'org.projectlombok:lombok:1.18.24'
annotationProcessor 'org.projectlombok:lombok:1.18.24'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public byte[] popEvent() {

@Override
public List<byte[]> getBufferedData() {
Collections.unmodifiableList(eventsBuffered);
return Collections.unmodifiableList(eventsBuffered);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.sink.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,22 +22,23 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;

@Builder
public class CloudWatchLogsDispatcher implements Runnable {
public static final long UPPER_RETRY_TIME_BOUND = 20000;
public static final float EXP_TIME_SCALER = 1.5F;
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 5000;
private static final float EXP_TIME_SCALE = 1.5F;
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
private final BlockingQueue<ThreadTaskEvents> taskQueue;
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final String logGroup;
private final String logStream;
final int retryCount;
final long backOffTimeBase;
private BlockingQueue<ThreadTaskEvents> taskQueue;
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private String logGroup;
private String logStream;
private int retryCount;
private long backOffTimeBase;
public CloudWatchLogsDispatcher(final BlockingQueue<ThreadTaskEvents> taskQueue,
final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final String logGroup, final String logStream,
final int retryCount, final int backOffTimeBase) {
final int retryCount, final long backOffTimeBase) {
this.taskQueue = taskQueue;
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
Expand Down Expand Up @@ -64,7 +67,7 @@ private List<InputLogEvent> prepareInputLogEvents(final ThreadTaskEvents eventDa
* @param inputLogEvents Collection of inputLogEvents to be flushed
* @return true if successful, false otherwise
*/
public boolean dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
boolean failedPost = true;
int failCounter = 0;

Expand All @@ -74,7 +77,6 @@ public boolean dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<Event
.logStreamName(logStream)
.build();

//TODO: Could also continue to retry even with InterruptedException instead of directly pushing to DLQ.
try {
while (failedPost && (failCounter < retryCount)) {
try {
Expand All @@ -94,28 +96,26 @@ public boolean dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<Event
}
} catch (InterruptedException e) {
LOG.warn("Got interrupted while waiting!");
Thread.currentThread().interrupt();
//TODO: Push to DLQ.
Thread.currentThread().interrupt();
}


if (failedPost) {
cloudWatchLogsMetrics.increaseLogEventFailCounter(inputLogEvents.size());
releaseEventHandles(false, eventHandles);
return false;
} else {
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(inputLogEvents.size());
releaseEventHandles(true, eventHandles);
return true;
}
}

//TODO: Can abstract this if clients want more choice.
private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
long scale = (long)Math.pow(EXP_TIME_SCALER, failCounter);
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);

if (scale >= UPPER_RETRY_TIME_BOUND) {
return UPPER_RETRY_TIME_BOUND;
if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
}

return scale * backOffTimeBase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents;
import org.opensearch.dataprepper.plugins.sink.push_condition.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.time.SinkStopWatch;
import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.utils.SinkStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -26,28 +27,43 @@

public class CloudWatchLogsService {
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsService.class);
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final Buffer buffer;
private final CloudWatchLogsLimits cloudWatchLogsLimits;
private List<EventHandle> bufferedEventHandles;
private final BlockingQueue<ThreadTaskEvents> taskQueue;
private final SinkStopWatch sinkStopWatch;
private final ReentrantLock bufferLock;
private final Executor sinkThreadManager;
private final CloudWatchLogsDispatcher dispatcher;
private final Executor asyncExecutor;
private final String logGroup;
private final String logStream;
private final int retryCount;
private final long backOffTimeBase;

public CloudWatchLogsService(final Buffer buffer,
final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final CloudWatchLogsLimits cloudWatchLogsLimits,
final CloudWatchLogsDispatcher dispatcher, BlockingQueue<ThreadTaskEvents> blockingQueue) {
final BlockingQueue<ThreadTaskEvents> blockingQueue,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {

this.buffer = buffer;
this.dispatcher = dispatcher;
this.bufferedEventHandles = new ArrayList<>();
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
this.cloudWatchLogsLimits = cloudWatchLogsLimits;
this.taskQueue = blockingQueue;
this.logGroup = logGroup;
this.logStream = logStream;
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

this.bufferedEventHandles = new ArrayList<>();

bufferLock = new ReentrantLock();
sinkStopWatch = new SinkStopWatch();
sinkThreadManager = newCachedThreadPool();
asyncExecutor = newCachedThreadPool();
}

/**
Expand Down Expand Up @@ -91,16 +107,26 @@ public void processLogEvents(final Collection<Record<Event>> logs) {
private void stageLogEvents() {
sinkStopWatch.stopAndResetStopWatch();

ArrayList<byte[]> eventMessageCloneList = new ArrayList<>();
cloneLists(buffer.getBufferedData(), eventMessageCloneList);
ArrayList<byte[]> eventMessageClone = new ArrayList<>();
cloneLists(buffer.getBufferedData(), eventMessageClone);

ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageCloneList, bufferedEventHandles);
ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles);
taskQueue.add(dataToPush);

buffer.clearBuffer();
bufferedEventHandles = new ArrayList<>();

sinkThreadManager.execute(dispatcher);
CloudWatchLogsDispatcher newTaskDispatcher = CloudWatchLogsDispatcher.builder()
.taskQueue(taskQueue)
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.logGroup(logGroup)
.logStream(logStream)
.retryCount(retryCount)
.backOffTimeBase(backOffTimeBase)
.build();

asyncExecutor.execute(newTaskDispatcher);
}

private void addToBuffer(final Record<Event> log) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.push_condition;
package org.opensearch.dataprepper.plugins.sink.utils;
/**
* CloudWatchLogsLimits receives the limits of a buffer and
* CloudWatchLogsClient as parameters, to be checked
* before making a PutLogEvents request to AWS.
*/
public class CloudWatchLogsLimits {
public static final int APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE = 26; //Size of overhead for each log event message.
private final int batchSize;
private final int maxBatchSize;
private final int maxEventSizeBytes;
private final int maxRequestSizeBytes;
private final long logSendInterval;

public CloudWatchLogsLimits(final int batchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) {
this.batchSize = batchSize;
public CloudWatchLogsLimits(final int maxBatchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) {
this.maxBatchSize = maxBatchSize;
this.maxEventSizeBytes = maxEventSizeBytes;
this.maxRequestSizeBytes = maxRequestSizeBytes;
this.logSendInterval = logSendInterval;
Expand Down Expand Up @@ -82,7 +82,7 @@ private boolean isGreaterThanMaxRequestSize(final int currentRequestSize) {
* @return boolean - true if greater, false otherwise.
*/
private boolean isGreaterThanBatchSize(final int batchSize) {
return batchSize > this.batchSize;
return batchSize > this.maxBatchSize;
}

/**
Expand All @@ -95,6 +95,6 @@ private boolean isEqualMaxRequestSize(final int currentRequestSize) {
}

private boolean isEqualBatchSize(final int batchSize) {
return batchSize == this.batchSize;
return batchSize == this.maxBatchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.time;
package org.opensearch.dataprepper.plugins.sink.utils;

import org.apache.commons.lang3.time.StopWatch;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* A synchronized watch for accessing time
* related data. (Wrapper around the StopWatch class
* from "org.apache.commons.lang3.time")
*/
public class SinkStopWatch {
private final StopWatch stopWatch;
private final ReentrantLock stopWatchLock;
Expand Down
Loading

0 comments on commit 5971190

Please sign in to comment.