diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 8af922bffeee588..7b70420746f8b0b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -302,7 +302,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_location(sink.tablet_sink_location), _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), _tablet_sink_txn_id(sink.tablet_sink_txn_id), - _t_output_expr(&sink.tablet_sink_exprs), + _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || @@ -315,6 +315,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared(); + if (sink.__isset.output_tuple_id) { + _output_tuple_id = sink.output_tuple_id; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -323,8 +326,8 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { - RETURN_IF_ERROR( - vectorized::VExpr::create_expr_trees(*_t_output_expr, _tablet_sink_expr_ctxs)); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, + _tablet_sink_expr_ctxs)); } return Status::OK(); } @@ -333,8 +336,15 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { _state = state; _mem_tracker = std::make_unique("ExchangeSinkOperatorX:"); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { - RETURN_IF_ERROR( - vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child_x->row_desc())); + if (_output_tuple_id == -1) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, + _child_x->row_desc())); + } else { + auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); + } RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); } return Status::OK(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index ab6014556ff8deb..e253aeadc85c726 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -240,6 +240,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _texprs; const RowDescriptor& _row_desc; + TTupleId _output_tuple_id = -1; TPartitionType::type _part_type; @@ -267,7 +268,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _pool; vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; - const std::vector* _t_output_expr = nullptr; + const std::vector* _t_tablet_sink_exprs = nullptr; // for external table sink random partition // Control the number of channels according to the flow, thereby controlling the number of table sink writers. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 1201bc6e32cc721..21d80a30f1ca18b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -358,8 +358,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); - exchangeNode.updateTupleIds(tupleDescriptor); + exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc()); dataStreamSink.setExchNodeId(exchangeNode.getId()); dataStreamSink.setOutputPartition(dataPartition); parentFragment.addChild(inputFragment);