From ac2eddc6c2c93321454a9198d7abc64b576669f7 Mon Sep 17 00:00:00 2001 From: jto Date: Tue, 12 Nov 2024 14:56:41 +0100 Subject: [PATCH] [Flink] fix error management in lazy source --- .../LazyFlinkSourceSplitEnumerator.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java index 5f394391c25d..b4aa37d77956 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java @@ -68,17 +68,26 @@ public LazyFlinkSourceSplitEnumerator( @Override public void start() { - try { - LOG.info("Starting source {}", beamSource); - List> beamSplitSourceList = splitBeamSource(); - int i = 0; - for (Source beamSplitSource : beamSplitSourceList) { - pendingSplits.add(new FlinkSourceSplit<>(i, beamSplitSource)); - i++; - } - } catch (Exception e) { - throw new RuntimeException(e); - } + context.callAsync( + () -> { + try { + LOG.info("Starting source {}", beamSource); + List> beamSplitSourceList = splitBeamSource(); + int i = 0; + for (Source beamSplitSource : beamSplitSourceList) { + pendingSplits.add(new FlinkSourceSplit<>(i, beamSplitSource)); + i++; + } + return pendingSplits; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + (sourceSplits, error) -> { + if (error != null) { + throw new RuntimeException("Failed to start source enumerator.", error); + } + }); } @Override