Skip to content

Commit

Permalink
Refactored the CloudWatchLogsDispatcher into two classes with the addition of Uploader, introduced simple multithreaded tests for CloudWatchLogsService
Browse files Browse the repository at this point in the history

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
Marcos Gonzalez Mayedo committed Jul 27, 2023
1 parent a5b8be7 commit 070beef
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/

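/*
TODO:
Need to add a PriorityQueue for extracting timestamps; this will need both the timestamp and the actual string message itself.
The buffer could be refactored to contain these entries (presumably timestamp/message pairs; the original note was left unfinished, so confirm the intended design).
*/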

public interface Buffer {
/**
* Size of buffer in events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
Expand All @@ -16,13 +15,13 @@
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

import static java.util.concurrent.Executors.newCachedThreadPool;

@Builder
public class CloudWatchLogsDispatcher {
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 5000;
private static final float EXP_TIME_SCALE = 1.5F;
Expand All @@ -36,6 +35,7 @@ public class CloudWatchLogsDispatcher {
private long backOffTimeBase;
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
Executor asyncExecutor,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {
this.cloudWatchLogsClient = cloudWatchLogsClient;
Expand All @@ -45,15 +45,15 @@ public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

asyncExecutor = newCachedThreadPool();
this.asyncExecutor = asyncExecutor;
}

public List<InputLogEvent> prepareInputLogEvents(final ThreadTaskEvents eventData) {
public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) {
List<InputLogEvent> logEventList = new ArrayList<>();

for (byte[] data: eventData.getEventMessages()) {
for (byte[] data : eventMessageBytes) {
InputLogEvent tempLogEvent = InputLogEvent.builder()
.message(new String(data))
.message(new String(data, StandardCharsets.UTF_8))
.timestamp(System.currentTimeMillis())
.build();
logEventList.add(tempLogEvent);
Expand Down Expand Up @@ -84,7 +84,7 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHan
}

@Builder
private static class Uploader implements Runnable {
protected static class Uploader implements Runnable {
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private PutLogEventsRequest putLogEventsRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents;
import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.utils.SinkStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.concurrent.Executors.newCachedThreadPool; //TODO: Can implement a more strict pooling method if needed.

/**
* CloudWatchLogs Service encapsulates the log processing step.
* It accomplishes this by:
Expand All @@ -35,41 +32,23 @@
public class CloudWatchLogsService {
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsService.class);
private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher;
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final Buffer buffer;
private final CloudWatchLogsLimits cloudWatchLogsLimits;
private List<EventHandle> bufferedEventHandles;
private final SinkStopWatch sinkStopWatch;
private final ReentrantLock bufferLock;
private final String logGroup;
private final String logStream;
private final int retryCount;
private final long backOffTimeBase;

private final ReentrantLock processLock;
public CloudWatchLogsService(final Buffer buffer,
final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final CloudWatchLogsLimits cloudWatchLogsLimits,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {
final CloudWatchLogsDispatcher cloudWatchLogsDispatcher) {

this.buffer = buffer;
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
this.cloudWatchLogsLimits = cloudWatchLogsLimits;
this.logGroup = logGroup;
this.logStream = logStream;
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

this.bufferedEventHandles = new ArrayList<>();

bufferLock = new ReentrantLock();
processLock = new ReentrantLock();
sinkStopWatch = new SinkStopWatch();

cloudWatchLogsDispatcher = new CloudWatchLogsDispatcher(cloudWatchLogsClient,
cloudWatchLogsMetrics, logGroup, logStream, retryCount, backOffTimeBase);
this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher;
}

/**
Expand All @@ -78,62 +57,54 @@ public CloudWatchLogsService(final Buffer buffer,
* @param logs - Collection of Record events which hold log data.
*/
public void processLogEvents(final Collection<Record<Event>> logs) {
sinkStopWatch.startIfNotRunning();
for (Record<Event> log: logs) {
String logString = log.getData().toJsonString();
int logLength = logString.length();

if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
continue;
try {
sinkStopWatch.startIfNotRunning();
for (Record<Event> log : logs) {
String logString = log.getData().toJsonString();
int logLength = logString.length();

if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
continue;
}

long time = sinkStopWatch.getStopWatchTimeSeconds();

processLock.lock();
int bufferSize = buffer.getBufferSize();
int bufferEventCount = buffer.getEventCount();
int bufferEventCountWithEvent = bufferEventCount + 1;
int bufferSizeWithAddedEvent = bufferSize + logLength;

if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) {
stageLogEvents();
addToBuffer(log, logString);
} else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) {
addToBuffer(log, logString);
stageLogEvents();
} else {
addToBuffer(log, logString);
}
}

long time = sinkStopWatch.getStopWatchTimeSeconds();

bufferLock.lock();

int bufferSize = buffer.getBufferSize();
int bufferEventCount = buffer.getEventCount();
int bufferEventCountWithEvent = bufferEventCount + 1;
int bufferSizeWithAddedEvent = bufferSize + logLength;

if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) {
stageLogEvents();
addToBuffer(log, logString);
} else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) {
addToBuffer(log, logString);
stageLogEvents();
} else {
addToBuffer(log, logString);
}

bufferLock.unlock();
} finally {
processLock.unlock();
}
}

private void stageLogEvents() {
sinkStopWatch.stopAndResetStopWatch();

List<byte[]> eventMessageClone = buffer.getBufferedData();
ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles);
List<InputLogEvent> inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData());
cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, bufferedEventHandles);

buffer.resetBuffer();
bufferedEventHandles = new ArrayList<>();

List<InputLogEvent> inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(dataToPush);
cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, dataToPush.getEventHandles());
}

private void addToBuffer(final Record<Event> log, final String logString) {
if (log.getData().getEventHandle() != null) {
bufferedEventHandles.add(log.getData().getEventHandle());
}
buffer.writeEvent(logString.getBytes());
}

private void cloneLists(List<byte[]> listToCopy, List<byte[]> listToCopyInto) {
for (byte[] holder: listToCopy) {
listToCopyInto.add(holder.clone());
}
buffer.writeEvent(logString.getBytes(StandardCharsets.UTF_8));
}
}

This file was deleted.

Loading

0 comments on commit 070beef

Please sign in to comment.