[fix](local exchange) Fix correctness issue caused by local exchange (#41555)
For the plan `local exchange (hash shuffle) -> union -> colocated agg`, we
must ensure the local exchange uses the same hash algorithm as MPP shuffling.

This problem is covered by our existing test cases but can only be reproduced
with multiple BEs, so no new case is added in this PR.
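
To illustrate the failure mode, here is a minimal, self-contained sketch (editorial; not code from this patch, and both hash functions are hypothetical stand-ins rather than the algorithms Doris actually uses). If the local exchange partitions rows with a different hash algorithm than the MPP shuffle, rows for the same key can be routed to different instances, and a downstream colocated agg then produces split or duplicated groups:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Hypothetical stand-in for the hash used by the MPP (network) shuffle.
uint32_t mpp_shuffle_hash(const std::string& key) {
    return static_cast<uint32_t>(std::hash<std::string>{}(key));
}

// Hypothetical stand-in for a local exchange using a different algorithm.
uint32_t local_exchange_hash(const std::string& key) {
    uint32_t h = 2166136261u; // FNV-1a
    for (unsigned char c : key) {
        h = (h ^ c) * 16777619u;
    }
    return h;
}

int main() {
    const uint32_t num_instances = 3;
    const std::string key = "user_42";
    // When the two hashes disagree, rows for `key` that arrived via the MPP
    // shuffle and rows re-partitioned by the local exchange can land on
    // different instances, so the colocated agg sees two partial groups.
    std::cout << "MPP shuffle routes key to instance    "
              << mpp_shuffle_hash(key) % num_instances << "\n"
              << "local exchange routes key to instance "
              << local_exchange_hash(key) % num_instances << "\n";
    return 0;
}
```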
Gabriel39 authored Oct 9, 2024
1 parent 4cfd57e commit c065364
Showing 13 changed files with 52 additions and 38 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
@@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
                 ? DataDistribution(ExchangeType::PASSTHROUGH)
                 : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
     }
-    return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
+    return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                 ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                 : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
 }
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
@@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
     if (_partition_by_eq_expr_ctxs.empty()) {
         return {ExchangeType::PASSTHROUGH};
     } else if (_order_by_eq_expr_ctxs.empty()) {
-        return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
+        return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                 ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                 : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -105,7 +105,7 @@ class DistinctStreamingAggOperatorX final

     DataDistribution required_data_distribution() const override {
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
-            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
+            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                     ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                     : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
@@ -144,7 +144,7 @@ class HashJoinBuildSinkOperatorX final
     bool require_shuffled_data_distribution() const override {
         return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
     }
-    bool is_shuffled_hash_join() const override {
+    bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
     bool require_data_distribution() const override {
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -155,7 +155,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
     bool require_shuffled_data_distribution() const override {
         return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
     }
-    bool is_shuffled_hash_join() const override {
+    bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
     bool require_data_distribution() const override {
15 changes: 7 additions & 8 deletions be/src/pipeline/exec/operator.h
@@ -108,17 +108,20 @@ class OperatorBase {
     virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
     [[nodiscard]] virtual bool require_data_distribution() const { return false; }
     OperatorPtr child() { return _child; }
-    [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
-    void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
-        _followed_by_shuffled_join = followed_by_shuffled_join;
+    [[nodiscard]] bool followed_by_shuffled_operator() const {
+        return _followed_by_shuffled_operator;
+    }
+    void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
+        _followed_by_shuffled_operator = followed_by_shuffled_operator;
     }
+    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
     [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

 protected:
     OperatorPtr _child = nullptr;

     bool _is_closed;
-    bool _followed_by_shuffled_join = false;
+    bool _followed_by_shuffled_operator = false;
 };

 class PipelineXLocalStateBase {
@@ -477,8 +480,6 @@ class DataSinkOperatorXBase : public OperatorBase {
     [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
     [[nodiscard]] virtual DataDistribution required_data_distribution() const;

-    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
-
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
     }
@@ -663,8 +664,6 @@ class OperatorXBase : public OperatorBase {
     [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
                                            bool* eos) = 0;

-    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
-
     Status close(RuntimeState* state) override;

     [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final
     bool require_shuffled_data_distribution() const override {
         return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
     }
-    bool is_shuffled_hash_join() const override {
+    bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }

2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX
     bool require_shuffled_data_distribution() const override {
         return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
     }
-    bool is_shuffled_hash_join() const override {
+    bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }

2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
@@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
-            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
+            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                     ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                     : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         } else if (_merge_by_exchange) {
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
@@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
         }
     }

+    bool require_shuffled_data_distribution() const override {
+        return _followed_by_shuffled_operator;
+    }
+
+    bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
+
 private:
     int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
@@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
         return Status::OK();
     }
     [[nodiscard]] int get_child_count() const { return _child_size; }
+    bool require_shuffled_data_distribution() const override {
+        return _followed_by_shuffled_operator;
+    }
+
+    bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

 private:
     bool _has_data(RuntimeState* state) const {
46 changes: 25 additions & 21 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
                                                     const DescriptorTbl& descs, OperatorPtr parent,
                                                     int* node_idx, OperatorPtr* root,
                                                     PipelinePtr& cur_pipe, int child_idx,
-                                                    const bool followed_by_shuffled_join) {
+                                                    const bool followed_by_shuffled_operator) {
     // propagate error case
     if (*node_idx >= tnodes.size()) {
         return Status::InternalError(
@@ -677,11 +677,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
     const TPlanNode& tnode = tnodes[*node_idx];

     int num_children = tnodes[*node_idx].num_children;
-    bool current_followed_by_shuffled_join = followed_by_shuffled_join;
+    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
     OperatorPtr op = nullptr;
     RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
                                      parent == nullptr ? -1 : parent->node_id(), child_idx,
-                                     followed_by_shuffled_join));
+                                     followed_by_shuffled_operator));

     // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
     if (parent != nullptr) {
@@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
         *root = op;
     }
     /**
-     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
+     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
      *
      * For plan:
      * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
     auto require_shuffled_data_distribution =
             cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution()
                                           : op->require_shuffled_data_distribution();
-    current_followed_by_shuffled_join =
-            (followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
+    current_followed_by_shuffled_operator =
+            (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
             require_shuffled_data_distribution;

     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
         RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
-                                            cur_pipe, i, current_followed_by_shuffled_join));
+                                            cur_pipe, i, current_followed_by_shuffled_operator));

         // we are expecting a child, but have used all nodes
         // this means we have been given a bad tree and must fail
@@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
      * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
      * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
      */
-    const bool followed_by_shuffled_join = operators.size() > idx
-                                                   ? operators[idx]->followed_by_shuffled_join()
-                                                   : cur_pipe->sink()->followed_by_shuffled_join();
+    const bool followed_by_shuffled_operator =
+            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
+                                   : cur_pipe->sink()->followed_by_shuffled_operator();
     const bool should_disable_bucket_shuffle =
             bucket_seq_to_instance_idx.empty() &&
             shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
-            followed_by_shuffled_join;
+            followed_by_shuffled_operator;
     sink.reset(new LocalExchangeSinkOperatorX(
             sink_id, local_exchange_id,
             should_disable_bucket_shuffle ? _total_instances : _num_instances,
@@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                  const DescriptorTbl& descs, OperatorPtr& op,
                                                  PipelinePtr& cur_pipe, int parent_idx,
                                                  int child_idx,
-                                                 const bool followed_by_shuffled_join) {
+                                                 const bool followed_by_shuffled_operator) {
     // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
     // Therefore, here we need to use a stack-like structure.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1321,15 +1321,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

         op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                    _require_bucket_distribution));
-        op->set_followed_by_shuffled_join(false);
+        op->set_followed_by_shuffled_operator(false);
         _require_bucket_distribution = true;
         RETURN_IF_ERROR(new_pipe->add_operator(op));
         RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
         cur_pipe = new_pipe;
     } else {
         op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                    _require_bucket_distribution));
-        op->set_followed_by_shuffled_join(followed_by_shuffled_join);
+        op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         _require_bucket_distribution =
                 _require_bucket_distribution || op->require_data_distribution();
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                             _require_bucket_distribution));
         }
-        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
+        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         _require_bucket_distribution =
                 _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
@@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
-            sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
-            op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
+            sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
+            op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         } else {
             op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand All @@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
@@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::UNION_NODE: {
         int child_count = tnode.num_children;
         op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
+        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         RETURN_IF_ERROR(cur_pipe->add_operator(op));

         const auto downstream_pipeline_id = cur_pipe->id();
@@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
             DataSinkOperatorPtr sink;
             sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
+            sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
             RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
@@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
         }
-        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
+        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         _require_bucket_distribution =
                 _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
@@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         DataSinkOperatorPtr sink;
         sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
-        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
+        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         _require_bucket_distribution =
                 _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
@@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
                 pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
                 pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
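
Taken together, the `_create_tree_helper` changes above thread the renamed flag through the preorder plan walk. Restated as a standalone recurrence (an editorial sketch mirroring the patched code; the function itself is invented for illustration and is not part of the diff): a node's children count as "followed by a shuffled operator" when the node inherits the flag or shuffles itself, and it still requires a shuffled data distribution.

```cpp
// Editorial restatement of the per-node recurrence in _create_tree_helper
// (names follow the patch; this helper does not exist in the codebase).
bool flag_for_children(bool followed_by_shuffled_operator,      // inherited from parent
                       bool is_shuffled_operator,               // e.g. partitioned join, union
                       bool require_shuffled_data_distribution) {
    return (followed_by_shuffled_operator || is_shuffled_operator) &&
           require_shuffled_data_distribution;
}
```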
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
@@ -136,10 +136,10 @@ class PipelineTask {
     bool is_finalized() const { return _finalized; }

     void clear_blocking_state(bool wake_up_by_downstream = false) {
-        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
         _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
+        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
         if (!_finalized) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
