[Bug]: SolaceIO does not acknowledge messages #32596
I am in the process of analyzing this issue more deeply, and the current situation in our testing environment to reproduce the bug is:
Once I start the pipeline, the following happens:
Note: there are no errors in the pipeline. @bzablocki @iht @tvalentyn Any ideas why SolaceIO/Beam behaves this way (not finalizing the checkpoints for spooled messages)? I'm not sure if this could be related to something in our custom part of the pipeline; I might try to simplify it just to check if it makes any difference...
Thanks @ppawel for reporting and the detailed information. We'll try to reproduce and investigate internally.
I deployed an almost empty pipeline, with only the SolaceIO read transform and nothing else, and I still see this behavior, although it is a bit mixed: some of the bigger checkpoints at the beginning are in fact being finalized properly, but the biggest ones (e.g. the one with 2703 messages or the one with 5834 messages) are not finalized, and then the pipeline moves on to the real-time traffic.
I'm not sure exactly why this would affect the behavior of Beam I/Os, but I could not think of anything else to try, so I disabled the Streaming Engine[1] Dataflow option for the do-nothing pipeline. Now it looks much better: it is checkpointing and finalizing smoothly. It has only been running for 10 minutes, and I will also need to test it with the real pipeline, but so far so good...
Just FYI, the pipeline has been running without any issues for 2 days with Streaming Engine disabled (so it is executed directly on the Dataflow VM, not inside the "engine"). As soon as I enable the Streaming Engine, problems with finalizing checkpoints start... Not sure what to think about this; we will probably create a support ticket for Google Cloud, maybe someone there can take a look, but for now we plan to run SolaceIO without Streaming Engine as it looks stable. So in the end it looks like this is not really a SolaceIO/Beam issue, but rather a Dataflow runner issue, or rather an issue with a specific configuration of the Dataflow runner.
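For context, a minimal sketch of how the Streaming Engine option can be toggled when launching a Dataflow job, assuming the standard `enableStreamingEngine` pipeline option (the class name is a placeholder):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchWithoutStreamingEngine {
  public static void main(String[] args) {
    // Parse the standard Dataflow options from the command line.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Keep Streaming Engine off so state and checkpoints are handled on the
    // worker VMs; setting this to true moves them to the Streaming Engine backend.
    options.setEnableStreamingEngine(false);

    Pipeline pipeline = Pipeline.create(options);
    // ... SolaceIO read transform and the rest of the pipeline ...
    pipeline.run();
  }
}
```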
Thanks for the detailed investigation! I'm looking into it with some folks internally. I'll hopefully have an update early next week.
Streaming Engine should not change behavior; however, it does introduce limits on checkpoint size. If the individual checkpoint objects exceed a certain limit, the checkpoints will fail to be persisted, and this limit cannot be raised. Is it possible to split the source further?
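For reference, a sketch of how the source can be split across more readers in SolaceIO, assuming the `withMaxNumConnections` option from the SolaceIO javadoc (queue name is a placeholder; session and SEMP factory configuration is omitted):

```java
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.data.Solace;

// Read with up to 10 parallel client connections, so the backlog is split
// across more readers and each individual checkpoint stays smaller.
pipeline.apply(
    SolaceIO.read()
        .from(Solace.Queue.fromName("my-queue")) // placeholder queue name
        .withMaxNumConnections(10));
```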
Hmm, OK, thank you for the information, quite interesting. Can you share what the limit on checkpoint size is? In my case, messages are quite small (~1 kilobyte) and it was skipping finalization of checkpoints as small as a couple of hundred messages. Can it be that the limit is that low?
Yes, I will try that; so far I have been testing with a single connection to simplify the testing conditions. I changed it to 10 connections on Friday and the pipeline without Streaming Engine is still running stably. I will now test it with more connections and Streaming Engine enabled to see if that helps. However, even if that helps, there could be a situation where the backlog on a queue is larger than the allowable split (max connection number in this case). I wonder what the solution would then be to consume the backlog, as it will never be acked (checkpoints above the limit won't get finalized). I guess we could think about having a dead-letter queue, but that seems like a crude solution to a problem that should be solvable within Beam/Dataflow...
Are you using "Direct" or "Guaranteed" messages? Do you know if we can specify some redelivery policy after some deadline to work around missing acks?
Guaranteed.
Sometimes the broker redelivers messages to our pipeline (because there are no acks, so the broker thinks something's wrong) and they are processed a second time, but in the end they do not get acked either, so they just stay in the queue. With redeliveries I was also wondering why they are not filtered out as duplicates (the
It seems that we should account for the possibility of the checkpoint not being finalized. See lines 992 to 993 in 07322cc.
The flow there is the following:
I can work on that, but I'm going on holidays now, so I could pick it up in 2 weeks, which gives me ~2 weeks before the 2.61 release. Again, thank you @ppawel for reporting. That's an important bug :)
I assume you set the deduplication option. This adds a Reshuffle step based on an id. The id in this case is defined in beam/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java, lines 560 to 569 in e7ec432.
FYI, the added Reshuffle step is in the Deduplicate transform: lines 2174 to 2175 in e7ec432.
Could you check whether you are looking at the same id that is used for deduplication?
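For illustration, a minimal sketch of Beam's `Deduplicate` transform keyed on a representative id (`extractMessageId` is a hypothetical stand-in for the actual id function SolaceIO uses):

```java
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Drop elements whose representative id has already been seen; internally
// the transform keys (and thus reshuffles) the collection by this id, so
// duplicates land on the same key and can be filtered out.
PCollection<Solace.Record> deduped =
    records.apply(
        Deduplicate.<Solace.Record, String>withRepresentativeValueFn(
                record -> extractMessageId(record)) // hypothetical id function
            .withRepresentativeType(TypeDescriptors.strings()));
```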
@ppawel do you remember if you put the LOG statement in the finalize method before or after this line: beam/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java, line 63 in e7ec432?
After that line.
What happened?
We currently have a streaming job running in Dataflow using SolaceIO (ported from JmsIO), and we frequently see messages left in the Solace queue, with Solace then trying to redeliver them. It looks like even though they were already processed by the pipeline, they are not being acknowledged by SolaceIO, so Solace thinks they were not consumed at all.
As I checked, the acking logic is in the `finalizeCheckpoint` method of the checkpoint itself. However, there is also this:

So it looks like the messages will not be acked in all cases? Is this by design in SolaceIO?
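For illustration, a minimal sketch of how acking tied to checkpoint finalization looks in a Beam unbounded source (`AckableMessage` is a hypothetical stand-in for the real message handle; this is not the actual SolaceIO code):

```java
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch: messages are acked only when the runner finalizes the checkpoint.
// If finalizeCheckpoint is never called, or the source is resumed from a
// deserialized checkpoint, the messages are never acked and the broker
// eventually redelivers them.
class AckingCheckpointMark implements UnboundedSource.CheckpointMark {
  private final List<AckableMessage> messagesToAck;

  AckingCheckpointMark(List<AckableMessage> messagesToAck) {
    this.messagesToAck = messagesToAck;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    for (AckableMessage message : messagesToAck) {
      message.ack(); // hypothetical ack call on the broker message handle
    }
  }
}
```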
In general, it would be good if messages were acked in a deterministic way, but it seems Beam does not always call `finalizeCheckpoint`? And also, sometimes it resumes a reader/source from a checkpointed state, in which case the current SolaceIO approach won't ack the messages.

Somewhat related issue in JmsIO, unfortunately without many specifics: #18857. With JmsIO we had the same problem, but I manually patched JmsIO to ack messages in `getCheckpointMark`, which was very aggressive and probably not the "correct" solution, but then again I could not find the proper place for this logic (a sketch of this workaround follows below).

@bzablocki Any ideas how to improve this?
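A rough sketch of that aggressive workaround, acking in `getCheckpointMark` instead of waiting for finalization (`pendingMessages`, `AckableMessage`, and `AckingCheckpointMark` are hypothetical names from the sketch above; this is not the actual patch):

```java
// Inside a hypothetical UnboundedSource.UnboundedReader implementation:
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
  // Ack everything read so far as soon as the checkpoint mark is created,
  // instead of waiting for finalizeCheckpoint. This guarantees acks happen,
  // but risks losing messages: they are acked before the runner has durably
  // persisted the checkpoint, so a crash in between drops them.
  for (AckableMessage message : pendingMessages) {
    message.ack();
  }
  pendingMessages.clear();
  return new AckingCheckpointMark(java.util.Collections.emptyList());
}
```

The trade-off is moving from at-least-once toward at-most-once delivery, which is presumably why acking is deferred to checkpoint finalization in the first place.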
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components