Skip to content

Commit

Permalink
one task
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Sep 23, 2024
1 parent a406e51 commit 48635d4
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
5 changes: 5 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,11 @@ Status PipelineFragmentContext::_add_local_exchange_impl(

// 7. Inherit properties from current pipeline.
_inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);

if (data_distribution.distribution_type == ExchangeType::PASS_TO_ONE_EXCHANGE) {
cur_pipe->set_num_tasks(1);
}

return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
return;
}
_sender_eos_set.insert(be_number);
DCHECK_GT(_num_remaining_senders, 0);
_num_remaining_senders--;
_num_remaining_senders = 0;

_record_debug_info();
VLOG_FILE << "decremented senders: fragment_instance_id="
<< print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
Expand Down

0 comments on commit 48635d4

Please sign in to comment.