Skip to content

Commit

Permalink
[Bug](exchange) fix tablet sink shuffle without project not match the…
Browse files Browse the repository at this point in the history
… output tuple (apache#40299)

```
INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;

tbl_1 has columns k1, k2
tbl_4 has columns k1, k2, v
Without the project expr there will be only two columns, which does not match the output tuple.
```
The FE code is co-authored by @starocean999
  • Loading branch information
zhangstar333 committed Sep 26, 2024
1 parent f2b93d5 commit 2f427fd
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 6 deletions.
31 changes: 28 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "exchange_sink_operator.h"

#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

Expand Down Expand Up @@ -249,6 +250,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode);
_tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
_tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
_tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
}
// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor =
Expand All @@ -265,7 +270,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
.txn_id = _txn_id,
.pool = p._pool.get(),
.location = _location,
.vec_output_expr_ctxs = &_fake_expr_ctxs,
.vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
.schema = _schema,
.caller = (void*)this,
.create_partition_callback = &ExchangeSinkLocalState::empty_callback_function});
Expand Down Expand Up @@ -355,7 +360,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_partition(sink.tablet_sink_partition),
_tablet_sink_location(sink.tablet_sink_location),
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
_tablet_sink_txn_id(sink.tablet_sink_txn_id) {
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand All @@ -367,13 +373,20 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
_output_tuple_id = sink.output_tuple_id;
}
}

// Initializes the exchange sink operator from the thrift data sink description.
//
// Delegates to DataSinkOperatorX::init first, then validates the partition type:
// RANGE_PARTITIONED is rejected with an InternalError. For
// TABLET_SINK_SHUFFLE_PARTITIONED, the thrift expressions referenced by
// _t_tablet_sink_exprs are turned into expression contexts stored in
// _tablet_sink_expr_ctxs.
//
// Returns Status::OK() on success, or the first error encountered.
Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tsink));
if (_part_type == TPartitionType::RANGE_PARTITIONED) {
return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
}
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// _t_tablet_sink_exprs points at sink.tablet_sink_exprs captured in the constructor.
// NOTE(review): presumably the referenced TDataStreamSink outlives this call -- confirm.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
_tablet_sink_expr_ctxs));
}
return Status::OK();
}

Expand All @@ -386,6 +399,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
// Opens the exchange sink operator.
//
// Reads the transmission compression type from the runtime state. For
// TABLET_SINK_SHUFFLE_PARTITIONED, it additionally prepares and opens the
// tablet sink expression contexts so they can be evaluated later.
//
// Returns Status::OK() on success, or the first error from prepare/open.
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
DCHECK(state != nullptr);
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// _output_tuple_id stays at its default of -1 unless the thrift sink set output_tuple_id.
if (_output_tuple_id == -1) {
// No output tuple given: prepare the expressions against the child's row descriptor.
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
} else {
// An output tuple was given: prepare against a row descriptor built from that tuple.
// The descriptor is added to _pool.
// NOTE(review): get_tuple_descriptor's result is not null-checked here -- confirm it
// cannot fail for a valid _output_tuple_id.
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
}
return Status::OK();
}

Expand Down Expand Up @@ -535,7 +560,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(local_state._send_new_partition_batch());
}
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
channel2rows, convert_block.get(), eos));
channel2rows, block, eos));
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

// for shuffle data by partition and tablet
int64_t _txn_id = -1;
vectorized::VExprContextSPtrs _fake_expr_ctxs;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
Expand Down Expand Up @@ -273,6 +273,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
const std::vector<TExpr> _texprs;

const RowDescriptor& _row_desc;
TTupleId _output_tuple_id = -1;

TPartitionType::type _part_type;

Expand All @@ -299,6 +300,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
const TTupleId _tablet_sink_tuple_id;
int64_t _tablet_sink_txn_id = -1;
std::shared_ptr<ObjectPool> _pool;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;

// for external table sink random partition
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context);
exchangeNode.updateTupleIds(tupleDescriptor);
exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
dataStreamSink.setExchNodeId(exchangeNode.getId());
dataStreamSink.setOutputPartition(dataPartition);
parentFragment.addChild(inputFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
.createLocation(database.getId(), olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink {
protected TOlapTableLocationParam tabletSinkLocationParam = null;
protected TupleDescriptor tabletSinkTupleDesc = null;
protected long tabletSinkTxnId = -1;
protected List<Expr> tabletSinkExprs = null;

public DataStreamSink() {

Expand Down Expand Up @@ -145,6 +146,10 @@ public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) {
this.tabletSinkLocationParam = locationParam;
}

/**
 * Sets the expressions stored in {@code tabletSinkExprs}.
 * When non-null, {@code toThrift()} serializes each of them into the
 * thrift stream sink's tablet sink expression list.
 *
 * @param tabletSinkExprs the expressions to store; may be null (the field's default)
 */
public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
this.tabletSinkExprs = tabletSinkExprs;
}

/**
 * Sets the transaction id stored in {@code tabletSinkTxnId}, which
 * {@code toThrift()} writes to the thrift stream sink.
 *
 * @param txnId the transaction id to store (the field defaults to -1)
 */
public void setTabletSinkTxnId(long txnId) {
this.tabletSinkTxnId = txnId;
}
Expand Down Expand Up @@ -224,6 +229,11 @@ protected TDataSink toThrift() {
if (tabletSinkLocationParam != null) {
tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
}
if (tabletSinkExprs != null) {
for (Expr expr : tabletSinkExprs) {
tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
}
}
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct TDataStreamSink {
10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
13: optional list<Exprs.TExpr> tablet_sink_exprs
}

struct TMultiCastDataStreamSink {
Expand Down
3 changes: 3 additions & 0 deletions regression-test/data/nereids_p0/insert_into_table/random.out
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,6 @@
13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634
13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634

-- !sql_select --
1 11 11

11 changes: 11 additions & 0 deletions regression-test/suites/nereids_p0/insert_into_table/random.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,15 @@ suite('nereids_insert_random') {
sql 'set delete_without_partition=true'
sql '''delete from dup_t_type_cast_rd where id is not null'''
sql '''delete from dup_t_type_cast_rd where id is null'''

sql 'set enable_strict_consistency_dml=true'
sql 'drop table if exists tbl_1'
sql 'drop table if exists tbl_4'
sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");"""
sql """INSERT INTO tbl_1 VALUES (1, 11);"""
sql 'sync'
sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """
sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;"""
sql 'sync'
qt_sql_select """ select * from tbl_4; """;
}

0 comments on commit 2f427fd

Please sign in to comment.