From 4ac1288f23c9f1c8fd370a224d270f6955c318e7 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Tue, 25 Jun 2024 14:01:00 +0800 Subject: [PATCH] [Fix](Variant) fix variant partial update with row store enabled --- be/src/vec/columns/column_object.h | 8 -- be/src/vec/common/schema_util.cpp | 135 +----------------- be/src/vec/common/schema_util.h | 8 -- .../serde/data_type_object_serde.cpp | 10 +- .../data/variant_p0/delete_update.out | 4 +- .../suites/variant_p0/delete_update.groovy | 8 +- 6 files changed, 17 insertions(+), 156 deletions(-) diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 5f2cf129b99b38..f19d51796a8332 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -230,10 +230,6 @@ class ColumnObject final : public COWHelper { // this structure and fill with Subcolumns sub items mutable std::shared_ptr doc_structure; - // column with raw json strings - // used for quickly row store encoding - ColumnPtr rowstore_column; - using SubColumnWithName = std::pair; // Cached search results for previous row (keyed as index in JSON object) - used as a hint. mutable std::vector _prev_positions; @@ -259,10 +255,6 @@ class ColumnObject final : public COWHelper { return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable(); } - void set_rowstore_column(ColumnPtr col) { rowstore_column = col; } - - ColumnPtr get_rowstore_column() const { return rowstore_column; } - Status serialize_one_row_to_string(int row, std::string* output) const; Status serialize_one_row_to_string(int row, BufferWritable& output) const; diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 9f3975576df766..016336d4098d1c 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -492,36 +492,8 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, var.assume_mutable_ref().finalize(); MutableColumnPtr variant_column; - bool record_raw_string_with_serialization = false; - // set - auto encode_rowstore = [&]() { - if (!ctx.record_raw_json_column) { - return Status::OK(); - } - auto* var = static_cast(variant_column.get()); - if (record_raw_string_with_serialization) { - // encode to raw json column - auto raw_column = vectorized::ColumnString::create(); - for (size_t i = 0; i < var->rows(); ++i) { - std::string raw_str; - RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str)); - raw_column->insert_data(raw_str.c_str(), raw_str.size()); - } - var->set_rowstore_column(raw_column->get_ptr()); - } else { - // use original input json column - auto original_var_root = vectorized::check_and_get_column( - remove_nullable(column_ref).get()) - ->get_root(); - var->set_rowstore_column(original_var_root); - } - return Status::OK(); - }; - if (!var.is_scalar_variant()) { variant_column = var.assume_mutable(); - record_raw_string_with_serialization = true; - RETURN_IF_ERROR(encode_rowstore()); // already parsed continue; } @@ -558,8 +530,6 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, result = ColumnNullable::create(result, null_map); } block.get_by_position(variant_pos[i]).column = result; - RETURN_IF_ERROR(encode_rowstore()); - // block.get_by_position(variant_pos[i]).type = std::make_shared("json", true); } return Status::OK(); } @@ -600,35 +570,6 @@ Status encode_variant_sparse_subcolumns(ColumnObject& column) { return Status::OK(); } -static void _append_column(const TabletColumn& parent_variant, - const ColumnObject::Subcolumns::NodePtr& subcolumn, - TabletSchemaSPtr& to_append, bool is_sparse) { - // If column already exist in original tablet schema, then we pick common type - // and cast column to common type, and modify tablet column to common type, - // otherwise it's a new column - CHECK(to_append.use_count() == 1); - const std::string& column_name = - parent_variant.name_lower_case() + "." + subcolumn->path.get_path(); - const vectorized::DataTypePtr& final_data_type_from_object = - subcolumn->data.get_least_common_type(); - vectorized::PathInDataBuilder full_path_builder; - auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false) - .append(subcolumn->path.get_parts(), false) - .build(); - TabletColumn tablet_column = vectorized::schema_util::get_column_by_type( - final_data_type_from_object, column_name, - vectorized::schema_util::ExtraInfo {.unique_id = -1, - .parent_unique_id = parent_variant.unique_id(), - .path_info = full_path}); - - if (!is_sparse) { - to_append->append_column(std::move(tablet_column)); - } else { - to_append->mutable_column_by_uid(parent_variant.unique_id()) - .append_sparse_column(std::move(tablet_column)); - } -} - // sort by paths in lexicographical order vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( const vectorized::ColumnObject::Subcolumns& subcolumns) { @@ -640,70 +581,12 @@ vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( return sorted; } -void rebuild_schema_and_block(const TabletSchemaSPtr& original, - const std::vector& variant_positions, Block& flush_block, - TabletSchemaSPtr& flush_schema) { - // rebuild schema and block with variant extracted columns - - // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema - // those columns are extracted columns, leave none extracted columns remain in original variant column, which is - // JSONB format at present. - // 2. Collect columns that need to be added or modified when data type changes or new columns encountered - for (size_t variant_pos : variant_positions) { - auto column_ref = flush_block.get_by_position(variant_pos).column; - bool is_nullable = column_ref->is_nullable(); - const vectorized::ColumnObject& object_column = assert_cast( - remove_nullable(column_ref)->assume_mutable_ref()); - const TabletColumn& parent_column = *original->columns()[variant_pos]; - CHECK(object_column.is_finalized()); - std::shared_ptr root; - // common extracted columns - for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) { - if (entry->path.empty()) { - // root - root = entry; - continue; - } - _append_column(parent_column, entry, flush_schema, false); - const std::string& column_name = - parent_column.name_lower_case() + "." + entry->path.get_path(); - flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(), - entry->data.get_least_common_type(), column_name}); - } - - // add sparse columns to flush_schema - for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) { - _append_column(parent_column, entry, flush_schema, true); - } - - // Create new variant column and set root column - auto obj = vectorized::ColumnObject::create(true, false); - // '{}' indicates a root path - static_cast(obj.get())->add_sub_column( - {}, root->data.get_finalized_column_ptr()->assume_mutable(), - root->data.get_least_common_type()); - // // set for rowstore - if (original->has_row_store_for_all_columns()) { - static_cast(obj.get())->set_rowstore_column( - object_column.get_rowstore_column()); - } - vectorized::ColumnPtr result = obj->get_ptr(); - if (is_nullable) { - const auto& null_map = assert_cast(*column_ref) - .get_null_map_column_ptr(); - result = vectorized::ColumnNullable::create(result, null_map); - } - flush_block.get_by_position(variant_pos).column = result; - vectorized::PathInDataBuilder full_root_path_builder; - auto full_root_path = - full_root_path_builder.append(parent_column.name_lower_case(), false).build(); - TabletColumn new_col = flush_schema->column(variant_pos); - new_col.set_path_info(full_root_path); - flush_schema->replace_column(variant_pos, new_col); - VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); - } +// --------------------------- - vectorized::schema_util::inherit_column_attributes(flush_schema); +std::string dump_column(DataTypePtr type, const ColumnPtr& col) { + Block tmp; + tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()}); + return tmp.dump_data(0, tmp.rows()); } // --------------------------- @@ -734,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst) ->assume_mutable(); return Status::OK(); } -// --------------------------- - -std::string dump_column(DataTypePtr type, const ColumnPtr& col) { - Block tmp; - tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()}); - return tmp.dump_data(0, tmp.rows()); -} -// --------------------------- } // namespace doris::vectorized::schema_util diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index f519e4dacae376..162885414159e0 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -124,14 +124,6 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target, vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( const vectorized::ColumnObject::Subcolumns& subcolumns); -// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject. -// Block consists of two parts, dynamic part of columns and static part of columns. -// static extracted -// | --------- | ----------- | -// The static ones are original tablet_schame columns -void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector& variant_pos, - Block& flush_block, TabletSchemaSPtr& flush_schema); - // Extract json data from source with path Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst); diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 77fef42df55da7..b72c295d8d0a9c 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -87,12 +87,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr const_cast(variant).finalize(); } result.writeKey(col_id); + std::string value_str; + if (!variant.serialize_one_row_to_string(row_num, &value_str)) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}", + variant.dump_structure()); + } JsonbParser json_parser; - CHECK(variant.get_rowstore_column() != nullptr); - // use original document - const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num); // encode as jsonb - bool succ = json_parser.parse(data_ref.data, data_ref.size); + bool succ = json_parser.parse(value_str.data(), value_str.size()); // maybe more graceful, it is ok to check here since data could be parsed CHECK(succ); result.writeStartBinary(); diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out index 0cf801f1c004e9..4390610c21df33 100644 --- a/regression-test/data/variant_p0/delete_update.out +++ b/regression-test/data/variant_p0/delete_update.out @@ -7,12 +7,12 @@ -- !sql -- 2 {"updated_value":123} {"updated_value":123} -6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123} +6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123} 7 {"updated_value":1111} yyy -- !sql -- 2 {"updated_value":123} {"updated_value":123} -6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123} +6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123} -- !sql -- 1 "ddddddddddd" 1111 199 10 {"new_data1":1} diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy index ed9667e7f5b324..2b126b4c3a6616 100644 --- a/regression-test/suites/variant_p0/delete_update.groovy +++ b/regression-test/suites/variant_p0/delete_update.groovy @@ -53,14 +53,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 4 - properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); + properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true"); """ sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update" sql "delete from ${table_name} where k = 1" sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))" - sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')""" - sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')""" + sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.1}', 'xxx')""" + sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.1}', 'yyy')""" sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6""" sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7""" qt_sql "select * from var_delete_update_mow order by k" @@ -108,7 +108,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ `dft` int(11) DEFAULT "4321", `var` variant NULL) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true") + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true", "store_row_column" = "true") """ sql """insert into ${tableName} values