Skip to content

Commit

Permalink
multi-cast
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Sep 23, 2024
1 parent d7d5242 commit a406e51
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const {

ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations)
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
bool is_multi_cast)
: DataSinkOperatorX(operator_id, sink.dest_node_id),
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
Expand All @@ -303,7 +304,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()) {
_enable_local_merge_sort(state->enable_local_merge_sort()),
_is_multi_cast(is_multi_cast) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -680,7 +682,7 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
} else {
} else if (!_is_multi_cast) {
return ExchangeType::PASS_TO_ONE_EXCHANGE;
}
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations);
const std::vector<TPlanFragmentDestination>& destinations,
bool is_multi_cast = false);
Status init(const TDataSink& tsink) override;

RuntimeState* state() { return _state; }
Expand Down Expand Up @@ -276,6 +277,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
size_t _data_processed = 0;
int _writer_count = 1;
const bool _enable_local_merge_sort;
const bool _is_multi_cast;
};

} // namespace pipeline
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,10 +1184,10 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
// 2. create and set sink operator of data stream sender for new pipeline

DataSinkOperatorPtr sink_op;
sink_op.reset(
new ExchangeSinkOperatorX(state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i]));
sink_op.reset(new ExchangeSinkOperatorX(
state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i], true));

RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
{
Expand Down

0 comments on commit a406e51

Please sign in to comment.