From 1d7d88a11dfef50a44bb28d9f443ea9e94ca4295 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 28 Mar 2024 16:40:26 +0800 Subject: [PATCH] [shuffle](fix) Do not release resources if rpc has not done --- be/src/pipeline/task_scheduler.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 91adf66f3c2999f..1ac394cb9a54eea 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -296,8 +296,18 @@ void TaskScheduler::_do_work(size_t index) { // If pipeline is canceled, it will report after pipeline closed, and will propagate // errors to downstream through exchange. So, here we needn't send_report. // fragment_ctx->send_report(true); - Status cancel_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, PipelineTaskState::CANCELED, cancel_status); + if (task->is_pending_finish()) { + // Only meet eos, should set task to PENDING_FINISH state + task->set_state(PipelineTaskState::PENDING_FINISH); + task->set_running(false); + + if (!task->is_pipelineX()) { + static_cast(_blocked_task_scheduler->add_blocked_task(task)); + } + } else { + Status cancel_status = fragment_ctx->get_query_ctx()->exec_status(); + _close_task(task, PipelineTaskState::CANCELED, cancel_status); + } continue; }