You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
pipeline.run() is documented to be asynchronous (cf. create-your-pipeline). It seems that when using FlinkRunner (embedded or remote) the call blocks until the pipeline finishes.
Digging into Flink code I found that both, LocalStreamEnvironment and RemoteStreamEnvironment set execution.attached to true. This causes that StreamExecutionEnvironment.execute blocks later on:
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient
jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobExecutionResult
= jobClient.getJobExecutionResult().get(); // <==== execution is blocked here
} else {
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult,
null));
return jobExecutionResult;
} catch (Throwable t) {
// get()
on the JobExecutionResult Future will throw an ExecutionException. This
// behaviour was
largely not there in Flink versions before the PipelineExecutor
// refactoring so we should
strip that exception.
Throwable strippedException = ExceptionUtils.stripExecutionException(t);
jobListeners.forEach(
jobListener -> {
jobListener.onJobExecuted(null,
strippedException);
});
ExceptionUtils.rethrowException(strippedException);
// never reached, only make javac happy
return null;
}
}
Imported from Jira BEAM-12477. Original Jira may contain additional context.
Reported by: [email protected].
The text was updated successfully, but these errors were encountered:
Is there further updates on this. I am looking to use periodic metric pusher. But seems as mentioned in description the client execution is getting blocked. And I see metric pusher class being registered after attaining results object.
pipeline.run()
is documented to be asynchronous (cf. create-your-pipeline). It seems that when using FlinkRunner (embedded or remote) the call blocks until the pipeline finishes.Digging into Flink code I found that both,
LocalStreamEnvironment
andRemoteStreamEnvironment
setexecution.attached
to true. This causes thatStreamExecutionEnvironment.execute
blocks later on:Imported from Jira BEAM-12477. Original Jira may contain additional context.
Reported by: [email protected].
The text was updated successfully, but these errors were encountered: