From 8cbcfa30be55be4e370e3bad6121eeb85ee020f7 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Mon, 23 Sep 2024 19:29:17 +0800 Subject: [PATCH] enablePassToOneExchange --- be/src/pipeline/exec/exchange_sink_operator.cpp | 5 +++-- be/src/pipeline/exec/exchange_sink_operator.h | 1 + be/src/runtime/runtime_state.h | 5 +++++ .../src/main/java/org/apache/doris/qe/SessionVariable.java | 7 +++++++ gensrc/thrift/PaloInternalService.thrift | 1 + 5 files changed, 17 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index f34da3f476bcdd..d73085aef811db 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -306,7 +306,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()), - _is_multi_cast(is_multi_cast) { + _is_multi_cast(is_multi_cast), + _enable_pass_to_one_exchange(state->enable_pass_to_one_exchange()) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -683,7 +684,7 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { sort_source && sort_source->use_local_merge()) { // Sort the data local return ExchangeType::LOCAL_MERGE_SORT; - } else if (!_is_multi_cast) { + } else if (!_is_multi_cast && _enable_pass_to_one_exchange) { return ExchangeType::PASS_TO_ONE_EXCHANGE; } } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 9469db19f4d18b..2752a7ab3851c6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -283,6 +283,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX