diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 412c358037d2c7..174d102993d395 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -577,7 +577,11 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) DCHECK(!_build_unique); DCHECK(_have_other_join_conjunct); } + return Status::OK(); +} +Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need // insert to output block of hash join. // _left_output_slots_flags : column of left table need to output set flag = true @@ -596,11 +600,6 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags); init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(), _right_output_slot_flags); - return Status::OK(); -} - -Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); // _other_join_conjuncts are evaluated in the context of the rows produced by this node for (auto& conjunct : _other_join_conjuncts) { diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 67537e65cac34c..12d89c1049eeea 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -50,15 +50,15 @@ class JoinProbeLocalState : public PipelineXLocalState { // output expr vectorized::VExprContextSPtrs _output_expr_ctxs; vectorized::Block _join_block; - vectorized::MutableColumnPtr _tuple_is_null_left_flag_column; - vectorized::MutableColumnPtr _tuple_is_null_right_flag_column; + vectorized::MutableColumnPtr _tuple_is_null_left_flag_column = nullptr; + vectorized::MutableColumnPtr _tuple_is_null_right_flag_column = nullptr; RuntimeProfile::Counter* _probe_timer = nullptr; RuntimeProfile::Counter* _probe_rows_counter = nullptr; RuntimeProfile::Counter* _join_filter_timer = nullptr; RuntimeProfile::Counter* _build_output_block_timer = nullptr; - std::unique_ptr _child_block; + std::unique_ptr _child_block = nullptr; SourceState _child_source_state; }; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 7dcffb410a276a..f4b7928887c3fe 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -145,7 +145,7 @@ class Pipeline : public std::enable_shared_from_this { // Operators for pipelineX. All pipeline tasks share operators from this. // [SourceOperator -> ... -> SinkOperator] OperatorXs operatorXs; - DataSinkOperatorXPtr _sink_x; + DataSinkOperatorXPtr _sink_x = nullptr; std::shared_ptr _obj_pool; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 8a55efcdb7f84d..3fd1489103ded3 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -584,10 +584,10 @@ struct LocalExchangeSharedState : public BasicSharedState { std::vector> data_queue; std::vector source_dependencies; std::atomic running_sink_operators = 0; - void add_running_sink_operators() { running_sink_operators++; } + std::mutex le_lock; void sub_running_sink_operators() { - auto val = running_sink_operators.fetch_sub(1); - if (val == 1) { + std::unique_lock lc(le_lock); + if (running_sink_operators.fetch_sub(1) == 1) { _set_ready_for_read(); } } @@ -599,11 +599,10 @@ struct LocalExchangeSharedState : public BasicSharedState { } void set_dep_by_channel_id(Dependency* dep, int channel_id) { source_dependencies[channel_id] = dep; - dep->block(); } void set_ready_for_read(int channel_id) { auto* dep = source_dependencies[channel_id]; - DCHECK(dep); + DCHECK(dep) << channel_id << " " << (int64_t)this; dep->set_ready(); } }; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index a793a22761fe6c..12cc5e042e868f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -25,9 +25,14 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_open_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + auto& p = _parent->cast(); + _num_rows_in_queue.resize(p._num_partitions); + for (size_t i = 0; i < p._num_partitions; i++) { + _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL( + profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 1); + } RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); - _shared_state->add_running_sink_operators(); return Status::OK(); } @@ -59,8 +64,9 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state, size_t size = _partition_rows_histogram[i + 1] - start; if (size > 0) { data_queue[i].enqueue({new_block, {row_idx, start, size}}); + _shared_state->set_ready_for_read(i); + COUNTER_UPDATE(_num_rows_in_queue[i], size); } - _shared_state->set_ready_for_read(i); } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index b6ce3fbeb9e9ca..45d61d4ff6bb12 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -51,8 +51,9 @@ class LocalExchangeSinkLocalState final RuntimeProfile::Counter* _compute_hash_value_timer = nullptr; RuntimeProfile::Counter* _distribute_timer = nullptr; - std::unique_ptr _partitioner; - std::vector _partition_rows_histogram; + std::vector _num_rows_in_queue {}; + std::unique_ptr _partitioner = nullptr; + std::vector _partition_rows_histogram {}; }; // A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles. @@ -69,8 +70,9 @@ struct LocalExchangeChannelIds { class LocalExchangeSinkOperatorX final : public DataSinkOperatorX { public: using Base = DataSinkOperatorX; - LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const std::vector& texprs) - : Base(sink_id, -1), _num_partitions(num_partitions), _texprs(texprs) {} + LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions, + const std::vector& texprs) + : Base(sink_id, -1, dest_id), _num_partitions(num_partitions), _texprs(texprs) {} Status init(const TPlanNode& tnode, RuntimeState* state) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 81c9cc21447162..83dac5eb8f4046 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -41,30 +41,40 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized:: PartitionedBlock partitioned_block; std::unique_ptr mutable_block = nullptr; - if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( - partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = - vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); - + auto get_data = [&](vectorized::Block* result_block) { do { const auto* offset_start = &(( *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); mutable_block->add_rows(partitioned_block.first.get(), offset_start, offset_start + std::get<2>(partitioned_block.second)); - } while (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( - partitioned_block) && - mutable_block->rows() < state->batch_size()); - *block = mutable_block->to_block(); - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - if (local_state._shared_state->running_sink_operators == 0) { + } while (mutable_block->rows() < state->batch_size() && + local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( + partitioned_block)); + *result_block = mutable_block->to_block(); + }; + if (local_state._shared_state->running_sink_operators == 0) { + if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( + partitioned_block)) { + SCOPED_TIMER(local_state._copy_data_timer); + mutable_block = + vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + get_data(block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); source_state = SourceState::FINISHED; } + } else if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( + partitioned_block)) { + SCOPED_TIMER(local_state._copy_data_timer); + mutable_block = + vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + get_data(block); + } else { + local_state._dependency->block(); + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); } local_state.reached_limit(block, source_state); - return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index ebf18d9a249612..3ccc38854f5c74 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -28,6 +28,17 @@ struct LocalExchangeSourceDependency final : public Dependency { LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx) : Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {} ~LocalExchangeSourceDependency() override = default; + + void block() override { + if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) { + return; + } + std::unique_lock lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock); + if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) { + return; + } + Dependency::block(); + } }; class LocalExchangeSourceOperatorX; @@ -52,7 +63,8 @@ class LocalExchangeSourceLocalState final class LocalExchangeSourceOperatorX final : public OperatorX { public: using Base = OperatorX; - LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {} + LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase* parent) + : Base(pool, -1, id), _parent(parent) {} Status init(const TPlanNode& tnode, RuntimeState* state) override { _op_name = "LOCAL_EXCHANGE_OPERATOR"; return Status::OK(); @@ -70,8 +82,21 @@ class LocalExchangeSourceOperatorX final : public OperatorXset_child(child)); + } else { + _child_x = std::move(child); + } + return Status::OK(); + } + private: friend class LocalExchangeSourceLocalState; + + OperatorXBase* _parent = nullptr; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index ed2dbfc3d50002..5fa6785435bfd7 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -23,14 +23,6 @@ namespace doris::pipeline { -#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ - auto _sptr = state->get_local_state(operator_id()); \ - auto& local_state = _sptr->template cast(); - -#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ - auto _sptr = state->get_sink_local_state(operator_id()); \ - auto& local_state = _sptr->template cast(); - // This struct is used only for initializing local state. struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; @@ -279,7 +271,7 @@ class OperatorXBase : public OperatorBase { RowDescriptor _row_descriptor; - std::unique_ptr _output_row_descriptor; + std::unique_ptr _output_row_descriptor = nullptr; vectorized::VExprContextSPtrs _projections; /// Resource information sent from the frontend. diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1924dc90d585d6..d49e290c0441b2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r if (_prepared) { return Status::InternalError("Already prepared"); } + _num_instances = request.local_params.size(); _runtime_profile.reset(new RuntimeProfile("PipelineContext")); _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); @@ -232,7 +233,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { - //TODO: can we do this in set_sink? + DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); static_cast(pipeline->sink_x()->set_child(pipeline->operator_xs().back())); RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); } @@ -593,15 +594,17 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, } Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op, - PipelinePtr& cur_pipe, + PipelinePtr& cur_pipe, const TPlanNode& tnode, const std::vector& texprs) { - if (!_runtime_state->enable_local_shuffle() || - _runtime_state->query_parallel_instance_num() == 1) { + if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) { return Status::OK(); } + auto parent = op; + RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get())); auto local_exchange_id = next_operator_id(); - op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); + op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id, parent.get())); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + RETURN_IF_ERROR(parent->set_child(op)); const auto downstream_pipeline_id = cur_pipe->id(); if (_dag.find(downstream_pipeline_id) == _dag.end()) { @@ -611,14 +614,15 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - auto num_instances = _runtime_state->query_parallel_instance_num(); - sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, num_instances, texprs)); + sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id, + _num_instances, texprs)); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init()); auto shared_state = LocalExchangeSharedState::create_shared(); - shared_state->data_queue.resize(num_instances); - shared_state->source_dependencies.resize(num_instances, nullptr); + shared_state->data_queue.resize(_num_instances); + shared_state->source_dependencies.resize(_num_instances, nullptr); + shared_state->running_sink_operators = _num_instances; _op_id_to_le_state.insert({local_exchange_id, shared_state}); return Status::OK(); } @@ -717,9 +721,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); if (!tnode.agg_node.need_finalize) { - RETURN_IF_ERROR(op->init(tnode, _runtime_state.get())); - RETURN_IF_ERROR( - _add_local_exchange(pool, op, cur_pipe, tnode.agg_node.grouping_exprs)); + RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, + tnode.agg_node.grouping_exprs)); } } break; @@ -740,6 +743,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + + std::vector probe_exprs; + const std::vector& eq_join_conjuncts = + tnode.hash_join_node.eq_join_conjuncts; + for (const auto& eq_join_conjunct : eq_join_conjuncts) { + probe_exprs.push_back(eq_join_conjunct.left); + } + RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs)); _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index f579265ab63f9b..7f47052296e607 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -125,7 +125,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, - const std::vector& texprs); + const TPlanNode& tnode, const std::vector& texprs); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -170,7 +170,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wshadow-field" #endif - DataSinkOperatorXPtr _sink; + DataSinkOperatorXPtr _sink = nullptr; #ifdef __clang__ #pragma clang diagnostic pop #endif @@ -210,6 +210,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { int _operator_id = 0; int _sink_operator_id = 0; + int _num_instances = 0; std::map> _op_id_to_le_state; }; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index c6df2daff0d718..43eff466019031 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -458,8 +458,9 @@ Result RuntimeState::get_local_state_result(int id) { void RuntimeState::emplace_sink_local_state( int id, std::unique_ptr state) { - DCHECK(id < _op_id_to_sink_local_state.size()); - DCHECK(!_op_id_to_sink_local_state[id]); + DCHECK(id < _op_id_to_sink_local_state.size()) + << " id=" << id << " state: " << state->debug_string(0); + DCHECK(!_op_id_to_sink_local_state[id]) << " id=" << id << " state: " << state->debug_string(0); _op_id_to_sink_local_state[id] = std::move(state); }