pipeline.run() blocks when using FlinkRunner #20992

Closed
damccorm opened this issue Jun 4, 2022 · 3 comments

@damccorm
Contributor

damccorm commented Jun 4, 2022

pipeline.run() is documented to be asynchronous (cf. create-your-pipeline). It seems that when using FlinkRunner (embedded or remote) the call blocks until the pipeline finishes.

Digging into the Flink code, I found that both LocalStreamEnvironment and RemoteStreamEnvironment set execution.attached to true. This causes StreamExecutionEnvironment.execute to block later on:


    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get(); // <==== execution is blocked here
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }
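
To make the attached/detached difference concrete without a Flink cluster, here is a minimal, self-contained sketch using only the JDK. `FakeJobClient`, the string results, and the 300 ms probe timeout are all hypothetical stand-ins invented for illustration; they are not Flink or Beam APIs. The sketch only mirrors the branch shown above: in attached mode the caller waits on the result future, in detached mode it gets a placeholder back right away.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AttachedVsDetachedSketch {

    /** Hypothetical stand-in for Flink's JobClient; it only exposes a result future. */
    static final class FakeJobClient {
        final CompletableFuture<String> result = new CompletableFuture<>();
    }

    /** Mirrors the attached/detached branch of the execute() method quoted above. */
    static String execute(FakeJobClient client, boolean attached)
            throws InterruptedException, ExecutionException {
        if (attached) {
            return client.result.get(); // blocks until the simulated job completes
        }
        return "DETACHED"; // returns at once with a placeholder result
    }

    /** Reports whether execute() returned while the simulated job was still running. */
    public static boolean returnsBeforeJobFinishes(boolean attached) {
        FakeJobClient client = new FakeJobClient();
        // Run execute() on another thread so this thread can observe whether it blocks.
        CompletableFuture<String> call = CompletableFuture.supplyAsync(() -> {
            try {
                return execute(client, attached);
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        });
        boolean returnedEarly;
        try {
            call.get(300, TimeUnit.MILLISECONDS);
            returnedEarly = true;
        } catch (TimeoutException e) {
            returnedEarly = false; // still waiting on the job result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        client.result.complete("FINISHED"); // let the simulated job finish
        call.join();                        // wait for the execute() call to return
        return returnedEarly;
    }

    public static void main(String[] args) {
        System.out.println("attached returns early: " + returnsBeforeJobFinishes(true));
        System.out.println("detached returns early: " + returnsBeforeJobFinishes(false));
    }
}
```

With execution.attached forced to true, the caller is always on the first branch, which matches the blocking behaviour seen from pipeline.run().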

Imported from Jira BEAM-12477. Original Jira may contain additional context.
Reported by: [email protected].

@github-ramA7

Are there any further updates on this? I am looking to use the periodic metrics pusher, but as mentioned in the description, the client execution gets blocked. I also see that the metrics pusher class is registered after the result object is obtained.
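
The ordering problem described here can be sketched with plain JDK classes. This is an illustration only: `simulate`, the event strings, and the manually completed future are hypothetical and do not use Beam's metrics pusher or any Flink API. It assumes, as stated above, that registration happens after the result object is returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PusherOrderSketch {

    /**
     * Records the order of "job finished" and "pusher registered" for a
     * run() call that either blocks until completion or returns immediately.
     */
    public static List<String> simulate(boolean runBlocks) {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> job = new CompletableFuture<>();
        // The callback runs on the thread that completes the future.
        job.thenRun(() -> events.add("job finished"));

        if (runBlocks) {
            // Blocking run(): the job is already done when run() returns.
            job.complete(null);
        }

        // The result object is only available here, so registration happens here.
        events.add("pusher registered");

        if (!runBlocks) {
            // Asynchronous run(): the job finishes some time after registration.
            job.complete(null);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));  // [job finished, pusher registered]
        System.out.println(simulate(false)); // [pusher registered, job finished]
    }
}
```

In the blocking case the pusher is registered only once there is nothing left to push periodically, which is why an asynchronous run() matters for this use case.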

@xsolo

xsolo commented Jan 8, 2023

We have the same issue and would like to know whether this is on the roadmap. We use Flink 1.15.3.

@Abacn
Contributor

Abacn commented May 10, 2024

Fixed by #26158.

@Abacn Abacn closed this as completed May 10, 2024
@github-actions github-actions bot added this to the 2.57.0 Release milestone May 10, 2024