Skip to content

Commit

Permalink
[local exchange](fix) Fix correctness caused by local exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Oct 8, 2024
1 parent bdd7382 commit d6fd887
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 37 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class HashJoinBuildSinkOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
15 changes: 7 additions & 8 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,20 @@ class OperatorBase {
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
OperatorPtr child() { return _child; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
[[nodiscard]] bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
OperatorPtr _child = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
bool _followed_by_shuffled_operator = false;
};

class PipelineXLocalStateBase {
Expand Down Expand Up @@ -477,8 +480,6 @@ class DataSinkOperatorXBase : public OperatorBase {
[[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const;

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
Expand Down Expand Up @@ -663,8 +664,6 @@ class OperatorXBase : public OperatorBase {
[[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) = 0;

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

Status close(RuntimeState* state) override;

[[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
}
}

bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
return Status::OK();
}
[[nodiscard]] int get_child_count() const { return _child_size; }
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
bool _has_data(RuntimeState* state) const {
Expand Down
46 changes: 25 additions & 21 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const DescriptorTbl& descs, OperatorPtr parent,
int* node_idx, OperatorPtr* root,
PipelinePtr& cur_pipe, int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status::InternalError(
Expand All @@ -677,11 +677,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const TPlanNode& tnode = tnodes[*node_idx];

int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
OperatorPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_join));
followed_by_shuffled_operator));

// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
Expand All @@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
*root = op;
}
/**
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
*
* For plan:
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
Expand All @@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
auto require_shuffled_data_distribution =
cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution()
: op->require_shuffled_data_distribution();
current_followed_by_shuffled_join =
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
current_followed_by_shuffled_operator =
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;

// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
cur_pipe, i, current_followed_by_shuffled_join));
cur_pipe, i, current_followed_by_shuffled_operator));

// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
Expand Down Expand Up @@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
*/
const bool followed_by_shuffled_join = operators.size() > idx
? operators[idx]->followed_by_shuffled_join()
: cur_pipe->sink()->followed_by_shuffled_join();
const bool followed_by_shuffled_operator =
operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
: cur_pipe->sink()->followed_by_shuffled_operator();
const bool should_disable_bucket_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
followed_by_shuffled_join;
followed_by_shuffled_operator;
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id,
should_disable_bucket_shuffle ? _total_instances : _num_instances,
Expand Down Expand Up @@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
Expand Down Expand Up @@ -1321,15 +1321,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_join(false);
op->set_followed_by_shuffled_operator(false);
_require_bucket_distribution = true;
RETURN_IF_ERROR(new_pipe->add_operator(op));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand Down Expand Up @@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand All @@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
Expand Down Expand Up @@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(op));

const auto downstream_pipeline_id = cur_pipe->id();
Expand All @@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
Expand Down Expand Up @@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
DataSinkOperatorPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand All @@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
Expand Down

0 comments on commit d6fd887

Please sign in to comment.