diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java index d0c05cf0a4..6124cb7381 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java @@ -34,4 +34,6 @@ public interface Buffer { List getBufferedData(); void clearBuffer(); + + void resetBuffer(); } \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java index 572f1ad9c1..e3794676eb 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java @@ -10,7 +10,7 @@ import java.util.List; public class InMemoryBuffer implements Buffer { - private final List eventsBuffered; + private List eventsBuffered; private int bufferSize = 0; InMemoryBuffer() { @@ -49,4 +49,10 @@ public void clearBuffer() { bufferSize = 0; eventsBuffered.clear(); } + + @Override + public void resetBuffer() { + bufferSize = 0; + eventsBuffered = new ArrayList<>(); + } } \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java index 3404f4b827..21d084b49f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java @@ -82,7 +82,8 @@ public CloudWatchLogsService(final Buffer buffer, public void processLogEvents(final Collection> logs) { sinkStopWatch.startIfNotRunning(); for (Record log: logs) { - int logLength = log.getData().toJsonString().length(); + String logString = log.getData().toJsonString(); + int logLength = logString.length(); if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); @@ -100,12 +101,12 @@ public void processLogEvents(final Collection> logs) { if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) { stageLogEvents(); - addToBuffer(log); + addToBuffer(log, logString); } else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) { - addToBuffer(log); + addToBuffer(log, logString); stageLogEvents(); } else { - addToBuffer(log); + addToBuffer(log, logString); } bufferLock.unlock(); @@ -115,13 +116,12 @@ public void processLogEvents(final Collection> logs) { private void stageLogEvents() { sinkStopWatch.stopAndResetStopWatch(); - ArrayList eventMessageClone = new ArrayList<>(); - cloneLists(buffer.getBufferedData(), eventMessageClone); + List eventMessageClone = buffer.getBufferedData(); ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles); taskQueue.add(dataToPush); - buffer.clearBuffer(); + buffer.resetBuffer(); bufferedEventHandles = new ArrayList<>(); CloudWatchLogsDispatcher newTaskDispatcher = CloudWatchLogsDispatcher.builder() @@ -137,11 +137,11 @@ private void stageLogEvents() { asyncExecutor.execute(newTaskDispatcher); } - private void addToBuffer(final Record log) { + private void addToBuffer(final Record log, final String logString) { if (log.getData().getEventHandle() != null) { bufferedEventHandles.add(log.getData().getEventHandle()); } - buffer.writeEvent(log.getData().toString().getBytes()); + buffer.writeEvent(logString.getBytes()); } private void cloneLists(List listToCopy, List listToCopyInto) { diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/exception/RetransmissionLimitException.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/exception/RetransmissionLimitException.java deleted file mode 100644 index c476dabc55..0000000000 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/exception/RetransmissionLimitException.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.sink.exception; - -public class RetransmissionLimitException extends RuntimeException{ - public RetransmissionLimitException(String message) { - super(message); - } -} \ No newline at end of file