diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 3d73bf1bd886de8..7f84f28bc6b401b 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -127,6 +127,10 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { "different from BE."); } _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); + + if (pschema.has_sequence_map_col_name()) { + _sequence_map_column = pschema.sequence_map_col_name(); + } } _timestamp_ms = pschema.timestamp_ms(); _timezone = pschema.timezone(); @@ -198,6 +202,10 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { "different from BE."); } _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; + + if (tschema.__isset.sequence_map_col_name) { + _sequence_map_column = tschema.sequence_map_col_name; + } } for (const auto& tcolumn : tschema.partial_update_input_columns) { @@ -273,6 +281,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); pschema->set_timestamp_ms(_timestamp_ms); pschema->set_timezone(_timezone); + pschema->set_sequence_map_col_name(_sequence_map_column); for (auto col : _partial_update_input_columns) { *pschema->add_partial_update_input_columns() = col; } diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index fcba8fd82623bb9..24d125949f788dc 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -100,6 +100,7 @@ class OlapTableSchemaParam { std::string timezone() const { return _timezone; } bool is_strict_mode() const { return _is_strict_mode; } std::string debug_string() const; + std::string sequence_map_column() const { return _sequence_map_column; } private: int64_t _db_id; @@ -114,6 +115,7 @@ class OlapTableSchemaParam { std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; + std::string _sequence_map_column {}; int32_t _auto_increment_column_unique_id; int64_t _timestamp_ms = 0; std::string _timezone; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 143c1ad706bbe7b..009c2f0821689b1 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -885,7 +885,8 @@ Status BaseTablet::generate_default_value_block(const TabletSchema& schema, auto mutable_default_value_columns = default_value_block.mutate_columns(); for (auto i = 0; i < cids.size(); ++i) { const auto& column = schema.column(cids[i]); - if (column.has_default_value()) { + if (column.has_default_value() || + (column.name() == SEQUENCE_COL && schema.sequence_col_use_default_value())) { const auto& default_value = default_values[i]; vectorized::ReadBuffer rb(const_cast(default_value.c_str()), default_value.size()); @@ -957,6 +958,8 @@ Status BaseTablet::generate_new_block_for_partial_update( CHECK(update_rows >= old_rows); + bool sequence_col_use_default_value = rowset_schema->sequence_col_use_default_value(); + // build full block for (auto i = 0; i < missing_cids.size(); ++i) { const auto& rs_column = rowset_schema->column(missing_cids[i]); @@ -975,7 +978,8 @@ Status BaseTablet::generate_new_block_for_partial_update( mutable_column->insert_default(); } else if (old_block_delete_signs != nullptr && old_block_delete_signs[read_index_old[idx]] != 0) { - if (rs_column.has_default_value()) { + if (rs_column.has_default_value() || + (rs_column.name() == SEQUENCE_COL && sequence_col_use_default_value)) { mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0); } else if (rs_column.is_nullable()) { assert_cast( diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 4b9f4231bf1745f..8407d421aa031be 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -234,6 +234,7 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_db_id(table_schema_param->db_id()); if (table_schema_param->is_partial_update()) { _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn()); + _tablet_schema->set_sequence_map_column(table_schema_param->sequence_map_column()); } // set partial update columns info _partial_update_info = std::make_shared(); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index bff3f4196369db2..99ee4000ca342d4 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -23,6 +23,7 @@ #include "olap/olap_common.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/tablet_meta.h" #include "olap/tablet_schema.h" #include "olap/utils.h" #include "vec/common/assert_cast.h" @@ -135,12 +136,16 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids( if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) != std::string::npos)) { + LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}", + column.default_value()); DateV2Value dtv; dtv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dtv.debug_string(); } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 && to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) != std::string::npos)) { + LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}", + column.default_value()); DateV2Value dv; dv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dv.debug_string(); @@ -153,6 +158,20 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids( default_values.emplace_back(); } } + if (!tablet_schema.sequence_map_column().empty() && + !partial_update_input_columns.contains(tablet_schema.sequence_map_column())) { + auto it = std::find(missing_cids.cbegin(), missing_cids.cend(), + tablet_schema.sequence_col_idx()); + DCHECK(it != missing_cids.cend()); + std::size_t seq_col_idx_in_missing_cids = std::distance(missing_cids.cbegin(), it); + it = std::find(missing_cids.cbegin(), missing_cids.cend(), + tablet_schema.field_index(tablet_schema.sequence_map_column())); + DCHECK(it != missing_cids.cend()); + std::size_t seq_map_col_idx_in_missing_cids = std::distance(missing_cids.cbegin(), it); + default_values[seq_col_idx_in_missing_cids] = + default_values[seq_map_col_idx_in_missing_cids]; + } + CHECK_EQ(missing_cids.size(), default_values.size()); } @@ -232,7 +251,7 @@ Status PartialUpdateReadPlan::fill_missing_columns( old_value_block, default_value_block)); } auto mutable_default_value_columns = default_value_block.mutate_columns(); - + bool sequence_col_use_default_value = tablet_schema.sequence_col_use_default_value(); // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row @@ -250,7 +269,7 @@ Status PartialUpdateReadPlan::fill_missing_columns( const auto& tablet_column = tablet_schema.column(missing_cids[i]); auto& missing_col = mutable_full_columns[missing_cids[i]]; // clang-format off - if (tablet_column.has_default_value()) { + if (tablet_column.has_default_value() || (tablet_column.name() == SEQUENCE_COL && sequence_col_use_default_value)) { missing_col->insert_from(*mutable_default_value_columns[i].get(), 0); } else if (tablet_column.is_nullable()) { auto* nullable_column = diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 39fcc3f6c231ab3..a24a54e8b0171f5 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -415,6 +415,7 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_db_id(table_schema_param->db_id()); if (table_schema_param->is_partial_update()) { _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn()); + _tablet_schema->set_sequence_map_column(table_schema_param->sequence_map_column()); } // set partial update columns info _partial_update_info = std::make_shared(); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 7bf40c65f7a91af..d2dba90afcd903c 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -406,7 +406,15 @@ class TabletSchema { _auto_increment_column = auto_increment_column; } std::string auto_increment_column() const { return _auto_increment_column; } - + void set_sequence_map_column(std::string name) { _sequence_map_column = name; } + std::string sequence_map_column() const { return _sequence_map_column; } + bool sequence_col_use_default_value() const { + if (!_sequence_map_column.empty()) { + auto seq_map_column = *DORIS_TRY(column(_sequence_map_column)); + return seq_map_column.has_default_value(); + } + return false; + } void set_table_id(int64_t table_id) { _table_id = table_id; } int64_t table_id() const { return _table_id; } void set_db_id(int64_t db_id) { _db_id = db_id; } @@ -522,6 +530,7 @@ class TabletSchema { long _row_store_page_size = segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE; size_t _next_column_unique_id = 0; std::string _auto_increment_column; + std::string _sequence_map_column; bool _has_bf_fpp = false; double _bf_fpp = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 6c2cb8bd130a0f8..b6ddf6fe799e856 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -327,6 +327,12 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId()); } } + if (table.getSequenceMapCol() != null) { + Column seqMapCol = table.getFullSchema().stream() + .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) + .findFirst().get(); + schemaParam.setSequenceMapColName(seqMapCol.getName()); + } } schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); return schemaParam; diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 21fc646c92d12d6..60f85fae4d395c0 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -73,5 +73,6 @@ message POlapTableSchemaParam { optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; + optional string sequence_map_col_name = 14; }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index e11160caa7f5afa..2bfb1fe88cd44f5 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -254,6 +254,7 @@ struct TOlapTableSchemaParam { 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 + 14: optional string sequence_map_col_name; } struct TTabletLocation {