Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed #32962

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -38,36 +38,43 @@
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private transient Map<Long, BytesXMLMessage> safeToAck;
private transient Consumer<Long> confirmAckCallback;
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We rely on Solace's retry mechanism.
private transient ArrayDeque<BytesXMLMessage> ackQueue;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param markAsAckedFn {@code Consumer<Long>} a reference to a method in the {@link
* UnboundedSolaceReader} that will mark the message as acknowledged.
* @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The
* reader creating the messages has to be active to acknowledge the messages.
* @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged.
* @param safeToAck {@code Map<Long, BytesXMLMessage>} of {@link BytesXMLMessage} to be
* acknowledged.
*/
SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) {
SolaceCheckpointMark(
Consumer<Long> markAsAckedFn,
AtomicBoolean activeReader,
Map<Long, BytesXMLMessage> safeToAck) {
this.confirmAckCallback = markAsAckedFn;
this.activeReader = activeReader;
this.ackQueue = new ArrayDeque<>(ackQueue);
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
if (activeReader == null || !activeReader.get() || ackQueue == null) {
if (activeReader == null || !activeReader.get() || safeToAck == null) {
return;
}

while (!ackQueue.isEmpty()) {
BytesXMLMessage msg = ackQueue.poll();
for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) {
BytesXMLMessage msg = entry.getValue();
if (msg != null) {
msg.ackMessage();
confirmAckCallback.accept(entry.getKey());
}
}
}
Comment on lines 68 to 80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a try/catch here, see https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/index.html for the exception states of ackMessage. Make sure to add a meaningful log statement for debugging purposes.

Expand All @@ -84,15 +91,13 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
// Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not
// content.
ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue);
ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue);
return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList);
return Objects.equals(safeToAck, that.safeToAck)
&& Objects.equals(confirmAckCallback, that.confirmAckCallback)
&& Objects.equals(activeReader, that.activeReader);
}

@Override
public int hashCode() {
return Objects.hash(activeReader, ackQueue);
return Objects.hash(safeToAck, confirmAckCallback, activeReader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
Expand All @@ -34,6 +35,7 @@
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -54,11 +56,23 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
AtomicBoolean active = new AtomicBoolean(true);

/**
* Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent
* queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will
* only be accessed by a single thread at once.
* Queue of successfully ACKed message (surrogate) ids which need to be pruned from
* {@link #safeToAckMessages}.
* CAUTION: Accessed by both reader and checkpointing threads.
*/
private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new ArrayDeque<>();
private final Queue<Long> ackedMessageIds;

/**
* Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a
* non-concurrent object, should only be accessed by the reader thread.
*/
private final Map<Long, BytesXMLMessage> safeToAckMessages;

/**
 * Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged
 * ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link
 * UnboundedSolaceReader#ackedMessageIds}).
 *
 * <p>Kept as a primitive {@code long} so that incrementing it for every received message does not
 * unbox and re-box a {@code Long}; the value is boxed only at the point where it is used as a map
 * key.
 */
private long surrogateId = 0L;
Comment on lines +64 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you could use the queue directly instead of this map.
The reader can offer messages to the queue, while your checkpoint mark can poll from it and subsequently call ackMessage(). The elements observed by this reader should be finalized by checkpoint marks of this reader. Alternatively, a deserialized checkpoint mark may be provided the current reader's queue to perform clean up. The reference to this reader's queue may need to be marked as volatile, but I'd have to double check when the checkpoint mark is handed off to a different thread.


public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
this.currentSource = currentSource;
Expand All @@ -67,6 +81,8 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold());
this.sessionService = currentSource.getSessionServiceFactory().create();
this.sempClient = currentSource.getSempClientFactory().create();
this.safeToAckMessages = new HashMap<>();
this.ackedMessageIds = new ConcurrentLinkedQueue<>();
}

@Override
Expand Down Expand Up @@ -98,6 +114,9 @@ private void populateMessageConsumer() {

@Override
public boolean advance() {
// Retire state associated with ACKed messages.
retire();

BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = checkNotNull(messageReceiver).receive();
Expand All @@ -109,10 +128,11 @@ public boolean advance() {
if (receivedXmlMessage == null) {
return false;
}
elementsToCheckpoint.add(receivedXmlMessage);
solaceOriginalRecord = receivedXmlMessage;
solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
watermarkPolicy.update(solaceMappedRecord);
safeToAckMessages.put(surrogateId, receivedXmlMessage);
surrogateId++;

return true;
}

Expand All @@ -133,14 +153,10 @@ public Instant getWatermark() {

@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
List<BytesXMLMessage> ackQueue = new ArrayList<>();
while (!elementsToCheckpoint.isEmpty()) {
BytesXMLMessage msg = elementsToCheckpoint.poll();
if (msg != null) {
ackQueue.add(msg);
}
}
return new SolaceCheckpointMark(active, ackQueue);
// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckMessages we currently have.
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkpoint mark does not need to be aware of whether the reader is or isn't active. The call to close may be observed by the finalizing thread to first close the underlying reader before atomically setting the active flag to false. Put a try/catch statement around the acknowledgement loop in the checkpoint mark's finalizer, if it trips on IllegalStateException, then the reader has been closed and none of the queued elements can be acknowledged.

}

@Override
Expand Down Expand Up @@ -190,4 +206,21 @@ public long getTotalBacklogBytes() {
return BACKLOG_UNKNOWN;
}
}

/**
 * Records that the message stored under the given surrogate id has been acknowledged.
 *
 * <p>The id is appended to {@code ackedMessageIds}; the matching entry is dropped from {@code
 * safeToAckMessages} the next time {@code retire()} runs. A reference to this method is handed to
 * each {@link SolaceCheckpointMark} as its acknowledgement callback.
 *
 * @param messageSurrogateId surrogate id of the message that was acknowledged
 */
public void markAsAcked(Long messageSurrogateId) {
  // The queue is an unbounded ConcurrentLinkedQueue, for which offer() and add() are equivalent.
  ackedMessageIds.offer(messageSurrogateId);
}

/**
 * Drains {@code ackedMessageIds} and removes every drained surrogate id from {@code
 * safeToAckMessages}, so that messages already ACKed through a finalized checkpoint are no longer
 * kept in the set of messages waiting to be acknowledged.
 */
private void retire() {
  // ackedMessageIds is a ConcurrentLinkedQueue, which never holds null elements, so a null
  // result from poll() means the queue has been fully drained.
  Long ackedId;
  while ((ackedId = ackedMessageIds.poll()) != null) {
    safeToAckMessages.remove(ackedId);
  }
}
}
Loading