Skip to content

Commit

Permalink
Modified to do the synchronization in the acknowledgement set framework
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jul 11, 2023
1 parent b2f6a8a commit 5fac36b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ public interface AcknowledgementSet {
* @since 2.2
*/
public boolean release(final EventHandle eventHandle, final boolean result);

/**
* Indicates that the addition of initial set of events to
* the acknowledgement set is completed.
* It is possible that more events are added to the set as the
* initial events are going through the pipeline line.
*/
public void complete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
private final Map<EventHandle, AtomicInteger> pendingAcknowledgments;
private Future<?> callbackFuture;
private final DefaultAcknowledgementSetMetrics metrics;
private boolean closed;

public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) {
this.callback = callback;
Expand All @@ -42,6 +43,7 @@ public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<
this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis());
this.callbackFuture = null;
this.metrics = metrics;
this.closed = false;
pendingAcknowledgments = new HashMap<>();
lock = new ReentrantLock(true);
}
Expand Down Expand Up @@ -84,6 +86,8 @@ public boolean isDone() {
if (Instant.now().isAfter(expiryTime)) {
if (callbackFuture != null) {
callbackFuture.cancel(true);
callbackFuture = null;
LOG.warn("AcknowledgementSet expired");
}
metrics.increment(DefaultAcknowledgementSetMetrics.EXPIRED_METRIC_NAME);
return true;
Expand All @@ -98,6 +102,19 @@ public Instant getExpiryTime() {
return expiryTime;
}

@Override
public void complete() {
lock.lock();
try {
closed = true;
if (pendingAcknowledgments.size() == 0) {
callbackFuture = executor.submit(() -> callback.accept(this.result));
}
} finally {
lock.unlock();
}
}

@Override
public boolean release(final EventHandle eventHandle, final boolean result) {
lock.lock();
Expand All @@ -114,9 +131,11 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
}
if (pendingAcknowledgments.get(eventHandle).decrementAndGet() == 0) {
pendingAcknowledgments.remove(eventHandle);
if (pendingAcknowledgments.size() == 0) {
if (closed && pendingAcknowledgments.size() == 0) {
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
LOG.warn("Acknowledgement set is not closed. Delaying callback until it is closed");
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,18 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
// Acknowledgement Set timeout is slightly smaller than the visibility timeout;
int timeout = (int) sqsOptions.getVisibilityTimeout().getSeconds() - 2;
acknowledgementSet = acknowledgementSetManager.create((result) -> {
synchronized (waitingForAcknowledgements) {
while (!acknowledgementSetReady.get()) {
try {
waitingForAcknowledgements.wait();
} catch (InterruptedException e){}
}
acknowledgementSetCallbackCounter.increment();
// Delete only if this is positive acknowledgement
if (result == true) {
deleteSqsMessages(waitingForAcknowledgements);
}
acknowledgementSetCallbackCounter.increment();
// Delete only if this is positive acknowledgement
if (result == true) {
deleteSqsMessages(waitingForAcknowledgements);
}
}, Duration.ofSeconds(timeout));
}
final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey());
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet);
if (endToEndAcknowledgementsEnabled) {
deleteMessageBatchRequestEntry.ifPresent(waitingForAcknowledgements::add);
synchronized (waitingForAcknowledgements) {
acknowledgementSetReady.set(true);
waitingForAcknowledgements.notify();
}
acknowledgementSet.complete();
} else {
deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add);
}
Expand Down

0 comments on commit 5fac36b

Please sign in to comment.