Merge pull request #30969: [runners-flink] Fix watermark emission for empty splits (#29816)
je-ik authored Apr 15, 2024
2 parents 4b808b0 + d680bf0 commit e119cd4
Showing 5 changed files with 63 additions and 46 deletions.
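
At a glance, the diff replaces the DUMMY_FUTURE and NO_EXCEPTION sentinel objects with nullable AtomicReference state, wraps waitingForSplitChangeFuture in an AtomicReference, rewrites isEndOfAllReaders() as an allMatch check so that a reader with no splits can reach end of input, adds split logging and a toString() helper for debugging, and adds a testWatermarkOnNoSplits regression test. Illustrative notes follow the relevant hunks below.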
FlinkSourceReaderBase.java
@@ -35,7 +35,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
@@ -44,11 +43,13 @@
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.metrics.Counter;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,9 +75,6 @@ public abstract class FlinkSourceReaderBase<T, OutputT>
   private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
   protected static final CompletableFuture<Void> AVAILABLE_NOW =
       CompletableFuture.completedFuture(null);
-  // Some dummy instances to make the annotation checker happy with AtomicReference.
-  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
-  protected static final Exception NO_EXCEPTION = new Exception();
 
   protected final PipelineOptions pipelineOptions;
   protected final @Nullable Function<OutputT, Long> timestampExtractor;
@@ -90,9 +88,10 @@ public abstract class FlinkSourceReaderBase<T, OutputT>
   protected final Counter numRecordsInCounter;
   protected final long idleTimeoutMs;
   private final CompletableFuture<Void> idleTimeoutFuture;
-  private final AtomicReference<Throwable> exception;
+  private final AtomicReference<@Nullable Throwable> exception;
   private boolean idleTimeoutCountingDown;
-  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private final AtomicReference<CompletableFuture<Void>> waitingForSplitChangeFuture =
+      new AtomicReference<>(new CompletableFuture<>());
   private boolean noMoreSplits;
 
   protected FlinkSourceReaderBase(
@@ -119,12 +118,11 @@ protected FlinkSourceReaderBase(
     this.pipelineOptions = pipelineOptions;
     this.timestampExtractor = timestampExtractor;
     this.beamSourceReaders = new ConcurrentHashMap<>();
-    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.exception = new AtomicReference<>();
     this.executor = executor;
     this.idleTimeoutMs =
         pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
     this.idleTimeoutFuture = new CompletableFuture<>();
-    this.waitingForSplitChangeFuture = new CompletableFuture<>();
     this.idleTimeoutCountingDown = false;
     // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
     // upgraded to 1.14 and above.
@@ -166,23 +164,23 @@ public CompletableFuture<Void> isAvailable() {
       // Regardless of whether there is data available from the alive readers, the
       // main thread needs to be woken up if there is a split change. Hence, we
       // need to combine the data available future with the split change future.
-      if (waitingForSplitChangeFuture.isDone()) {
-        waitingForSplitChangeFuture = new CompletableFuture<>();
+      if (waitingForSplitChangeFuture.get().isDone()) {
+        waitingForSplitChangeFuture.set(new CompletableFuture<>());
       }
-      return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture)
+      return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture.get())
           .thenAccept(ignored -> {});
     } else if (noMoreSplits) {
       // All the splits have been read, wait for idle timeout.
-      LOG.debug("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs);
+      LOG.info("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs);
       checkIdleTimeoutAndMaybeStartCountdown();
       return idleTimeoutFuture;
     } else {
       // There are no live readers, waiting for new split assignments or no more splits
       // notification.
-      if (waitingForSplitChangeFuture.isDone()) {
-        waitingForSplitChangeFuture = new CompletableFuture<>();
+      if (waitingForSplitChangeFuture.get().isDone()) {
+        waitingForSplitChangeFuture.set(new CompletableFuture<>());
       }
-      return waitingForSplitChangeFuture;
+      return waitingForSplitChangeFuture.get();
     }
   }

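Note on the hunk above: waitingForSplitChangeFuture changes from a plain mutable field to an AtomicReference seeded with a fresh future, so the poll side can re-arm it while notification callbacks complete it. A minimal sketch of the pattern, assuming a hypothetical standalone class (only the field name and call shapes match the diff):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SplitChangeSignal {
  // One-shot signal, re-armed after each consumption.
  private final AtomicReference<CompletableFuture<Void>> waitingForSplitChangeFuture =
      new AtomicReference<>(new CompletableFuture<>());

  // Poll side: hand out a future that completes on the next split change.
  CompletableFuture<Void> awaitSplitChange() {
    if (waitingForSplitChangeFuture.get().isDone()) {
      waitingForSplitChangeFuture.set(new CompletableFuture<>());
    }
    return waitingForSplitChangeFuture.get();
  }

  // Notification side (cf. addSplits/notifyNoMoreSplits below): wake any waiter.
  void signalSplitChange() {
    waitingForSplitChangeFuture.get().complete(null);
  }
}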
@@ -191,15 +189,15 @@ public void notifyNoMoreSplits() {
     checkExceptionAndMaybeThrow();
     LOG.info("Received NoMoreSplits signal from enumerator.");
     noMoreSplits = true;
-    waitingForSplitChangeFuture.complete(null);
+    waitingForSplitChangeFuture.get().complete(null);
   }
 
   @Override
   public void addSplits(List<FlinkSourceSplit<T>> splits) {
     checkExceptionAndMaybeThrow();
     LOG.info("Adding splits {}", splits);
     sourceSplits.addAll(splits);
-    waitingForSplitChangeFuture.complete(null);
+    waitingForSplitChangeFuture.get().complete(null);
   }
 
   @Override
@@ -282,19 +280,19 @@ protected void execute(Runnable runnable) {
   }
 
   protected void recordException(Throwable e) {
-    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
-      exception.get().addSuppressed(e);
+    if (!exception.compareAndSet(null, e)) {
+      Optional.ofNullable(exception.get()).ifPresent(exc -> exc.addSuppressed(e));
     }
   }
 
   protected void checkExceptionAndMaybeThrow() {
-    if (exception.get() != NO_EXCEPTION) {
+    if (exception.get() != null) {
       throw new RuntimeException("The source reader received exception.", exception.get());
     }
   }
 
   protected boolean hasException() {
-    return exception.get() != NO_EXCEPTION;
+    return exception.get() != null;
   }
 
   protected Collection<FlinkSourceSplit<T>> sourceSplits() {
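
Note on the hunk above: with NO_EXCEPTION gone, "no error yet" is represented by null, and compareAndSet(null, e) makes the first recorded error win; later errors are attached as suppressed exceptions. A minimal self-contained sketch of the same pattern (class name is hypothetical):

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class FirstErrorWins {
  private final AtomicReference<Throwable> exception = new AtomicReference<>();

  void recordException(Throwable e) {
    // compareAndSet succeeds only for the first error.
    if (!exception.compareAndSet(null, e)) {
      // A later error is attached to the first as a suppressed exception.
      Optional.ofNullable(exception.get()).ifPresent(first -> first.addSuppressed(e));
    }
  }

  boolean hasException() {
    return exception.get() != null;
  }
}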
@@ -344,6 +342,15 @@ public boolean startOrAdvance() throws IOException {
     public @Nullable SourceOutput<OutputT> sourceOutput() {
       return outputForSplit;
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("splitId", splitId)
+          .add("reader", reader)
+          .add("started", started)
+          .toString();
+    }
   }
 
   private final class ErrorRecordingRunnable implements Runnable {
FlinkSourceSplit.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
 
 import java.io.Serializable;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * A Flink {@link SourceSplit} implementation that encapsulates a Beam {@link Source}. This class
@@ -37,13 +37,13 @@ public class FlinkSourceSplit<T> implements SourceSplit, Serializable {
   // The index of the split.
   private final int splitIndex;
   private final Source<T> beamSplitSource;
-  private final @Nullable byte[] splitState;
+  private final byte @Nullable [] splitState;
 
   public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource) {
     this(splitIndex, beamSplitSource, null);
   }
 
-  public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, @Nullable byte[] splitState) {
+  public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, byte @Nullable [] splitState) {
     this.splitIndex = splitIndex;
     this.beamSplitSource = beamSplitSource;
     this.splitState = splitState;
@@ -53,7 +53,7 @@ public int splitIndex() {
     return splitIndex;
   }
 
-  public @Nullable byte[] getSplitState() {
+  public byte @Nullable [] getSplitState() {
     return splitState;
   }
FlinkSourceSplitEnumerator.java
@@ -153,7 +153,10 @@ private List<? extends Source<T>> splitBeamSource() throws Exception {
       long desiredSizeBytes = boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
       return boundedSource.split(desiredSizeBytes, pipelineOptions);
     } else if (beamSource instanceof UnboundedSource) {
-      return ((UnboundedSource<T, ?>) beamSource).split(numSplits, pipelineOptions);
+      List<? extends UnboundedSource<T, ?>> splits =
+          ((UnboundedSource<T, ?>) beamSource).split(numSplits, pipelineOptions);
+      LOG.info("Split source {} to {} splits", beamSource, splits);
+      return splits;
     } else {
       throw new IllegalStateException("Unknown source type " + beamSource.getClass());
     }
FlinkUnboundedSourceReader.java
@@ -28,7 +28,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
@@ -47,6 +46,7 @@
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.core.io.InputStatus;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,9 +70,10 @@ public class FlinkUnboundedSourceReader<T>
   @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
   private static final long SLEEP_ON_IDLE_MS = 50L;
   private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10L;
-  private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
-  private final List<ReaderAndOutput> readers;
-  private int currentReaderIndex;
+  private final AtomicReference<@Nullable CompletableFuture<Void>> dataAvailableFutureRef =
+      new AtomicReference<>();
+  private final List<ReaderAndOutput> readers = new ArrayList<>();
+  private int currentReaderIndex = 0;
   private volatile boolean shouldEmitWatermark;
 
   public FlinkUnboundedSourceReader(
@@ -81,9 +82,6 @@ public FlinkUnboundedSourceReader(
       PipelineOptions pipelineOptions,
       @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
     super(stepName, context, pipelineOptions, timestampExtractor);
-    this.readers = new ArrayList<>();
-    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
-    this.currentReaderIndex = 0;
   }
 
   @VisibleForTesting
@@ -94,9 +92,6 @@ protected FlinkUnboundedSourceReader(
       ScheduledExecutorService executor,
       @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
     super(stepName, executor, context, pipelineOptions, timestampExtractor);
-    this.readers = new ArrayList<>();
-    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
-    this.currentReaderIndex = 0;
   }
 
   @Override
@@ -121,7 +116,7 @@ public void start() {
           shouldEmitWatermark = true;
           // Wake up the main thread if necessary.
           CompletableFuture<Void> f = dataAvailableFutureRef.get();
-          if (f != DUMMY_FUTURE) {
+          if (f != null) {
             f.complete(null);
           }
         },
@@ -151,10 +146,10 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
 
   private boolean isEndOfAllReaders() {
     return allReaders().values().stream()
-        .mapToLong(r -> asUnbounded(r.reader).getWatermark().getMillis())
-        .min()
-        .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())
-        >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+        .allMatch(
+            r ->
+                asUnbounded(r.reader).getWatermark().getMillis()
+                    >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
   }
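
Note on the hunk above: this is the heart of the watermark fix. allMatch is vacuously true on an empty stream, so a reader that was never assigned a split can now report end of input, whereas the old min().orElse(TIMESTAMP_MIN_VALUE) fallback could never reach TIMESTAMP_MAX_VALUE; with at least one reader the two checks agree. A minimal sketch of the difference, using plain longs as stand-ins for the BoundedWindow timestamps:

import java.util.Collections;
import java.util.List;

class EmptyStreamSemantics {
  public static void main(String[] args) {
    final long max = Long.MAX_VALUE; // stand-in for TIMESTAMP_MAX_VALUE
    List<Long> watermarks = Collections.emptyList(); // no alive readers

    boolean oldCheck =
        watermarks.stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE) >= max;
    boolean newCheck = watermarks.stream().allMatch(w -> w >= max);

    System.out.println(oldCheck); // false: an empty source never looks finished
    System.out.println(newCheck); // true: an empty source can terminate
  }
}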
@@ -169,22 +164,22 @@ private boolean isEndOfAllReaders() {
   @Override
   protected CompletableFuture<Void> isAvailableForAliveReaders() {
     CompletableFuture<Void> future = dataAvailableFutureRef.get();
-    if (future == DUMMY_FUTURE) {
+    if (future == null) {
       CompletableFuture<Void> newFuture = new CompletableFuture<>();
       // Need to set the future first to avoid the race condition of missing the watermark emission
       // notification.
       dataAvailableFutureRef.set(newFuture);
       if (shouldEmitWatermark || hasException()) {
         // There are exception after we set the new future,
         // immediately complete the future and return.
-        dataAvailableFutureRef.set(DUMMY_FUTURE);
+        dataAvailableFutureRef.set(null);
         newFuture.complete(null);
       } else {
         LOG.debug("There is no data available, scheduling the idle reader checker.");
         scheduleTask(
             () -> {
               CompletableFuture<Void> f = dataAvailableFutureRef.get();
-              if (f != DUMMY_FUTURE) {
+              if (f != null) {
                 f.complete(null);
               }
             },
@@ -193,7 +188,7 @@ protected CompletableFuture<Void> isAvailableForAliveReaders() {
         return newFuture;
       } else if (future.isDone()) {
         // The previous future is completed, just use it and reset the future ref.
-        dataAvailableFutureRef.getAndSet(DUMMY_FUTURE);
+        dataAvailableFutureRef.compareAndSet(future, null);
         return future;
       } else {
         // The previous future has not been completed, just use it.
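
Note on the hunk above: resetting the consumed future with compareAndSet(future, null) instead of getAndSet(DUMMY_FUTURE) clears the slot only if it still holds the future that was just observed, so a newer future installed concurrently is not clobbered. A minimal sketch of the reset pattern (hypothetical class; only the field name matches the diff):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ConsumeOnceFuture {
  private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef =
      new AtomicReference<>();

  // Returns the current future (possibly null), clearing the slot once it completes.
  CompletableFuture<Void> takeIfDone() {
    CompletableFuture<Void> f = dataAvailableFutureRef.get();
    if (f != null && f.isDone()) {
      // Clear only if nobody swapped in a fresh future in the meantime.
      dataAvailableFutureRef.compareAndSet(f, null);
    }
    return f;
  }
}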
@@ -330,7 +325,7 @@ byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {
 
   private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
       Source.Reader<T> createUnboundedSourceReader(
-          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
+          Source<T> beamSource, byte @Nullable [] splitState) throws IOException {
     UnboundedSource<T, CheckpointMarkT> unboundedSource =
         (UnboundedSource<T, CheckpointMarkT>) beamSource;
     Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
FlinkUnboundedSourceReaderTest.java
@@ -255,6 +255,18 @@ public void testWatermarkOnEmptySource() throws Exception {
     }
   }
 
+  @Test
+  public void testWatermarkOnNoSplits() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
+      reader.start();
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null));
+    }
+  }
+
   @Test
   public void testPendingBytesMetric() throws Exception {
     ManuallyTriggeredScheduledExecutorService executor =
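Note on the hunk above: testWatermarkOnNoSplits pins down the fixed behavior end to end; a reader that is started and then told there are no more splits, without ever receiving one, must return InputStatus.END_OF_INPUT from pollNext instead of hanging on a watermark that can never advance.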
