From 5fac36b008937c4ca639f53783d52c342f43d45e Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 11 Jul 2023 15:00:26 +0000 Subject: [PATCH] Modified to do the synchronization in the acknowledgement set framework Signed-off-by: Krishna Kondaka --- .../acknowledgements/AcknowledgementSet.java | 8 +++++++ .../DefaultAcknowledgementSet.java | 21 ++++++++++++++++++- .../dataprepper/plugins/source/SqsWorker.java | 20 +++++------------- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index edd36f4ee7..c95c2e5f88 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -50,4 +50,12 @@ public interface AcknowledgementSet { * @since 2.2 */ public boolean release(final EventHandle eventHandle, final boolean result); + + /** + * Indicates that the addition of initial set of events to + * the acknowledgement set is completed. + * It is possible that more events are added to the set as the + * initial events are going through the pipeline line. + */ + public void complete(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index c5bd269105..56498d6866 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -34,6 +34,7 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet { private final Map pendingAcknowledgments; private Future callbackFuture; private final DefaultAcknowledgementSetMetrics metrics; + private boolean closed; public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) { this.callback = callback; @@ -42,6 +43,7 @@ public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer< this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis()); this.callbackFuture = null; this.metrics = metrics; + this.closed = false; pendingAcknowledgments = new HashMap<>(); lock = new ReentrantLock(true); } @@ -84,6 +86,8 @@ public boolean isDone() { if (Instant.now().isAfter(expiryTime)) { if (callbackFuture != null) { callbackFuture.cancel(true); + callbackFuture = null; + LOG.warn("AcknowledgementSet expired"); } metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME); return true; @@ -98,6 +102,19 @@ public Instant getExpiryTime() { return expiryTime; } + @Override + public void complete() { + lock.lock(); + try { + closed = true; + if (pendingAcknowledgments.size() == 0) { + callbackFuture = executor.submit(() -> callback.accept(this.result)); + } + } finally { + lock.unlock(); + } + } + @Override public boolean release(final EventHandle eventHandle, final boolean result) { lock.lock(); @@ -114,9 +131,11 @@ public boolean release(final EventHandle eventHandle, final boolean result) { } if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) { pendingAcknowledgments.remove(eventHandle); - if (pendingAcknowledgments.size() == 0) { + if (closed && pendingAcknowledgments.size() == 0) { callbackFuture = executor.submit(() -> callback.accept(this.result)); return true; + } else if (pendingAcknowledgments.size() == 0) { + LOG.warn("Acknowledgement set is not closed. Delaying callback until it is closed"); } } } finally { diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java index 4db7acc5d2..589c6ba74e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java @@ -232,17 +232,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { // Acknowledgement Set timeout is slightly smaller than the visibility timeout; int timeout = (int) sqsOptions.getVisibilityTimeout().getSeconds() - 2; acknowledgementSet = acknowledgementSetManager.create((result) -> { - synchronized (waitingForAcknowledgements) { - while (!acknowledgementSetReady.get()) { - try { - waitingForAcknowledgements.wait(); - } catch (InterruptedException e){} - } - acknowledgementSetCallbackCounter.increment(); - // Delete only if this is positive acknowledgement - if (result == true) { - deleteSqsMessages(waitingForAcknowledgements); - } + acknowledgementSetCallbackCounter.increment(); + // Delete only if this is positive acknowledgement + if (result == true) { + deleteSqsMessages(waitingForAcknowledgements); } }, Duration.ofSeconds(timeout)); } @@ -250,10 +243,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); if (endToEndAcknowledgementsEnabled) { deleteMessageBatchRequestEntry.ifPresent(waitingForAcknowledgements::add); - synchronized (waitingForAcknowledgements) { - acknowledgementSetReady.set(true); - waitingForAcknowledgements.notify(); - } + acknowledgementSet.complete(); } else { deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); }