Skip to content

Commit

Permalink
Added resetBuffer method, removed unnecessary RetransmissionException…
Browse files Browse the repository at this point in the history
…, and added logString pass in parameter for staging log events.

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
Marcos Gonzalez Mayedo committed Jul 25, 2023
1 parent 3e61e85 commit 3ba1011
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface Buffer {
List<byte[]> getBufferedData();

void clearBuffer();

void resetBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.List;

public class InMemoryBuffer implements Buffer {
private final List<byte[]> eventsBuffered;
private List<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
Expand Down Expand Up @@ -49,4 +49,10 @@ public void clearBuffer() {
bufferSize = 0;
eventsBuffered.clear();
}

@Override
public void resetBuffer() {
bufferSize = 0;
eventsBuffered = new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public CloudWatchLogsService(final Buffer buffer,
public void processLogEvents(final Collection<Record<Event>> logs) {
sinkStopWatch.startIfNotRunning();
for (Record<Event> log: logs) {
int logLength = log.getData().toJsonString().length();
String logString = log.getData().toJsonString();
int logLength = logString.length();

if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
Expand All @@ -100,12 +101,12 @@ public void processLogEvents(final Collection<Record<Event>> logs) {

if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) {
stageLogEvents();
addToBuffer(log);
addToBuffer(log, logString);
} else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) {
addToBuffer(log);
addToBuffer(log, logString);
stageLogEvents();
} else {
addToBuffer(log);
addToBuffer(log, logString);
}

bufferLock.unlock();
Expand All @@ -115,13 +116,12 @@ public void processLogEvents(final Collection<Record<Event>> logs) {
private void stageLogEvents() {
sinkStopWatch.stopAndResetStopWatch();

ArrayList<byte[]> eventMessageClone = new ArrayList<>();
cloneLists(buffer.getBufferedData(), eventMessageClone);
List<byte[]> eventMessageClone = buffer.getBufferedData();

ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles);
taskQueue.add(dataToPush);

buffer.clearBuffer();
buffer.resetBuffer();
bufferedEventHandles = new ArrayList<>();

CloudWatchLogsDispatcher newTaskDispatcher = CloudWatchLogsDispatcher.builder()
Expand All @@ -137,11 +137,11 @@ private void stageLogEvents() {
asyncExecutor.execute(newTaskDispatcher);
}

private void addToBuffer(final Record<Event> log) {
private void addToBuffer(final Record<Event> log, final String logString) {
if (log.getData().getEventHandle() != null) {
bufferedEventHandles.add(log.getData().getEventHandle());
}
buffer.writeEvent(log.getData().toString().getBytes());
buffer.writeEvent(logString.getBytes());
}

private void cloneLists(List<byte[]> listToCopy, List<byte[]> listToCopyInto) {
Expand Down

This file was deleted.

0 comments on commit 3ba1011

Please sign in to comment.