From eb5bb6c4dac991f993680463afef3842f6db9aaa Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Sun, 22 Sep 2024 22:15:43 +0800 Subject: [PATCH] multi-cast --- be/src/pipeline/exec/exchange_sink_operator.cpp | 8 +++++--- be/src/pipeline/exec/exchange_sink_operator.h | 4 +++- be/src/pipeline/pipeline_fragment_context.cpp | 8 ++++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 39dfec1e3b53956..29ccbe3c2452267 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -289,7 +289,8 @@ segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const { ExchangeSinkOperatorX::ExchangeSinkOperatorX( RuntimeState* state, const RowDescriptor& row_desc, int operator_id, - const TDataStreamSink& sink, const std::vector& destinations) + const TDataStreamSink& sink, const std::vector& destinations, + bool is_multi_cast) : DataSinkOperatorX(operator_id, sink.dest_node_id), _texprs(sink.output_partition.partition_exprs), _row_desc(row_desc), @@ -303,7 +304,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), - _enable_local_merge_sort(state->enable_local_merge_sort()) { + _enable_local_merge_sort(state->enable_local_merge_sort()), + _is_multi_cast(is_multi_cast) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -680,7 +682,7 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { sort_source && sort_source->use_local_merge()) { // Sort the data local return ExchangeType::LOCAL_MERGE_SORT; - } else { + } else if (!_is_multi_cast) { return ExchangeType::PASS_TO_ONE_EXCHANGE; } } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index d52911fde9032a3..64ffdf0f25f5806 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -206,7 +206,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& destinations); + const std::vector& destinations, + bool is_multi_cast = false); Status init(const TDataSink& tsink) override; RuntimeState* state() { return _state; } @@ -276,6 +277,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorXset_sink(sink_op)); {