From 9c273d8cd642b22112baba736a966d48fa5953b1 Mon Sep 17 00:00:00 2001
From: huanghaibin <284824253@qq.com>
Date: Fri, 29 Mar 2024 00:14:26 +0800
Subject: [PATCH] [improvement](partial-update) make partial-update on agg
 table use less memory

---
 be/src/olap/memtable.cpp                      |   6 +-
 .../olap/rowset/segment_v2/segment_writer.cpp |  27 ++-
 .../segment_v2/vertical_segment_writer.cpp    |  52 +++--
 be/src/olap/tablet_schema.cpp                 |  50 +++++
 be/src/olap/tablet_schema.h                   |   3 +
 .../nereids/parser/LogicalPlanBuilder.java    |   3 +-
 .../nereids/rules/analysis/BindSink.java      |   3 +-
 .../doris/planner/StreamLoadPlanner.java      |  10 +-
 .../org/apache/doris/qe/SessionVariable.java  |  13 ++
 .../test_agg_partial_update_insert_into.out   |  34 ++++
 .../test_agg_partial_update_stream_load.out   |  41 ++++
 ...test_agg_partial_update_insert_into.groovy |  95 ++++++++++
 ...test_agg_partial_update_stream_load.groovy | 179 ++++++++++++++++++
 13 files changed, 493 insertions(+), 23 deletions(-)
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.out
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.out
 create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.groovy
 create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.groovy

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3e6381a0e6e243e..fa086f3526ae736 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -107,6 +107,10 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
             function = vectorized::AggregateFunctionSimpleFactory::instance().get(
                     "replace_load", {block->get_data_type(cid)},
                     block->get_data_type(cid)->is_nullable());
+        } else if (_keys_type == KeysType::AGG_KEYS && _is_partial_update) {
+            function = vectorized::AggregateFunctionSimpleFactory::instance().get(
+                    "replace_if_not_null_load", {block->get_data_type(cid)},
+                    block->get_data_type(cid)->is_nullable());
         } else {
             function =
                     _tablet_schema->column(cid).get_aggregate_function(vectorized::AGG_LOAD_SUFFIX);
@@ -485,7 +489,7 @@ void MemTable::shrink_memtable_by_agg() {
 
 bool MemTable::need_flush() const {
     auto max_size = config::write_buffer_size;
-    if (_is_partial_update) {
+    if (_is_partial_update && _keys_type == KeysType::UNIQUE_KEYS) {
         auto update_columns_size = _num_columns;
         max_size = max_size * update_columns_size / _tablet_schema->num_columns();
         max_size = max_size > 1048576 ? max_size : 1048576;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 8422d9a290c79fc..80f92a99e019132 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -766,11 +766,30 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
-        !_opts.rowset_ctx->is_transient_rowset_writer) {
+        !_opts.rowset_ctx->is_transient_rowset_writer &&
+        _tablet_schema->keys_type() == UNIQUE_KEYS) {
         RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
         return Status::OK();
     }
-    CHECK(block->columns() >= _column_writers.size())
+    bool is_agg_partial_update = _opts.rowset_ctx->partial_update_info &&
+                                 _opts.rowset_ctx->partial_update_info->is_partial_update &&
+                                 _tablet_schema->keys_type() == AGG_KEYS;
+    std::shared_ptr<vectorized::Block> full_block_ptr = nullptr;
+    if (is_agg_partial_update) {
+        if (block->columns() <= _tablet_schema->num_key_columns() ||
+            block->columns() >= _tablet_schema->num_columns()) {
+            return Status::InternalError(fmt::format(
+                    "illegal partial update block columns: {}, num key columns: {}, total "
+                    "schema columns: {}",
+                    block->columns(), _tablet_schema->num_key_columns(),
+                    _tablet_schema->num_columns()));
+        }
+        RETURN_IF_ERROR(_tablet_schema->make_full_block(
+                full_block_ptr, block, num_rows, _opts.rowset_ctx->partial_update_info->update_cids,
+                _opts.rowset_ctx->partial_update_info->missing_cids));
+    }
+    const vectorized::Block* full_block = is_agg_partial_update ? full_block_ptr.get() : block;
+    CHECK(full_block->columns() >= _column_writers.size())
             << ", block->columns()=" << block->columns()
             << ", _column_writers.size()=" << _column_writers.size();
     // Row column should be filled here when it's a direct write from memtable
@@ -778,10 +797,10 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
     if (_tablet_schema->store_row_column() &&
         (_opts.write_type == DataWriteType::TYPE_DIRECT ||
          _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
-        _serialize_block_to_row_column(*const_cast<vectorized::Block*>(block));
+        _serialize_block_to_row_column(*const_cast<vectorized::Block*>(full_block));
     }
 
-    _olap_data_convertor->set_source_content(block, row_pos, num_rows);
+    _olap_data_convertor->set_source_content(full_block, row_pos, num_rows);
 
     // find all row pos for short key indexes
     std::vector<size_t> short_key_pos;
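Note on the memtable change above: the write-buffer scaling in MemTable::need_flush() now applies only to unique-key partial updates, where the flush threshold is scaled by the fraction of columns being updated and floored at 1 MiB (1048576 bytes). As a worked example, assuming the default config::write_buffer_size of 200 MB: a partial update touching 5 of 100 columns flushes at 200 MB * 5 / 100 = 10 MB. Aggregate-key partial updates skip the scaling because the memtable now holds only the loaded columns and merges them in place with replace_if_not_null_load, instead of expanding every row to the full schema at flush time.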
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index d7b8081b5984d3c..abba9b35e900d27 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -694,13 +694,18 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
         !_opts.rowset_ctx->is_transient_rowset_writer) {
+        if (_tablet_schema->keys_type() != UNIQUE_KEYS && _tablet_schema->keys_type() != AGG_KEYS) {
+            return Status::InternalError(fmt::format("illegal partial update key type: {}",
+                                                     _tablet_schema->keys_type()));
+        }
         if (block->columns() <= _tablet_schema->num_key_columns() ||
             block->columns() >= _tablet_schema->num_columns()) {
-            return Status::InternalError(fmt::format(
-                    "illegal partial update block columns: {}, num key columns: {}, total "
-                    "schema columns: {}",
-                    block->columns(), _tablet_schema->num_key_columns(),
-                    _tablet_schema->num_columns()));
+            return Status::InternalError(
+                    fmt::format("illegal partial update key type {}, block columns: {}, num key "
+                                "columns: {}, total schema columns: {}",
+                                _tablet_schema->keys_type(), block->columns(),
+                                _tablet_schema->num_key_columns(), _tablet_schema->num_columns()));
         }
     } else if (block->columns() != _tablet_schema->num_columns()) {
         return Status::InternalError(
@@ -715,7 +720,8 @@ Status VerticalSegmentWriter::write_batch() {
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
-        !_opts.rowset_ctx->is_transient_rowset_writer) {
+        !_opts.rowset_ctx->is_transient_rowset_writer &&
+        _tablet_schema->keys_type() == UNIQUE_KEYS) {
         for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
             RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
         }
@@ -728,14 +734,35 @@ Status VerticalSegmentWriter::write_batch() {
         }
         return Status::OK();
     }
+    bool is_agg_partial_update = _opts.rowset_ctx->partial_update_info &&
+                                 _opts.rowset_ctx->partial_update_info->is_partial_update &&
+                                 _tablet_schema->keys_type() == AGG_KEYS;
+    std::vector<std::shared_ptr<vectorized::Block>> full_blocks;
+    if (is_agg_partial_update) {
+        for (auto& data : _batched_blocks) {
+            std::shared_ptr<vectorized::Block> full_block_ptr = nullptr;
+            RETURN_IF_ERROR(_tablet_schema->make_full_block(
+                    full_block_ptr, data.block, data.num_rows,
+                    _opts.rowset_ctx->partial_update_info->update_cids,
+                    _opts.rowset_ctx->partial_update_info->missing_cids));
+            full_blocks.emplace_back(full_block_ptr);
+        }
+    }
+
     // Row column should be filled here when it's a direct write from memtable
     // or it's a schema change write (since the column data type may be changed, we should rebuild)
     if (_tablet_schema->store_row_column() &&
         (_opts.write_type == DataWriteType::TYPE_DIRECT ||
          _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
+        int index = 0;
         for (auto& data : _batched_blocks) {
             // TODO: maybe we should pass range to this method
-            _serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
+            if (is_agg_partial_update) {
+                _serialize_block_to_row_column(
+                        *const_cast<vectorized::Block*>(full_blocks[index++].get()));
+            } else {
+                _serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
+            }
         }
     }
 
@@ -743,9 +770,11 @@ Status VerticalSegmentWriter::write_batch() {
     vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
         RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
+        int index = 0;
         for (auto& data : _batched_blocks) {
             _olap_data_convertor->set_source_content_with_specifid_columns(
-                    data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid});
+                    is_agg_partial_update ? full_blocks[index++].get() : data.block, data.row_pos,
+                    data.num_rows, std::vector<uint32_t> {cid});
 
             // convert column data from engine format to storage layer format
             auto [status, column] = _olap_data_convertor->convert_column_data(cid);
@@ -770,9 +799,11 @@ Status VerticalSegmentWriter::write_batch() {
         RETURN_IF_ERROR(_column_writers[cid]->finish());
         RETURN_IF_ERROR(_column_writers[cid]->write_data());
     }
-
+    int index = 0;
     for (auto& data : _batched_blocks) {
-        _olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows);
+        _olap_data_convertor->set_source_content(
+                is_agg_partial_update ? full_blocks[index++].get() : data.block, data.row_pos,
+                data.num_rows);
 
         // find all row pos for short key indexes
         std::vector<size_t> short_key_pos;
         // We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
@@ -814,7 +845,6 @@ Status VerticalSegmentWriter::write_batch() {
         _olap_data_convertor->clear_source_content();
         _num_rows_written += data.num_rows;
     }
-
     _batched_blocks.clear();
     return Status::OK();
 }
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index a13598c2fad0e9e..7fd65f7b0db9348 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1470,4 +1470,54 @@ std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& s
     return output;
 }
 
+Status TabletSchema::make_full_block(std::shared_ptr<vectorized::Block>& full_block_ptr,
+                                     const vectorized::Block* block, size_t num_rows,
+                                     std::vector<uint32_t>& update_cids,
+                                     std::vector<uint32_t>& missing_cids) {
+    full_block_ptr = std::make_shared<vectorized::Block>(this->create_block());
+    size_t input_id = 0;
+    for (auto i : update_cids) {
+        full_block_ptr->replace_by_position(i, block->get_by_position(input_id++).column);
+    }
+    auto mutable_full_columns = full_block_ptr->mutate_columns();
+    auto old_value_block = create_block_by_cids(missing_cids);
+    CHECK(missing_cids.size() == old_value_block.columns());
+
+    // build default value columns
+    auto default_value_block = old_value_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+
+    for (auto i = 0; i < missing_cids.size(); ++i) {
+        const auto& missing_column = this->column(missing_cids[i]);
+        if (missing_column.has_default_value()) {
+            auto default_value = missing_column.default_value();
+            vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                      default_value.size());
+            RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
+                    rb, mutable_default_value_columns[i].get()));
+        }
+    }
+
+    for (auto idx = 0; idx < num_rows; idx++) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            // Fill a nullable missing column with null so that REPLACE_IF_NOT_NULL keeps
+            // the value already stored in the table; fall back to the column's default
+            // value only when the column is not nullable.
+            const auto& tablet_column = this->column(missing_cids[i]);
+            if (tablet_column.is_nullable()) {
+                auto nullable_column = assert_cast<vectorized::ColumnNullable*>(
+                        mutable_full_columns[missing_cids[i]].get());
+                nullable_column->insert_null_elements(1);
+            } else if (tablet_column.has_default_value()) {
+                mutable_full_columns[missing_cids[i]]->insert_from(
+                        *mutable_default_value_columns[i].get(), 0);
+            } else {
+                return Status::InternalError(
+                        "missing column {} is not nullable and has no default value",
+                        tablet_column.name());
+            }
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index b5b8df730b3350d..383e6d697c88c92 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -435,6 +435,9 @@ class TabletSchema {
     void update_tablet_columns(const TabletSchema& tablet_schema,
                                const std::vector<TColumn>& t_columns);
 
+    Status make_full_block(std::shared_ptr<vectorized::Block>& full_block_ptr,
+                           const vectorized::Block* block, size_t num_rows,
+                           std::vector<uint32_t>& update_cids, std::vector<uint32_t>& missing_cids);
+
 private:
     friend bool operator==(const TabletSchema& a, const TabletSchema& b);
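make_full_block() is what keeps the memory footprint small: it widens a partial block to the full schema without reading back any old rows. Update columns are moved to their schema positions, and each missing column is filled with NULL when nullable, falling back to the declared default only for non-nullable columns. A sketch of the expansion, using the test schema added later in this patch (id, name, score, test, dft DEFAULT "4321"; Doris stores REPLACE_IF_NOT_NULL columns as nullable) with only (id, score) loaded:

    input row:  (1, 200)
    full row:   id=1, name=NULL, score=200, test=NULL, dft=NULL

Filling nullable columns with NULL rather than their default is deliberate: the NULLs are discarded by the replace_if_not_null aggregation, so values already stored in the table survive the update.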
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a9ba3603f534429..67059c5eb0f50c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -539,7 +539,8 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) {
                 partitionSpec.second, // partition names
                 isAutoDetect,
                 isOverwrite,
-                ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
+                ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate()
+                        || ConnectContext.get().getSessionVariable().isEnableAggregateKeyPartialUpdate(),
                 DMLCommandType.INSERT,
                 plan);
         LogicalPlan command;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 20f05729822df49..ad0ba61453930dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -22,7 +22,6 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
@@ -102,7 +101,7 @@ private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
         Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
         Database database = pair.first;
         OlapTable table = pair.second;
-        boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
+        boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType().isAggregationFamily();
 
         LogicalPlan child = ((LogicalPlan) sink.child());
         boolean childHasSeqCol = child.getOutput().stream()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index d35edba1c1504e7..2881eb6fbdbb6f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -144,8 +144,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
         boolean negative = taskInfo.getNegative();
         // get partial update related info
         boolean isPartialUpdate = taskInfo.isPartialUpdate();
-        if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
-            throw new UserException("Only unique key merge on write support partial update");
+        if (isPartialUpdate && (!destTable.getKeysType().isAggregationFamily() || (
+                destTable.getKeysType() == KeysType.UNIQUE_KEYS && !destTable.getEnableUniqueKeyMergeOnWrite()))) {
+            throw new UserException("Only unique key merge-on-write tables or aggregate key tables support partial update");
         }
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (isPartialUpdate) {
@@ -371,8 +372,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
         boolean negative = taskInfo.getNegative();
         // get partial update related info
         boolean isPartialUpdate = taskInfo.isPartialUpdate();
-        if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
-            throw new UserException("Only unique key merge on write support partial update");
+        if (isPartialUpdate && (!destTable.getKeysType().isAggregationFamily() || (
+                destTable.getKeysType() == KeysType.UNIQUE_KEYS && !destTable.getEnableUniqueKeyMergeOnWrite()))) {
+            throw new UserException("Only unique key merge-on-write tables or aggregate key tables support partial update");
         }
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (isPartialUpdate) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 67f3569091418a8..054c728be8a84d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -454,6 +454,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update";
 
+    public static final String ENABLE_AGG_KEY_PARTIAL_UPDATE = "enable_agg_key_partial_update";
+
     public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold";
 
     public static final String INVERTED_INDEX_MAX_EXPANSIONS = "inverted_index_max_expansions";
@@ -1493,6 +1495,9 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
     @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true)
     public boolean enableUniqueKeyPartialUpdate = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_AGG_KEY_PARTIAL_UPDATE, needForward = true)
+    public boolean enableAggregateKeyPartialUpdate = false;
+
     @VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = {
             "用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错",
             "Used to test whether the query cache is hit. "
@@ -2955,6 +2960,14 @@ public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate
         this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
     }
 
+    public boolean isEnableAggregateKeyPartialUpdate() {
+        return enableAggregateKeyPartialUpdate;
+    }
+
+    public void setEnableAggregateKeyPartialUpdate(boolean enableAggregateKeyPartialUpdate) {
+        this.enableAggregateKeyPartialUpdate = enableAggregateKeyPartialUpdate;
+    }
+
     public int getLoadStreamPerNode() {
         return loadStreamPerNode;
     }
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.out b/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.out
new file mode 100644
index 000000000000000..f2df4b7993996b4
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.out
@@ -0,0 +1,34 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+1	doris	1000	123	1
+2	doris2	2000	223	1
+
+-- !2 --
+1	doris	200	123	1
+2	doris2	400	223	1
+4	\N	400	\N	\N
+
+-- !3 --
+1	1	3	4
+2	2	4	5
+3	3	2	3
+4	4	1	2
+
+-- !4 --
+1	2	3	4
+2	3	4	5
+3	4	2	3
+4	5	1	2
+
+-- !5 --
+1	1	1	3	4
+2	2	2	4	5
+3	3	3	2	3
+4	4	4	1	2
+
+-- !6 --
+1	1	1	3	4
+2	2	2	4	5
+3	3	3	2	3
+4	4	4	1	2
+
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.out b/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.out
new file mode 100644
index 000000000000000..a6e180e77527643
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.out
@@ -0,0 +1,41 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !basic --
+1	doris	200	123	1
+2	doris2	400	223	1
+
+-- !basic_with_duplicate --
+1	doris	444	123	1
+2	doris2	555	223	1
+
+-- !basic_with_duplicate2 --
+1	doris	1111	123	1
+2	doris2	2222	223	1
+
+-- !basic_with_new_keys --
+1	doris	1111	123	1
+2	doris2	2222	223	1
+3	"stranger"	500	\N	\N
+4	"foreigner"	600	\N	\N
+
+-- !basic_with_new_keys2 --
+1	doris	1111	123	1
+2	doris2	2222	223	1
+3	"stranger"	500	\N	4321
+4	"foreigner"	600	\N	4321
+
+-- !basic_with_new_keys_and_invalid --
+1	doris	1111	123	1
+2	doris2	2222	223	1
+3	"stranger"	500	\N	4321
+4	"foreigner"	600	\N	4321
+5	"stranger"	\N	\N	\N
+6	"foreigner"	\N	\N	\N
+
+-- !basic_invalid --
+1	doris	1111	123	1
+2	doris2	2222	223	1
+3	"stranger"	500	\N	4321
+4	"foreigner"	600	\N	4321
+5	"stranger"	\N	\N	\N
+6	"foreigner"	\N	\N	\N
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.groovy
new file mode 100644
index 000000000000000..354b3743a77bc9b
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_insert_into.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_agg_partial_update_insert_into", "p0") {
+
+    String db = context.config.getDbNameByFile(context.file)
+    sql "select 1;" // to create database
+
+    connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+        sql "use ${db};"
+        sql "sync;"
+
+        def tableName = "agg_partial_update_insert_into"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """ CREATE TABLE ${tableName} (
+                    `id` int(11) NOT NULL COMMENT "user id",
+                    `name` varchar(65533) REPLACE_IF_NOT_NULL NOT NULL COMMENT "user name",
+                    `score` int(11) REPLACE_IF_NOT_NULL NOT NULL COMMENT "user score",
+                    `test` int(11) REPLACE_IF_NOT_NULL NULL COMMENT "null test",
+                    `dft` int(11) REPLACE_IF_NOT_NULL DEFAULT "4321")
+                    AGGREGATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                    PROPERTIES("replication_num" = "1"); """
+
+        sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)"""
+        qt_1 """ select * from ${tableName} order by id; """
+        sql "set enable_agg_key_partial_update=true;"
+        sql "sync;"
+        sql """insert into ${tableName}(id,score) values(2,400),(1,200),(4,400)"""
+        qt_2 """ select * from ${tableName} order by id; """
+        test {
+            sql """insert into ${tableName} values(2,400),(1,200),(4,400)"""
+            exception "Column count doesn't match value count"
+        }
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+
+        def tableName2 = "agg_partial_update_insert_into2"
+        sql """ DROP TABLE IF EXISTS ${tableName2} """
+        sql """create table ${tableName2} (
+                    k int null,
+                    v int replace_if_not_null null,
+                    v2 int replace_if_not_null null,
+                    v3 int replace_if_not_null null
+                ) aggregate key (k) distributed by hash(k) buckets 1
+                properties("replication_num" = "1"); """
+        sql "set enable_agg_key_partial_update=false;"
+        sql "sync;"
+        sql "insert into ${tableName2} values(1,1,3,4),(2,2,4,5),(3,3,2,3),(4,4,1,2);"
+        qt_3 "select * from ${tableName2} order by k;"
+        sql "set enable_agg_key_partial_update=true;"
+        sql "sync;"
+        sql "insert into ${tableName2}(k,v) select v2,v3 from ${tableName2};"
+        qt_4 "select * from ${tableName2} order by k;"
+        sql """ DROP TABLE IF EXISTS ${tableName2}; """
+
+        def tableName3 = "agg_partial_update_insert_into3"
+        sql """ DROP TABLE IF EXISTS ${tableName3} """
+        sql """create table ${tableName3} (
+                    k1 int null,
+                    k2 int null,
+                    k3 int null,
+                    v1 int replace_if_not_null null,
+                    v2 int replace_if_not_null null
+                ) aggregate key (k1,k2,k3) distributed by hash(k1,k2) buckets 4
+                properties("replication_num" = "1"); """
+        sql "set enable_agg_key_partial_update=false;"
+        sql "sync;"
+        sql "insert into ${tableName3} values(1,1,1,3,4),(2,2,2,4,5),(3,3,3,2,3),(4,4,4,1,2);"
+        qt_5 "select * from ${tableName3} order by k1;"
+        sql "set enable_agg_key_partial_update=true;"
+        sql "sync;"
+        test {
+            // the column list misses key column k3, so the BE rejects the load
+            sql "insert into ${tableName3}(k1,k2,v2) select k2,k3,v1 from ${tableName3};"
+            exception "illegal partial update"
+        }
+        qt_6 "select * from ${tableName3} order by k1;"
+        sql """ DROP TABLE IF EXISTS ${tableName3}; """
+
+        sql "set enable_agg_key_partial_update=false;"
+        sql "sync;"
+    }
+}
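For stream load, the same switch is the partial_columns request header, as exercised by the suite that follows. Outside the test harness, an equivalent request would look roughly like this (host, credentials, and database name are placeholders):

    curl --location-trusted -u user:passwd \
        -H "column_separator:," \
        -H "partial_columns:true" \
        -H "columns:id,score" \
        -T basic.csv \
        http://fe_host:8030/api/db/test_agg_partial_update/_stream_load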
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.groovy
new file mode 100644
index 000000000000000..403a6ec87b106e7
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_agg_partial_update_stream_load.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_agg_partial_update_stream_load", "p0") {
+
+    String db = context.config.getDbNameByFile(context.file)
+    sql "select 1;" // to create database
+    connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+        sql "use ${db};"
+        def tableName = "test_agg_partial_update"
+        // create table
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """ CREATE TABLE ${tableName} (
+                    `id` int(11) NOT NULL COMMENT "user id",
+                    `name` varchar(65533) REPLACE_IF_NOT_NULL NOT NULL COMMENT "user name",
+                    `score` int(11) REPLACE_IF_NOT_NULL NOT NULL COMMENT "user score",
+                    `test` int(11) REPLACE_IF_NOT_NULL NULL COMMENT "null test",
+                    `dft` int(11) REPLACE_IF_NOT_NULL DEFAULT "4321")
+                    AGGREGATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                    PROPERTIES("replication_num" = "1"); """
+
+        // insert 2 rows
+        sql """
+            insert into ${tableName} values(2, "doris2", 2000, 223, 1)
+        """
+
+        sql """
+            insert into ${tableName} values(1, "doris", 1000, 123, 1)
+        """
+
+        // partial update columns (id, score) from basic.csv
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score'
+
+            file 'basic.csv'
+            time 10000 // limit inflight 10s
+        }
+
+        sql "sync"
+
+        qt_basic """
+            select * from ${tableName} order by id;
+        """
+
+        // partial update a row multiple times in one stream load
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score'
+
+            file 'basic_with_duplicate.csv'
+            time 10000 // limit inflight 10s
+        }
+
+        sql "sync"
+
+        qt_basic_with_duplicate """
+            select * from ${tableName} order by id;
+        """
+
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score'
+
+            file 'basic_with_duplicate2.csv'
+            time 10000 // limit inflight 10s
+        }
+
+        sql "sync"
+
+        qt_basic_with_duplicate2 """
+            select * from ${tableName} order by id;
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name,score'
+
+            file 'basic_with_new_keys.csv'
+            time 10000 // limit inflight 10s
+        }
+
+        sql "sync"
+
+        qt_basic_with_new_keys """
+            select * from ${tableName} order by id;
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'false'
+            set 'columns', 'id,name,score'
+
+            file 'basic_with_new_keys.csv'
+            time 10000 // limit inflight 10s
+        }
+
+        sql "sync"
+
+        qt_basic_with_new_keys2 """
+            select * from ${tableName} order by id;
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name,score'
+
+            file 'basic_with_new_keys_and_invalid.csv'
+            time 10000 // limit inflight 10s
+        }
+        sql "sync"
+        qt_basic_with_new_keys_and_invalid """
+            select * from ${tableName} order by id;
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score'
+
+            file 'basic_invalid.csv'
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows"))
+                assertEquals(3, json.NumberTotalRows)
+                assertEquals(1, json.NumberLoadedRows)
+                assertEquals(2, json.NumberFilteredRows)
+            }
+        }
+        sql "sync"
+        qt_basic_invalid """
+            select * from ${tableName} order by id;
+        """
+
+        // drop table
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+    }
+}
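One detail worth calling out in the expected output above: with partial_columns:true, the new keys 3 and 4 end up with dft = \N even though the column declares DEFAULT "4321" (qt_basic_with_new_keys), whereas the follow-up load of the same file with partial_columns:false fills in 4321 (qt_basic_with_new_keys2). That asymmetry follows from make_full_block() checking is_nullable() before has_default_value(): filling NULL keeps REPLACE_IF_NOT_NULL from clobbering existing rows, at the cost of not applying defaults to rows that turn out to be new.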