
[Bug]: SolaceIO does not acknowledge messages #32596

ppawel opened this issue Sep 30, 2024 · 15 comments

ppawel commented Sep 30, 2024

What happened?

We currently have a streaming job running in Dataflow using SolaceIO (ported from JmsIO), and we frequently see messages left in the Solace queue, with Solace then trying to redeliver them. It looks like, even though they were already processed by the pipeline, they are not being acknowledged by SolaceIO, so Solace thinks they were never consumed.

From what I can see, the acking logic is in the finalizeCheckpoint method of the checkpoint itself. However, there is also this:

  // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
  // these messages here. We rely on Solace's retry mechanism.
  private transient ArrayDeque<BytesXMLMessage> ackQueue;

So it looks like messages will not be acked in all cases? Is this by design in SolaceIO?

In general, it would be good if messages were acked in a deterministic way, but it seems Beam does not always call finalizeCheckpoint? Also, it sometimes resumes a reader/source from a checkpointed state - in which case the current SolaceIO approach won't ack the messages.

There is a somewhat related issue in JmsIO, unfortunately without many specifics: #18857. With JmsIO we had the same problem, but I manually patched JmsIO to ack messages in getCheckpointMark, which was very aggressive and probably not the "correct" solution, but then again I could not find a proper place for this logic.
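Conceptually, that patch did something like the sketch below (simplified, with placeholder names - not the actual code):

// Simplified sketch of the aggressive JmsIO workaround (placeholder names).
// Inside the reader's getCheckpointMark(), before the checkpoint mark is created,
// ack everything received so far instead of waiting for finalizeCheckpoint:
for (Message message : messagesToAck) { // javax.jms.Message
  try {
    message.acknowledge();
  } catch (JMSException e) {
    LOG.warn("Failed to ack message", e);
  }
}
messagesToAck.clear();
// ... then create and return the checkpoint mark as before ...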

@bzablocki Any ideas how to improve this?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

ppawel commented Oct 1, 2024

I am analyzing this issue more deeply; the current setup in our testing environment to reproduce the bug is:

  1. Solace queue that has around 2000 spooled messages.
  2. Logging added to the SolaceCheckpointMark constructor and its finalizeCheckpoint method.

Once I start the pipeline, the following happens:

  1. All the spooled messages are received by SolaceIO, and Beam creates around 30-40 larger checkpoints containing what I assume are those old messages:
    [screenshot of checkpoint creation logs]
  2. The pipeline (downstream of the SolaceIO read transform) itself processes all the messages (they are duplicates, so nothing really happens, but that is probably not relevant to this bug).
  3. Beam DOES NOT call finalizeCheckpoint during processing of the backlog.
  4. The pipeline catches up to the real-time messages and the checkpoints become much smaller (1-3 messages).
  5. Beam starts calling finalizeCheckpoint, but seemingly only for the checkpoints with new messages - there are no finalizeCheckpoint calls for the spooled messages, so they are still in the queue, unacked.
    [screenshot of finalizeCheckpoint logs]
    (On the screenshot you can see that the checkpoint with size=34 is one of the last ones created on the previous screenshot, before the pipeline catches up and the checkpoints become small.)

Note: there are no errors in the pipeline.

@bzablocki @iht @tvalentyn Any ideas why SolaceIO/Beam behaves this way (not finalizing the checkpoints for the spooled messages)? I'm not sure whether this could be related to something in the custom part of our pipeline; I might try to simplify it just to check if it makes any difference...

@bzablocki

Thanks @ppawel for reporting and the detailed information. We'll try to reproduce and investigate internally.
Could you tell me:

  • did your job have more than one stage?
  • what was the throughput writing to Solace?


ppawel commented Oct 1, 2024

did your job have more than one stage?

Yes, the job is quite complex. I'm trying to test it with a simplified version to rule out any issues inside the pipeline itself, although I'm not sure why it would matter.
[screenshot of the job graph]

what was the throughput writing to Solace?

The queue that the pipeline is consuming from usually has traffic of around 1-2 messages per second, sometimes with spikes up to 5-10 per second. The pipeline has no problems keeping up in real time; the only problem is that the already spooled messages are not getting acknowledged.

You can see it below - when the job is not running, messages pile up. When I run the job, the spool size goes flat, because the job consumes all the newly incoming messages but does not ack the existing ones (the correct behavior would be for the spool size to go back down to zero).

[screenshot of the queue spool size over time]


ppawel commented Oct 1, 2024

I deployed an almost empty pipeline, with only the SolaceIO read transform and nothing else, and I still see this behavior, although it is a bit mixed:

[screenshot of checkpoint finalization logs]

So some of the bigger checkpoints at the beginning are in fact being finalized properly, but on the other hand, the biggest ones (e.g. the one with 2703 messages or the one with 5834 messages) are not finalized, and then the pipeline moves on to the real-time traffic.


ppawel commented Oct 1, 2024

I'm not sure exactly why this would affect the behavior of Beam I/Os, but I could not think of anything else to try, so I disabled the Streaming Engine[1] Dataflow option for the do-nothing pipeline.

Now it looks much better: it is checkpointing and finalizing smoothly. It has only been running for 10 minutes, and I will also need to test it with the real pipeline, but so far so good...

[screenshot of checkpoint finalization logs with Streaming Engine disabled]

[1] https://cloud.google.com/dataflow/docs/streaming-engine


ppawel commented Oct 4, 2024

Just FYI, the pipeline has been running without any issues for 2 days with Streaming Engine disabled (so it is executed directly on the Dataflow VM, not inside the "engine").

As soon as I enable the Streaming Engine, problems with finalizing checkpoints start...

I'm not sure what to think about this. We will probably create a support ticket for Google Cloud, maybe someone there can take a look, but for now we plan to run SolaceIO without Streaming Engine as it looks stable.

So in the end it looks like this is not really a SolaceIO/Beam issue, but rather a Dataflow runner issue - or rather, an issue with a specific configuration of the Dataflow runner.

@bzablocki

Thanks for the detailed investigation! I'm looking into it with some folks internally. I'll hopefully have an update early next week.

@reuvenlax

Streaming Engine should not change behavior; however, it does introduce limits on checkpoint size. If the individual checkpoint objects exceed a certain limit, the checkpoints will fail to be persisted - this limit cannot be raised.

Is it possible to split the source further?


ppawel commented Oct 7, 2024

Streaming Engine should not change behavior; however, it does introduce limits on checkpoint size. If the individual checkpoint objects exceed a certain limit, the checkpoints will fail to be persisted - this limit cannot be raised.

Hmm, OK, thank you for the information - quite interesting. Can you share what the limit on checkpoint size is? In my case, messages are quite small (~1 kilobyte) and it was skipping finalization of checkpoints as small as a couple of hundred messages. Can the limit really be that low?

Is it possible to split the source further?

Yes, I will try that. So far I have been testing with a single connection to simplify the testing conditions. I changed it to 10 connections on Friday, and the pipeline without Streaming Engine is still running stably. I will now test it with more connections and Streaming Engine enabled to see if that helps.

However, even if that helps, there could be a situation where the backlog on a queue is larger than the allowable split (the max connection number in this case) - I wonder what the solution to consume the backlog would then be, as it would never be acked (checkpoints above the limit won't get finalized). I guess we could think about having a dead-letter queue, but that seems like a crude solution to a problem that should be solvable within Beam/Dataflow...
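(For reference, the "splitting" here just means the number of client connections the read uses. Roughly, it is configured like the sketch below - session/SEMP client configuration is omitted, the queue name is a placeholder, and the option name should be double-checked against the connector's Javadoc.)

// Illustrative only: configuring how many parallel Solace client connections
// (and therefore how many source splits) the read can use.
SolaceIO.Read<Solace.Record> read =
    SolaceIO.read()
        .from(Solace.Queue.fromName("our-queue"))  // placeholder queue name
        .withMaxNumConnections(10);                // was 1 in the earlier tests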

@bzablocki

Are you using "Direct" or "Guaranteed" messages?

Do you know if we can specify some redelivery policy after some deadline to work around missing acks?


ppawel commented Oct 8, 2024

Are you using "Direct" or "Guaranteed" messages?

Guaranteed.

Do you know if we can specify some redelivery policy after some deadline to work around missing acks?

Sometimes the broker redelivers messages to our pipeline (because there are no acks, so the broker thinks something is wrong) and they are processed a second time, but in the end they do not get acked either, so they just stay in the queue.

Regarding redeliveries, I was also wondering why they are not filtered out as duplicates (the requiresDedup property of the IO transforms in Beam)? I specifically checked that the message id is the same between the original message and the redelivered one, but Beam/Dataflow just sends it to our pipeline twice... so I'm a bit confused about this deduplication logic.

@bzablocki

It seems that we should account for the possibility of the checkpoint not being finalized.
This is handled correctly in PubSubIO:

// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.

The flow there is as follows (see the sketch after this list):

  1. in advance(), we copy the message to the safeToAckIds list (source)
  2. when the runner requests a checkpoint, we create it with a copy of the safeToAckIds list (source)
  3. when the checkpoint is finalized, we add the messages to a queue of finalized messages (source)
  4. in the advance() method, we remove whatever is in the queue of finalized messages from safeToAckIds (source).
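Applied to SolaceIO, that could look roughly like the sketch below. This is only an illustration of the flow above, not the actual implementation - the class and field names (safeToAckMessages, finalizedMessages) and the receive() helper are placeholders, and the rest of the UnboundedReader/CheckpointMark plumbing is omitted.

// Sketch only (imports from java.util, java.util.concurrent and
// com.solacesystems.jcsmp.BytesXMLMessage are assumed).
class SolaceReaderSketch {
  // Messages read but not yet covered by a finalized checkpoint.
  private final List<BytesXMLMessage> safeToAckMessages = new ArrayList<>();
  // Messages whose checkpoint was finalized; drained and acked in advance().
  private final Queue<BytesXMLMessage> finalizedMessages = new ConcurrentLinkedQueue<>();

  public boolean advance() {
    // Step 4: ack and forget whatever was finalized since the last call.
    BytesXMLMessage done;
    while ((done = finalizedMessages.poll()) != null) {
      done.ackMessage();
      safeToAckMessages.remove(done);
    }
    // Step 1: remember every newly received message until it is finalized.
    BytesXMLMessage received = receive(); // placeholder for the actual receive logic
    if (received == null) {
      return false;
    }
    safeToAckMessages.add(received);
    return true;
  }

  public SolaceCheckpointMarkSketch getCheckpointMark() {
    // Step 2: the checkpoint gets a *copy*, so a checkpoint that is taken but
    // never finalized does not lose these messages - they stay in
    // safeToAckMessages and are carried into the next checkpoint as well.
    return new SolaceCheckpointMarkSketch(new ArrayList<>(safeToAckMessages), finalizedMessages);
  }

  private BytesXMLMessage receive() {
    return null; // placeholder
  }
}

class SolaceCheckpointMarkSketch {
  private final List<BytesXMLMessage> messages;
  private final Queue<BytesXMLMessage> finalizedMessages;

  SolaceCheckpointMarkSketch(List<BytesXMLMessage> messages, Queue<BytesXMLMessage> finalizedMessages) {
    this.messages = messages;
    this.finalizedMessages = finalizedMessages;
  }

  public void finalizeCheckpoint() {
    // Step 3: hand the messages back to the reader; the ack itself happens in
    // advance(). (Handling of messages that appear in more than one checkpoint
    // copy is omitted in this sketch.)
    finalizedMessages.addAll(messages);
  }
}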

I can work on that, but I'm going on holidays now, so I could pick it up in 2 weeks, which gives me ~2 weeks before the 2.61 release.

Again, thank you @ppawel for reporting. That's an important bug :)

@bzablocki

Regarding redeliveries, I was also wondering why they are not filtered out as duplicates (the requiresDedup property of the IO transforms in Beam)? I specifically checked that the message id is the same between the original message and the redelivered one, but Beam/Dataflow just sends it to our pipeline twice... so I'm a bit confused about this deduplication logic.

I assume you set SolaceIO.Read#withDeduplicateRecords() to true?

This adds a Reshuffle step based on an id. The id in this case is:

/**
* Optional, default: false. Set to deduplicate messages based on the {@link
* BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the
* field is null, then the {@link BytesXMLMessage#getReplicationGroupMessageId()} will be used,
* which is always set by Solace.
*/
public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
  configurationBuilder.setDeduplicateRecords(deduplicateRecords);
  return this;
}

Fyi, the added Reshuffle step is in the Deduplicate transform:

if (source.requiresDeduping()) {
  return Pipeline.applyTransform(input, new ReadWithIds<>(source)).apply(new Deduplicate<>());

Could you check whether you are looking at the same id that is used for deduplication?
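For example, something like this quick check of which id the deduplication would use (just an illustration; message is assumed to be a received BytesXMLMessage):

// Dedup key per the Javadoc above: the application message id if set,
// otherwise the replication group message id (always set by Solace).
String dedupId =
    message.getApplicationMessageId() != null
        ? message.getApplicationMessageId()
        : message.getReplicationGroupMessageId().toString();
LOG.info("Dedup id: {}", dedupId);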

@bzablocki

@ppawel do you remember if you put the LOG statement in the finalizeCheckpoint method before or after the if statement here:

if (activeReader == null || !activeReader.get() || ackQueue == null) {

?


ppawel commented Nov 14, 2024

@ppawel do you remember if you put the LOG statement in the finalizeCheckpoint method before or after the if statement here:

if (activeReader == null || !activeReader.get() || ackQueue == null) {

?

After that if, but I also had a LOG.warn inside the if:

LOG.warn("-- finalizeCheckpoint SKIPPED");
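So the placement was roughly like this (illustrative sketch, not the exact patch; only the warn line above is the literal statement):

@Override
public void finalizeCheckpoint() {
  if (activeReader == null || !activeReader.get() || ackQueue == null) {
    LOG.warn("-- finalizeCheckpoint SKIPPED");
    return;
  }
  LOG.info("-- finalizeCheckpoint called"); // placeholder for the log added after the if
  // ... existing acking logic over ackQueue ...
}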
