-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed #32962
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,10 +22,11 @@ | |
import com.solacesystems.jcsmp.BytesXMLMessage; | ||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayDeque; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.NoSuchElementException; | ||
import java.util.Queue; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import org.apache.beam.sdk.io.UnboundedSource; | ||
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; | ||
|
@@ -34,6 +35,7 @@ | |
import org.apache.beam.sdk.io.solace.broker.SessionService; | ||
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
import org.joda.time.Instant; | ||
import org.slf4j.Logger; | ||
|
@@ -54,11 +56,23 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { | |
AtomicBoolean active = new AtomicBoolean(true); | ||
|
||
/** | ||
* Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent | ||
* queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will | ||
* only be accessed by a single thread at once. | ||
* Queue of successfully ACKed message (surrogate) ids which need to be pruned from {@link #safeToAckMessages}. | ||
* CAUTION: Accessed by both reader and checkpointing threads. | ||
*/ | ||
private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new ArrayDeque<>(); | ||
private final Queue<Long> ackedMessageIds; | ||
|
||
/** | ||
* Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a | ||
* non-concurrent object, should only be accessed by the reader thread. | ||
*/ | ||
private final Map<Long, BytesXMLMessage> safeToAckMessages; | ||
|
||
/** | ||
* Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged | ||
* ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link | ||
* UnboundedSolaceReader#ackedMessageIds}). | ||
*/ | ||
private Long surrogateId = 0L; | ||
Comment on lines +64 to +75
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems like you could use the queue directly instead of this map. |
||
|
||
public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) { | ||
this.currentSource = currentSource; | ||
|
@@ -67,6 +81,8 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) { | |
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); | ||
this.sessionService = currentSource.getSessionServiceFactory().create(); | ||
this.sempClient = currentSource.getSempClientFactory().create(); | ||
this.safeToAckMessages = new HashMap<>(); | ||
this.ackedMessageIds = new ConcurrentLinkedQueue<>(); | ||
} | ||
|
||
@Override | ||
|
@@ -98,6 +114,9 @@ private void populateMessageConsumer() { | |
|
||
@Override | ||
public boolean advance() { | ||
// Retire state associated with ACKed messages. | ||
retire(); | ||
|
||
BytesXMLMessage receivedXmlMessage; | ||
try { | ||
receivedXmlMessage = checkNotNull(messageReceiver).receive(); | ||
|
@@ -109,10 +128,11 @@ public boolean advance() { | |
if (receivedXmlMessage == null) { | ||
return false; | ||
} | ||
elementsToCheckpoint.add(receivedXmlMessage); | ||
solaceOriginalRecord = receivedXmlMessage; | ||
solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); | ||
watermarkPolicy.update(solaceMappedRecord); | ||
safeToAckMessages.put(surrogateId, receivedXmlMessage); | ||
surrogateId++; | ||
|
||
return true; | ||
} | ||
|
||
|
@@ -133,14 +153,10 @@ public Instant getWatermark() { | |
|
||
@Override | ||
public UnboundedSource.CheckpointMark getCheckpointMark() { | ||
List<BytesXMLMessage> ackQueue = new ArrayList<>(); | ||
while (!elementsToCheckpoint.isEmpty()) { | ||
BytesXMLMessage msg = elementsToCheckpoint.poll(); | ||
if (msg != null) { | ||
ackQueue.add(msg); | ||
} | ||
} | ||
return new SolaceCheckpointMark(active, ackQueue); | ||
// It's possible for a checkpoint to be taken but never finalized. | ||
// So we simply copy whatever safeToAckMessages we currently have. | ||
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages); | ||
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The checkpoint mark does not need to be aware of whether the reader is or isn't active. The call to close may be observed by the finalizing thread to first close the underlying reader before atomically setting the active flag to false. Put a try/catch statement around the acknowledgement loop in the checkpoint mark's finalizer, if it trips on |
||
} | ||
|
||
@Override | ||
|
@@ -190,4 +206,21 @@ public long getTotalBacklogBytes() { | |
return BACKLOG_UNKNOWN; | ||
} | ||
} | ||
|
||
/**
* Records a message surrogate id as acknowledged by appending it to {@code ackedMessageIds}.
*
* <p>The reader passes this method as a callback ({@code this::markAsAcked}) to the {@code
* SolaceCheckpointMark} it creates in {@code getCheckpointMark()}. Ids queued here are drained by
* {@code retire()}, which removes the matching entries from {@code safeToAckMessages}.
*
* @param messageSurrogateId surrogate id of the acknowledged message; expected to be a key of
*     {@code safeToAckMessages}
*/
public void markAsAcked(Long messageSurrogateId) { | ||
// NOTE(review): presumably invoked from the checkpoint-finalizing thread, which is why the
// target is a ConcurrentLinkedQueue rather than the reader-only map — confirm in
// SolaceCheckpointMark.
ackedMessageIds.add(messageSurrogateId); | ||
} | ||
|
||
/** | ||
* Drains {@code ackedMessageIds} and removes each drained surrogate id from {@code | ||
* safeToAckMessages}. Messages which have been ACKed (via the checkpoint finalize) can be | ||
* safely removed from the collection of messages still waiting to be acknowledged. Called at | ||
* the start of {@code advance()}. | ||
*/ | ||
private void retire() { | ||
while (!ackedMessageIds.isEmpty()) { | ||
// poll() returns null when the queue is empty, so the result is checked before use.
Long ackMessageId = ackedMessageIds.poll(); | ||
if (ackMessageId != null) { | ||
// Removing an id that is not present in the map is a no-op.
safeToAckMessages.remove(ackMessageId); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a try/catch here; see https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/index.html for the exception states of `ackMessage`. Make sure to add a meaningful log statement for debugging purposes.