Skip to content

Commit

Permalink
Merge pull request #30905: Do not reemit data from impulse (#30903)
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored Apr 15, 2024
2 parents cd253fd + cfe7be6 commit f071e9e
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
Expand Down Expand Up @@ -308,27 +309,27 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context)
WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE),
context.getPipelineOptions());

FlinkBoundedSource<byte[]> impulseSource;
WatermarkStrategy<WindowedValue<byte[]>> watermarkStrategy;
final SingleOutputStreamOperator<WindowedValue<byte[]>> impulseOperator;
if (context.isStreaming()) {
long shutdownAfterIdleSourcesMs =
context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getShutdownSourcesAfterIdleMs();
impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs);
watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps();
impulseOperator =
context
.getExecutionEnvironment()
.addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse")
.returns(typeInfo);
} else {
impulseSource = FlinkSource.boundedImpulse();
watermarkStrategy = WatermarkStrategy.noWatermarks();
FlinkBoundedSource<byte[]> impulseSource = FlinkSource.boundedImpulse();
impulseOperator =
context
.getExecutionEnvironment()
.fromSource(impulseSource, WatermarkStrategy.noWatermarks(), "Impulse")
.returns(typeInfo);
}
SingleOutputStreamOperator<WindowedValue<byte[]>> source =
context
.getExecutionEnvironment()
.fromSource(impulseSource, watermarkStrategy, "Impulse")
.returns(typeInfo);

context.setOutputDataStream(context.getOutput(transform), source);
context.setOutputDataStream(context.getOutput(transform), impulseOperator);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
Expand Down Expand Up @@ -71,22 +73,16 @@ public static <T> FlinkUnboundedSource<T> unbounded(
return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits);
}

public static FlinkBoundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
public static FlinkUnboundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
// Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but overriding its
// boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this
// source as an unbounded source and execute the job in streaming mode. This also
// works well with checkpoint, because the FlinkSourceSplit containing the
// BeamImpulseSource will be discarded after the impulse emission. So the streaming
// job won't see another impulse after failover.
return new FlinkBoundedSource<>(
return new FlinkUnboundedSource<>(
"Impulse",
new BeamImpulseSource(),
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
new BeamImpulseSource()),
new SerializablePipelineOptions(flinkPipelineOptions),
Boundedness.CONTINUOUS_UNBOUNDED,
1,
record -> Watermark.MAX_WATERMARK.getTimestamp());
record -> BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
}

public static FlinkBoundedSource<byte[]> boundedImpulse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
beamSourceReaders.forEach(
(splitId, readerAndOutput) -> {
try {
splitsState.add(getReaderCheckpoint(splitId, readerAndOutput));
FlinkSourceSplit<T> checkpoint = getReaderCheckpoint(splitId, readerAndOutput);
splitsState.add(checkpoint);
} catch (IOException e) {
throw new IllegalStateException(
String.format("Failed to get checkpoint for split %d", splitId), e);
Expand Down Expand Up @@ -176,7 +177,8 @@ public CompletableFuture<Void> isAvailable() {
checkIdleTimeoutAndMaybeStartCountdown();
return idleTimeoutFuture;
} else {
// There is no live readers, waiting for new split assignments or no more splits notification.
// There are no live readers, waiting for new split assignments or no more splits
// notification.
if (waitingForSplitChangeFuture.isDone()) {
waitingForSplitChangeFuture = new CompletableFuture<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;

Expand Down Expand Up @@ -67,7 +68,11 @@ public String splitId() {

@Override
public String toString() {
return String.format("[SplitIndex: %d, BeamSource: %s]", splitIndex, beamSplitSource);
return MoreObjects.toStringHelper(this)
.add("splitIndex", splitIndex)
.add("beamSource", beamSplitSource)
.add("splitState.isNull", splitState == null)
.toString();
}

public static <T> SimpleVersionedSerializer<FlinkSourceSplit<T>> serializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void start() {
context.callAsync(
() -> {
try {
LOG.info("Starting source {}", beamSource);
List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public FlinkBoundedSource(
Boundedness boundedness,
int numSplits,
@Nullable TimestampExtractor<WindowedValue<T>> timestampExtractor) {

super(stepName, beamSource, serializablePipelineOptions, boundedness, numSplits);
this.timestampExtractor = timestampExtractor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -29,17 +27,13 @@
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
Expand All @@ -61,8 +55,6 @@
*/
public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
private static final VarLongCoder LONG_CODER = VarLongCoder.of();
private final Map<Integer, Long> consumedFromSplit = new HashMap<>();
private @Nullable Source.Reader<T> currentReader;
private int currentSplitId;

Expand All @@ -71,6 +63,7 @@ public FlinkBoundedSourceReader(
SourceReaderContext context,
PipelineOptions pipelineOptions,
@Nullable Function<WindowedValue<T>, Long> timestampExtractor) {

super(stepName, context, pipelineOptions, timestampExtractor);
currentSplitId = -1;
}
Expand All @@ -82,40 +75,24 @@ protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput r
// stream."
// For bounded source, the checkpoint granularity is the entire source split.
// So, in case of failure, all the data from this split will be consumed again.
return new FlinkSourceSplit<>(
splitId, readerAndOutput.reader.getCurrentSource(), asBytes(consumedFromSplit(splitId)));
return new FlinkSourceSplit<>(splitId, readerAndOutput.reader.getCurrentSource());
}

@Override
protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
throws IOException {
Source<T> beamSource = sourceSplit.getBeamSplitSource();
byte[] state = sourceSplit.getSplitState();
if (state != null) {
consumedFromSplit.put(Integer.parseInt(sourceSplit.splitId()), fromBytes(state));
}
return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
}

private byte[] asBytes(long l) throws CoderException {
return CoderUtils.encodeToByteArray(LONG_CODER, l);
}

private long fromBytes(byte[] b) throws CoderException {
return CoderUtils.decodeFromByteArray(LONG_CODER, b);
}

private long consumedFromSplit(int splitId) {
return consumedFromSplit.getOrDefault(splitId, 0L);
}

@VisibleForTesting
protected FlinkBoundedSourceReader(
String stepName,
SourceReaderContext context,
PipelineOptions pipelineOptions,
ScheduledExecutorService executor,
@Nullable Function<WindowedValue<T>, Long> timestampExtractor) {

super(stepName, executor, context, pipelineOptions, timestampExtractor);
currentSplitId = -1;
}
Expand All @@ -141,12 +118,11 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Except
if (currentReader != null) {
// make null checks happy
final @Nonnull Source.Reader<T> splitReader = currentReader;
// store number of processed elements from this split
consumedFromSplit.compute(currentSplitId, (k, v) -> v == null ? 1 : v + 1);
T record = splitReader.getCurrent();
WindowedValue<T> windowedValue =
WindowedValue.of(
record, splitReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);

if (timestampExtractor == null) {
output.collect(windowedValue);
} else {
Expand All @@ -158,7 +134,6 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Except
// the beginning. So the failover granularity is the entire Flink job.
if (!invocationUtil.invokeAdvance(splitReader)) {
finishSplit(currentSplitId);
consumedFromSplit.remove(currentSplitId);
LOG.debug("Finished reading from {}", currentSplitId);
currentReader = null;
currentSplitId = -1;
Expand Down Expand Up @@ -188,12 +163,6 @@ private boolean moveToNextNonEmptyReader() throws IOException {
if (invocationUtil.invokeStart(rao.reader)) {
currentSplitId = Integer.parseInt(rao.splitId);
currentReader = rao.reader;
long toSkipAfterStart =
MoreObjects.firstNonNull(consumedFromSplit.remove(currentSplitId), 0L);
@Nonnull Source.Reader<T> reader = Preconditions.checkArgumentNotNull(currentReader);
while (toSkipAfterStart > 0 && reader.advance()) {
toSkipAfterStart--;
}
return true;
} else {
finishSplit(Integer.parseInt(rao.splitId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand All @@ -46,38 +48,33 @@ public BoundedReader<byte[]> createReader(PipelineOptions options) throws IOExce
return new ImpulseReader(this);
}

@Override
public Coder<byte[]> getOutputCoder() {
return ByteArrayCoder.of();
}

private static class ImpulseReader extends BoundedSource.BoundedReader<byte[]> {
private final BeamImpulseSource source;
private boolean started;
private int index;

private ImpulseReader(BeamImpulseSource source) {
this.source = source;
this.started = false;
this.index = 0;
}

@Override
public boolean start() throws IOException {
started = true;
return true;
public boolean start() {
return advance();
}

@Override
public boolean advance() throws IOException {
if (!started) {
throw new IllegalStateException("start() should be called before calling advance()");
}
index++;
return false;
public boolean advance() {
return index++ == 0;
}

@Override
public byte[] getCurrent() throws NoSuchElementException {
if (!started) {
throw new IllegalStateException("The reader hasn't started.");
}
if (index == 0) {
if (index == 1) {
return new byte[0];
} else {
throw new NoSuchElementException("No element is available.");
Expand All @@ -91,17 +88,14 @@ public BoundedSource<byte[]> getCurrentSource() {

@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
if (!started) {
throw new IllegalStateException("The reader hasn't started.");
}
if (index == 0) {
if (index == 1) {
return BoundedWindow.TIMESTAMP_MIN_VALUE;
} else {
throw new NoSuchElementException("No element is available.");
}
}

@Override
public void close() throws IOException {}
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ protected FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(
long idleTimeoutMs,
@Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> timestampExtractor,
TestMetricGroup testMetricGroup) {

FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
SourceReaderContext mockContext = createSourceReaderContext(testMetricGroup);
Expand Down

0 comments on commit f071e9e

Please sign in to comment.