Skip to content

Commit

Permalink
[Flink] fix error management in lazy source
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Nov 12, 2024
1 parent 80a7e22 commit ac2eddc
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,26 @@ public LazyFlinkSourceSplitEnumerator(

@Override
public void start() {
try {
LOG.info("Starting source {}", beamSource);
List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
int i = 0;
for (Source<T> beamSplitSource : beamSplitSourceList) {
pendingSplits.add(new FlinkSourceSplit<>(i, beamSplitSource));
i++;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
context.callAsync(
() -> {
try {
LOG.info("Starting source {}", beamSource);
List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
int i = 0;
for (Source<T> beamSplitSource : beamSplitSourceList) {
pendingSplits.add(new FlinkSourceSplit<>(i, beamSplitSource));
i++;
}
return pendingSplits;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
(sourceSplits, error) -> {
if (error != null) {
throw new RuntimeException("Failed to start source enumerator.", error);
}
});
}

@Override
Expand Down

0 comments on commit ac2eddc

Please sign in to comment.