diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index de46a14ed3fd92..e11705b6fd3fd1 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -931,6 +931,11 @@ Status PipelineFragmentContext::_add_local_exchange_impl( // 7. Inherit properties from current pipeline. _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip); + + if (data_distribution.distribution_type == ExchangeType::PASS_TO_ONE_EXCHANGE) { + cur_pipe->set_num_tasks(1); + } + return Status::OK(); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1ca6bb7f2c5931..006736c2b15fbc 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -255,8 +255,8 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { return; } _sender_eos_set.insert(be_number); - DCHECK_GT(_num_remaining_senders, 0); - _num_remaining_senders--; + _num_remaining_senders = 0; + _record_debug_info(); VLOG_FILE << "decremented senders: fragment_instance_id=" << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()