-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed #32962
base: master
Are you sure you want to change the base?
Conversation
cc @ppawel - since I wasn't able to reproduce the issue you reported, would you mind testing it in your environment? |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Sorry for late reply, I haven't had much time recently for this topic, but last week I set up a streaming job in our test environment with your branch, and just left it running consuming same messages as the prod job. This job had the Google Streaming Engine enabled. Today I checked the job and unfortunately there was over 2k messages spooled in the Solace queue. I haven't investigated deeper what is exactly the behavior of checkpointing with the new branch, hopefully I will get more time again for this as we really want to fully switch from JmsIO to SolaceIO, and preferably with Streaming Engine enabled. |
/** | ||
* Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a | ||
* non-concurrent object, should only be accessed by the reader thread. | ||
*/ | ||
private final Map<Long, BytesXMLMessage> safeToAckMessages; | ||
|
||
/** | ||
* Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged | ||
* ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link | ||
* UnboundedSolaceReader#ackedMessageIds}). | ||
*/ | ||
private Long surrogateId = 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like you could use the queue directly instead of this map.
The reader can offer messages to the queue, while your checkpoint mark can poll from it and subsequently call ackMessage()
. The elements observed by this reader should be finalized by checkpoint marks of this reader. Alternatively, a deserialized checkpoint mark may be provided the current reader's queue to perform clean up. The reference to this reader's queue may need to be marked as volatile, but I'd have to double check when the checkpoint mark is handed off to a different thread.
// It's possible for a checkpoint to be taken but never finalized. | ||
// So we simply copy whatever safeToAckIds we currently have. | ||
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages); | ||
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checkpoint mark does not need to be aware of whether the reader is or isn't active. The call to close may be observed by the finalizing thread to first close the underlying reader before atomically setting the active flag to false. Put a try/catch statement around the acknowledgement loop in the checkpoint mark's finalizer, if it trips on IllegalStateException
, then the reader has been closed and none of the queued elements can be acknowledged.
public void finalizeCheckpoint() { | ||
if (activeReader == null || !activeReader.get() || ackQueue == null) { | ||
if (activeReader == null || !activeReader.get() || safeToAck == null) { | ||
return; | ||
} | ||
|
||
while (!ackQueue.isEmpty()) { | ||
BytesXMLMessage msg = ackQueue.poll(); | ||
for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) { | ||
BytesXMLMessage msg = entry.getValue(); | ||
if (msg != null) { | ||
msg.ackMessage(); | ||
confirmAckCallback.accept(entry.getKey()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a try/catch here, see https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/index.html for the exception states of ackMessage
. Make sure to add a meaningful log statement for debugging purposes.
Addresses #32596
This PR fixes a bug where messages get stuck on the Solace queue because the checkpoint's finalize method, responsible for acknowledging message processing, isn't always called.
The approach is similar to the one in PubSubIO. The flow there is the following:
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.