SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed #32962

Open
wants to merge 1 commit into
base: master

bzablocki

Addresses #32596

This PR fixes a bug where messages get stuck on the Solace queue because the checkpoint's finalize method, responsible for acknowledging message processing, isn't always called.

The approach is similar to the one in PubsubIO, where the flow is as follows:

  1. In advance(), each message is copied to the safeToAckIds list (source).
  2. When the runner requests a checkpoint, it is created with a copy of the safeToAckIds list (source).
  3. When the checkpoint is finalized, its messages are added to a queue of finalized messages (source).
  4. In advance(), whatever is in the queue of finalized messages is removed from safeToAckIds (source).
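
The four steps above can be sketched roughly as follows. This is a minimal illustration, not the PubsubIO or SolaceIO code: `SketchReader`, `SketchCheckpoint`, `finalizedBatches` and `pending()` are hypothetical names, and plain `Long` ids stand in for messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical checkpoint: holds a snapshot of ids and reports them back once finalized. */
class SketchCheckpoint {
  private final List<Long> snapshot;
  private final Consumer<List<Long>> onFinalized;

  SketchCheckpoint(List<Long> snapshot, Consumer<List<Long>> onFinalized) {
    this.snapshot = snapshot;
    this.onFinalized = onFinalized;
  }

  /** Step 3: may run on a different thread, or never run at all. */
  void finalizeCheckpoint() {
    // A real implementation would acknowledge the messages with the broker here.
    onFinalized.accept(snapshot);
  }
}

/** Hypothetical reader; ids stand in for messages. */
class SketchReader {
  // Accessed only by the reader thread.
  private final List<Long> safeToAckIds = new ArrayList<>();
  // Written by the finalizing thread, drained by the reader thread.
  private final Queue<List<Long>> finalizedBatches = new ConcurrentLinkedQueue<>();

  /** Steps 4 and 1: drop ids that were finalized, then remember the new one. */
  void advance(long nextId) {
    List<Long> batch;
    while ((batch = finalizedBatches.poll()) != null) {
      safeToAckIds.removeAll(batch);
    }
    safeToAckIds.add(nextId);
  }

  /** Step 2: the checkpoint receives a copy, so a lost checkpoint loses nothing. */
  SketchCheckpoint getCheckpointMark() {
    return new SketchCheckpoint(new ArrayList<>(safeToAckIds), finalizedBatches::add);
  }

  /** Ids that no finalized checkpoint has covered yet. */
  List<Long> pending() {
    return new ArrayList<>(safeToAckIds);
  }
}
```

In this sketch, if a checkpoint is never finalized, its ids simply stay in `safeToAckIds` and are included again in the next snapshot.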

@bzablocki

cc @ppawel - since I wasn't able to reproduce the issue you reported, would you mind testing it in your environment?

ppawel commented Nov 11, 2024

> cc @ppawel - since I wasn't able to reproduce the issue you reported, would you mind testing it in your environment?

Sorry for the late reply. I haven't had much time for this topic recently, but last week I set up a streaming job in our test environment with your branch and left it running, consuming the same messages as the prod job. This job had the Google Streaming Engine enabled.

Today I checked the job and unfortunately there were over 2k messages spooled in the Solace queue. I haven't yet investigated how exactly checkpointing behaves with the new branch. Hopefully I will get more time for this, as we really want to switch fully from JmsIO to SolaceIO, preferably with Streaming Engine enabled.

Comment on lines +64 to +75
/**
* Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a
* non-concurrent object, should only be accessed by the reader thread.
*/
private final Map<Long, BytesXMLMessage> safeToAckMessages;

/**
* Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged
* ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link
* UnboundedSolaceReader#ackedMessageIds}).
*/
private Long surrogateId = 0L;

It seems like you could use the queue directly instead of this map.
The reader can offer messages to the queue, while your checkpoint mark can poll from it and subsequently call ackMessage(). The elements observed by this reader should be finalized by checkpoint marks of this reader. Alternatively, a deserialized checkpoint mark may be given the current reader's queue to perform cleanup. The reference to this reader's queue may need to be marked as volatile, but I'd have to double-check when the checkpoint mark is handed off to a different thread.
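
A rough sketch of what this queue-based alternative could look like. All names here (`AckableMessage`, `QueueBackedReader`, `QueueBackedCheckpoint`, `pendingCount()`) are hypothetical, and `AckableMessage` is only a stand-in for the Solace message type; serialization of the checkpoint mark is left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a broker message that can be acknowledged. */
interface AckableMessage {
  void ackMessage();
}

/** Hypothetical checkpoint mark that drains the reader's queue when finalized. */
class QueueBackedCheckpoint {
  private final Queue<AckableMessage> ackQueue;

  QueueBackedCheckpoint(Queue<AckableMessage> ackQueue) {
    this.ackQueue = ackQueue;
  }

  /** Polls until the queue is empty; poll() returns null instead of throwing. */
  void finalizeCheckpoint() {
    AckableMessage msg;
    while ((msg = ackQueue.poll()) != null) {
      msg.ackMessage();
    }
  }
}

/** Hypothetical reader that shares one thread-safe queue with its checkpoint marks. */
class QueueBackedReader {
  private final Queue<AckableMessage> ackQueue = new ConcurrentLinkedQueue<>();

  /** Called on the reader thread for every message handed downstream. */
  void advance(AckableMessage msg) {
    ackQueue.offer(msg);
  }

  /** No snapshot is taken: every mark refers to the same live queue. */
  QueueBackedCheckpoint getCheckpointMark() {
    return new QueueBackedCheckpoint(ackQueue);
  }

  int pendingCount() {
    return ackQueue.size();
  }
}
```

Because the marks in this sketch share the live queue, a checkpoint that is never finalized leaves its messages for the next finalizer to pick up. The flip side is that a finalizer acknowledges everything queued at the time it runs, including messages offered after that checkpoint was taken.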

// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages);

The checkpoint mark does not need to be aware of whether the reader is active. From the finalizing thread's point of view, close() may close the underlying reader before it atomically sets the active flag to false. Put a try/catch around the acknowledgement loop in the checkpoint mark's finalizer: if it trips on IllegalStateException, the reader has been closed and none of the queued elements can be acknowledged.

Comment on lines 68 to 80
  public void finalizeCheckpoint() {
-   if (activeReader == null || !activeReader.get() || ackQueue == null) {
+   if (activeReader == null || !activeReader.get() || safeToAck == null) {
      return;
    }

-   while (!ackQueue.isEmpty()) {
-     BytesXMLMessage msg = ackQueue.poll();
+   for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) {
+     BytesXMLMessage msg = entry.getValue();
      if (msg != null) {
        msg.ackMessage();
+       confirmAckCallback.accept(entry.getKey());
      }
    }
  }

Add a try/catch here; see https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/index.html for the exception states of ackMessage. Make sure to add a meaningful log statement for debugging purposes.
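
A minimal sketch of the try/catch suggested in the review comments. It takes at face value the reviewer's statement that acknowledging on a closed reader surfaces as IllegalStateException; that has not been checked against the Solace API docs here. `Ackable` and `GuardedCheckpoint` are hypothetical names, the `int` return value exists only to make the behaviour observable, and `java.util.logging` is used just to keep the example self-contained.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for BytesXMLMessage. */
interface Ackable {
  void ackMessage();
}

/** Hypothetical checkpoint mark whose finalizer tolerates a closed reader. */
class GuardedCheckpoint {
  private static final Logger LOG = Logger.getLogger(GuardedCheckpoint.class.getName());

  private final Map<Long, Ackable> safeToAck;
  private final Consumer<Long> confirmAckCallback;

  GuardedCheckpoint(Map<Long, Ackable> safeToAck, Consumer<Long> confirmAckCallback) {
    this.safeToAck = safeToAck;
    this.confirmAckCallback = confirmAckCallback;
  }

  /** Acknowledges messages in order and returns how many acks succeeded. */
  int finalizeCheckpoint() {
    int acked = 0;
    for (Map.Entry<Long, Ackable> entry : safeToAck.entrySet()) {
      try {
        entry.getValue().ackMessage();
        // Confirm only after the ack itself did not throw.
        confirmAckCallback.accept(entry.getKey());
        acked++;
      } catch (IllegalStateException e) {
        // Assumption: the reader was closed, so the remaining acks cannot succeed either.
        LOG.log(
            Level.WARNING,
            String.format(
                "Stopped acknowledging at id %d after %d of %d messages; reader appears closed",
                entry.getKey(), acked, safeToAck.size()),
            e);
        break;
      }
    }
    return acked;
  }
}
```

With this shape the checkpoint mark no longer needs the active flag: a closed reader shows up as an exception on the first failed ack, and the log line records how far the finalizer got.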
