Skip to content

Commit

Permalink
[pipelineX](fix) Fix local exchange on pipelineX engine (#27763)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Nov 30, 2023
1 parent 5739167 commit 34e53ac
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 60 deletions.
9 changes: 4 additions & 5 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,11 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
DCHECK(!_build_unique);
DCHECK(_have_other_join_conjunct);
}
return Status::OK();
}

Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
// init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need
// insert to output block of hash join.
// _left_output_slots_flags : column of left table need to output set flag = true
Expand All @@ -596,11 +600,6 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
return Status::OK();
}

Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
// _other_join_conjuncts are evaluated in the context of the rows produced by this node
for (auto& conjunct : _other_join_conjuncts) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class JoinProbeLocalState : public PipelineXLocalState<DependencyType> {
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
vectorized::Block _join_block;
vectorized::MutableColumnPtr _tuple_is_null_left_flag_column;
vectorized::MutableColumnPtr _tuple_is_null_right_flag_column;
vectorized::MutableColumnPtr _tuple_is_null_left_flag_column = nullptr;
vectorized::MutableColumnPtr _tuple_is_null_right_flag_column = nullptr;

RuntimeProfile::Counter* _probe_timer = nullptr;
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;

std::unique_ptr<vectorized::Block> _child_block;
std::unique_ptr<vectorized::Block> _child_block = nullptr;
SourceState _child_source_state;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
// Operators for pipelineX. All pipeline tasks share operators from this.
// [SourceOperator -> ... -> SinkOperator]
OperatorXs operatorXs;
DataSinkOperatorXPtr _sink_x;
DataSinkOperatorXPtr _sink_x = nullptr;

std::shared_ptr<ObjectPool> _obj_pool;

Expand Down
9 changes: 4 additions & 5 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,10 @@ struct LocalExchangeSharedState : public BasicSharedState {
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
void add_running_sink_operators() { running_sink_operators++; }
std::mutex le_lock;
void sub_running_sink_operators() {
auto val = running_sink_operators.fetch_sub(1);
if (val == 1) {
std::unique_lock<std::mutex> lc(le_lock);
if (running_sink_operators.fetch_sub(1) == 1) {
_set_ready_for_read();
}
}
Expand All @@ -599,11 +599,10 @@ struct LocalExchangeSharedState : public BasicSharedState {
}
void set_dep_by_channel_id(Dependency* dep, int channel_id) {
source_dependencies[channel_id] = dep;
dep->block();
}
void set_ready_for_read(int channel_id) {
auto* dep = source_dependencies[channel_id];
DCHECK(dep);
DCHECK(dep) << channel_id << " " << (int64_t)this;
dep->set_ready();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_open_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");

auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
_num_rows_in_queue.resize(p._num_partitions);
for (size_t i = 0; i < p._num_partitions; i++) {
_num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 1);
}
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
_shared_state->add_running_sink_operators();
return Status::OK();
}

Expand Down Expand Up @@ -59,8 +64,9 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
size_t size = _partition_rows_histogram[i + 1] - start;
if (size > 0) {
data_queue[i].enqueue({new_block, {row_idx, start, size}});
_shared_state->set_ready_for_read(i);
COUNTER_UPDATE(_num_rows_in_queue[i], size);
}
_shared_state->set_ready_for_read(i);
}

return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ class LocalExchangeSinkLocalState final

RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::vector<size_t> _partition_rows_histogram;
std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
std::vector<size_t> _partition_rows_histogram {};
};

// A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
Expand All @@ -69,8 +70,9 @@ struct LocalExchangeChannelIds {
class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeSinkLocalState> {
public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const std::vector<TExpr>& texprs)
: Base(sink_id, -1), _num_partitions(num_partitions), _texprs(texprs) {}
LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
const std::vector<TExpr>& texprs)
: Base(sink_id, -1, dest_id), _num_partitions(num_partitions), _texprs(texprs) {}

Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,40 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
PartitionedBlock partitioned_block;
std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;

if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());

auto get_data = [&](vectorized::Block* result_block) {
do {
const auto* offset_start = &((
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
mutable_block->add_rows(partitioned_block.first.get(), offset_start,
offset_start + std::get<2>(partitioned_block.second));
} while (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block) &&
mutable_block->rows() < state->batch_size());
*block = mutable_block->to_block();
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
if (local_state._shared_state->running_sink_operators == 0) {
} while (mutable_block->rows() < state->batch_size() &&
local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block));
*result_block = mutable_block->to_block();
};
if (local_state._shared_state->running_sink_operators == 0) {
if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
local_state._dependency->block();
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
}

local_state.reached_limit(block, source_state);

return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ struct LocalExchangeSourceDependency final : public Dependency {
LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {}
~LocalExchangeSourceDependency() override = default;

void block() override {
if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
return;
}
std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
return;
}
Dependency::block();
}
};

class LocalExchangeSourceOperatorX;
Expand All @@ -52,7 +63,8 @@ class LocalExchangeSourceLocalState final
class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceLocalState> {
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {}
LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase* parent)
: Base(pool, -1, id), _parent(parent) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
_op_name = "LOCAL_EXCHANGE_OPERATOR";
return Status::OK();
Expand All @@ -70,8 +82,21 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL

bool is_source() const override { return true; }

Status set_child(OperatorXPtr child) override {
if (_child_x) {
// Set build side child for join probe operator
DCHECK(_parent != nullptr);
RETURN_IF_ERROR(_parent->set_child(child));
} else {
_child_x = std::move(child);
}
return Status::OK();
}

private:
friend class LocalExchangeSourceLocalState;

OperatorXBase* _parent = nullptr;
};

} // namespace doris::pipeline
10 changes: 1 addition & 9 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@

namespace doris::pipeline {

#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
auto _sptr = state->get_local_state(operator_id()); \
auto& local_state = _sptr->template cast<LocalState>();

#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
auto _sptr = state->get_sink_local_state(operator_id()); \
auto& local_state = _sptr->template cast<LocalState>();

// This struct is used only for initializing local state.
struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
Expand Down Expand Up @@ -279,7 +271,7 @@ class OperatorXBase : public OperatorBase {

RowDescriptor _row_descriptor;

std::unique_ptr<RowDescriptor> _output_row_descriptor;
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;

/// Resource information sent from the frontend.
Expand Down
35 changes: 23 additions & 12 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
if (_prepared) {
return Status::InternalError("Already prepared");
}
_num_instances = request.local_params.size();
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_start_timer = ADD_TIMER(_runtime_profile, "StartTime");
COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
Expand Down Expand Up @@ -232,7 +233,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r

// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
//TODO: can we do this in set_sink?
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}
Expand Down Expand Up @@ -593,15 +594,17 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
}

Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
PipelinePtr& cur_pipe, const TPlanNode& tnode,
const std::vector<TExpr>& texprs) {
if (!_runtime_state->enable_local_shuffle() ||
_runtime_state->query_parallel_instance_num() == 1) {
if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
return Status::OK();
}
auto parent = op;
RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get()));
auto local_exchange_id = next_operator_id();
op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id, parent.get()));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
RETURN_IF_ERROR(parent->set_child(op));

const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
Expand All @@ -611,14 +614,15 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
auto num_instances = _runtime_state->query_parallel_instance_num();
sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, num_instances, texprs));
sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
_num_instances, texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init());

auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->data_queue.resize(num_instances);
shared_state->source_dependencies.resize(num_instances, nullptr);
shared_state->data_queue.resize(_num_instances);
shared_state->source_dependencies.resize(_num_instances, nullptr);
shared_state->running_sink_operators = _num_instances;
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
Expand Down Expand Up @@ -717,9 +721,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));

if (!tnode.agg_node.need_finalize) {
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
RETURN_IF_ERROR(
_add_local_exchange(pool, op, cur_pipe, tnode.agg_node.grouping_exprs));
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
tnode.agg_node.grouping_exprs));
}
}
break;
Expand All @@ -740,6 +743,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));

std::vector<TExpr> probe_exprs;
const std::vector<TEqJoinCondition>& eq_join_conjuncts =
tnode.hash_join_node.eq_join_conjuncts;
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
probe_exprs.push_back(eq_join_conjunct.left);
}
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs);
const TPlanNode& tnode, const std::vector<TExpr>& texprs);

[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const doris::TPipelineFragmentParams& request,
Expand Down Expand Up @@ -170,7 +170,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wshadow-field"
#endif
DataSinkOperatorXPtr _sink;
DataSinkOperatorXPtr _sink = nullptr;
#ifdef __clang__
#pragma clang diagnostic pop
#endif
Expand Down Expand Up @@ -210,6 +210,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

int _operator_id = 0;
int _sink_operator_id = 0;
int _num_instances = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
};

Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,9 @@ Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) {

void RuntimeState::emplace_sink_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state) {
DCHECK(id < _op_id_to_sink_local_state.size());
DCHECK(!_op_id_to_sink_local_state[id]);
DCHECK(id < _op_id_to_sink_local_state.size())
<< " id=" << id << " state: " << state->debug_string(0);
DCHECK(!_op_id_to_sink_local_state[id]) << " id=" << id << " state: " << state->debug_string(0);
_op_id_to_sink_local_state[id] = std::move(state);
}

Expand Down

0 comments on commit 34e53ac

Please sign in to comment.