diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 7f84f28bc6b401b..24e55f2d896a89c 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -133,6 +133,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { } } _timestamp_ms = pschema.timestamp_ms(); + if (pschema.has_nano_seconds()) { + _nano_seconds = pschema.nano_seconds(); + } _timezone = pschema.timezone(); for (const auto& col : pschema.partial_update_input_columns()) { @@ -281,6 +284,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); pschema->set_timestamp_ms(_timestamp_ms); pschema->set_timezone(_timezone); + pschema->set_nano_seconds(_nano_seconds); pschema->set_sequence_map_col_name(_sequence_map_column); for (auto col : _partial_update_input_columns) { *pschema->add_partial_update_input_columns() = col; diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 24d125949f788dc..1e500f3b1fc862e 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -96,6 +96,8 @@ class OlapTableSchemaParam { int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } int64_t timestamp_ms() const { return _timestamp_ms; } + void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; } + int32_t nano_seconds() const { return _nano_seconds; } void set_timezone(std::string timezone) { _timezone = timezone; } std::string timezone() const { return _timezone; } bool is_strict_mode() const { return _is_strict_mode; } @@ -118,6 +120,7 @@ class OlapTableSchemaParam { std::string _sequence_map_column {}; int32_t _auto_increment_column_unique_id; int64_t _timestamp_ms = 0; + int32_t _nano_seconds {0}; std::string _timezone; }; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 8407d421aa031be..f62aa9e80de1251 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -241,7 +241,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->timestamp_ms(), + table_schema_param->nano_seconds(), table_schema_param->timezone(), table_schema_param->auto_increment_coulumn()); } diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index 99ee4000ca342d4..102f7679c1cde23 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -33,12 +33,14 @@ namespace doris { void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update, const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone, - const std::string& auto_increment_column, int64_t cur_max_version) { + int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, const std::string& auto_increment_column, + int64_t cur_max_version) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; + this->nano_seconds = nano_seconds; this->timezone = timezone; missing_cids.clear(); update_cids.clear(); @@ -79,6 +81,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const can_insert_new_rows_in_partial_update); partial_update_info_pb->set_is_strict_mode(is_strict_mode); partial_update_info_pb->set_timestamp_ms(timestamp_ms); + partial_update_info_pb->set_nano_seconds(nano_seconds); partial_update_info_pb->set_timezone(timezone); partial_update_info_pb->set_is_input_columns_contains_auto_inc_column( is_input_columns_contains_auto_inc_column); @@ -115,6 +118,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { partial_update_info_pb->is_input_columns_contains_auto_inc_column(); is_schema_contains_auto_inc_column = partial_update_info_pb->is_schema_contains_auto_inc_column(); + if (partial_update_info_pb->has_nano_seconds()) { + nano_seconds = partial_update_info_pb->nano_seconds(); + } default_values.clear(); for (const auto& value : partial_update_info_pb->default_values()) { default_values.push_back(value); @@ -136,16 +142,20 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids( if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) != std::string::npos)) { - LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}", - column.default_value()); - DateV2Value dtv; - dtv.from_unixtime(timestamp_ms / 1000, timezone); - default_value = dtv.debug_string(); + auto pos = to_lower(column.default_value()).find('('); + if (pos == std::string::npos) { + DateV2Value dtv; + dtv.from_unixtime(timestamp_ms / 1000, timezone); + default_value = dtv.debug_string(); + } else { + int precision = std::stoi(column.default_value().substr(pos + 1)); + DateV2Value dtv; + dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision); + default_value = dtv.debug_string(); + } } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 && to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) != std::string::npos)) { - LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}", - column.default_value()); DateV2Value dv; dv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dv.debug_string(); diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index a99bf7181184f40..71b3ab8033e579f 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -39,7 +39,7 @@ struct RowsetId; struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone, + int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); @@ -59,6 +59,7 @@ struct PartialUpdateInfo { bool can_insert_new_rows_in_partial_update {true}; bool is_strict_mode {false}; int64_t timestamp_ms {0}; + int32_t nano_seconds {0}; std::string timezone; bool is_input_columns_contains_auto_inc_column = false; bool is_schema_contains_auto_inc_column = false; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index a24a54e8b0171f5..1a4e3718ceeaf61 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -419,12 +419,12 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn(), - _max_version_in_flush_phase); + _partial_update_info->init( + *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), + table_schema_param->nano_seconds(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase); } } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 8e10d2da5f08e06..9ad02634cb4b15b 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1146,6 +1146,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { _schema.reset(new OlapTableSchemaParam()); RETURN_IF_ERROR(_schema->init(table_sink.schema)); _schema->set_timestamp_ms(state->timestamp_ms()); + _schema->set_nano_seconds(state->nano_seconds()); _schema->set_timezone(state->timezone()); _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 780ea8283bdf3d3..ff0ba8f9d595dda 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -150,6 +150,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { _schema.reset(new OlapTableSchemaParam()); RETURN_IF_ERROR(_schema->init(table_sink.schema)); _schema->set_timestamp_ms(state->timestamp_ms()); + _schema->set_nano_seconds(state->nano_seconds()); _schema->set_timezone(state->timezone()); _location = _pool->add(new OlapTableLocationParam(table_sink.location)); _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 60f85fae4d395c0..9e46ba9ab823dca 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -74,5 +74,6 @@ message POlapTableSchemaParam { optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; optional string sequence_map_col_name = 14; + optional int32 nano_seconds = 15 [default = 0]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 2e9fa94a343f357..9032b5ba4abe0fb 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -624,4 +624,5 @@ message PartialUpdateInfoPB { optional bool is_schema_contains_auto_inc_column = 10 [default = false]; repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; + optional int32 nano_seconds = 13 [default = 0]; } diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.out new file mode 100644 index 000000000000000..ef8a5b860f636ac --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.out @@ -0,0 +1,97 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql2 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql3 -- +1 1 999 999 +2 2 999 999 +3 3 999 999 +4 4 999 999 + +-- !sql4 -- +1 1 \N \N +2 2 \N \N +3 3 \N \N +4 4 \N \N + +-- !sql1 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql2 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql3 -- +1 1 999 999 +2 2 999 999 +3 3 999 999 +4 4 999 999 + +-- !sql4 -- +1 1 \N \N +2 2 \N \N +3 3 \N \N +4 4 \N \N + +-- !sql1 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql2 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql3 -- +1 1 999 999 +2 2 999 999 +3 3 999 999 +4 4 999 999 + +-- !sql4 -- +1 1 \N \N +2 2 \N \N +3 3 \N \N +4 4 \N \N + +-- !sql1 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql2 -- +1 1 +2 2 +3 3 +4 4 + +-- !sql3 -- +1 1 999 999 +2 2 999 999 +3 3 999 999 +4 4 999 999 + +-- !sql4 -- +1 1 \N \N +2 2 \N \N +3 3 \N \N +4 4 \N \N + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.groovy new file mode 100644 index 000000000000000..2005b2f9c319be7 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_map_col.groovy @@ -0,0 +1,133 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_partial_update_seq_map_col", "p0") { + for (def use_nereids : [true, false]) { + for (def use_row_store : [false, true]) { + logger.info("current params: use_nereids: ${use_nereids}, use_row_store: ${use_row_store}") + if (use_nereids) { + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_planner = false; """ + } + sql "set enable_insert_strict=false;" + sql "set enable_unique_key_partial_update=true;" + sql "set show_hidden_columns=true;" + sql "sync;" + + def tableName = "test_partial_update_seq_map_col1" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` datetime(6) null default current_timestamp(6), + ) UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "c2", + "store_row_column" = "${use_row_store}"); """ + sql "insert into ${tableName}(k,c1) values(1,1);" + sql "insert into ${tableName}(k,c1) values(2,2);" + sql "insert into ${tableName}(k,c1) values(3,3);" + sql "insert into ${tableName}(k,c1) values(4,4);" + qt_sql1 "select k,c1 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;" + + + tableName = "test_partial_update_seq_map_col2" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` datetime(6) not null default current_timestamp, + ) UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "c2", + "store_row_column" = "${use_row_store}"); """ + sql "insert into ${tableName}(k,c1) values(1,1);" + sql "insert into ${tableName}(k,c1) values(2,2);" + sql "insert into ${tableName}(k,c1) values(3,3);" + sql "insert into ${tableName}(k,c1) values(4,4);" + qt_sql2 "select k,c1 from ${tableName} where c2=__DORIS_SEQUENCE_COL__;" + + + tableName = "test_partial_update_seq_map_col3" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int not null default "999", + ) UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "c2", + "store_row_column" = "${use_row_store}"); """ + sql "insert into ${tableName}(k,c1) values(1,1);" + sql "insert into ${tableName}(k,c1) values(2,2);" + sql "insert into ${tableName}(k,c1) values(3,3);" + sql "insert into ${tableName}(k,c1) values(4,4);" + qt_sql3 "select k,c1,c2,__DORIS_SEQUENCE_COL__ from ${tableName} where c2=__DORIS_SEQUENCE_COL__;" + + + tableName = "test_partial_update_seq_map_col4" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int null, + ) UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "c2", + "store_row_column" = "${use_row_store}"); """ + sql "insert into ${tableName}(k,c1) values(1,1);" + sql "insert into ${tableName}(k,c1) values(2,2);" + sql "insert into ${tableName}(k,c1) values(3,3);" + sql "insert into ${tableName}(k,c1) values(4,4);" + qt_sql4 "select k,c1,c2,__DORIS_SEQUENCE_COL__ from ${tableName};" + + + tableName = "test_partial_update_seq_map_col5" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int not null + ) UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "c2", + "store_row_column" = "${use_row_store}"); """ + test { + sql "insert into ${tableName}(k,c1) values(1,1);" + exception "the unmentioned column `c2` should have default value or be nullable for newly inserted rows in non-strict mode partial update" + } + } + } +}