Skip to content

Commit

Permalink
fix hidden sequenc_col not set when insert new rows in partial update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Sep 2, 2024
1 parent 795bffa commit 312ab3b
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 5 deletions.
9 changes: 9 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
"different from BE.");
}
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();

if (pschema.has_sequence_map_col_name()) {
_sequence_map_column = pschema.sequence_map_col_name();
}
}
_timestamp_ms = pschema.timestamp_ms();
_timezone = pschema.timezone();
Expand Down Expand Up @@ -198,6 +202,10 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
"different from BE.");
}
_auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;

if (tschema.__isset.sequence_map_col_name) {
_sequence_map_column = tschema.sequence_map_col_name;
}
}

for (const auto& tcolumn : tschema.partial_update_input_columns) {
Expand Down Expand Up @@ -273,6 +281,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_sequence_map_col_name(_sequence_map_column);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class OlapTableSchemaParam {
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
std::string debug_string() const;
std::string sequence_map_column() const { return _sequence_map_column; }

private:
int64_t _db_id;
Expand All @@ -114,6 +115,7 @@ class OlapTableSchemaParam {
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
std::string _sequence_map_column {};
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
std::string _timezone;
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ Status BaseTablet::generate_default_value_block(const TabletSchema& schema,
auto mutable_default_value_columns = default_value_block.mutate_columns();
for (auto i = 0; i < cids.size(); ++i) {
const auto& column = schema.column(cids[i]);
if (column.has_default_value()) {
if (column.has_default_value() ||
(column.name() == SEQUENCE_COL && schema.sequence_col_use_default_value())) {
const auto& default_value = default_values[i];
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
Expand Down Expand Up @@ -957,6 +958,8 @@ Status BaseTablet::generate_new_block_for_partial_update(

CHECK(update_rows >= old_rows);

bool sequence_col_use_default_value = rowset_schema->sequence_col_use_default_value();

// build full block
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& rs_column = rowset_schema->column(missing_cids[i]);
Expand All @@ -975,7 +978,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
mutable_column->insert_default();
} else if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
if (rs_column.has_default_value() ||
(rs_column.name() == SEQUENCE_COL && sequence_col_use_default_value)) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (rs_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_db_id(table_schema_param->db_id());
if (table_schema_param->is_partial_update()) {
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
_tablet_schema->set_sequence_map_column(table_schema_param->sequence_map_column());
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
Expand Down
23 changes: 21 additions & 2 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "vec/common/assert_cast.h"
Expand Down Expand Up @@ -135,12 +136,16 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}",
column.default_value());
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
std::string::npos)) {
LOG_INFO("_generate_default_values_for_missing_cids: column.default_value()={}",
column.default_value());
DateV2Value<DateV2ValueType> dv;
dv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dv.debug_string();
Expand All @@ -153,6 +158,20 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
default_values.emplace_back();
}
}
if (!tablet_schema.sequence_map_column().empty() &&
!partial_update_input_columns.contains(tablet_schema.sequence_map_column())) {
auto it = std::find(missing_cids.cbegin(), missing_cids.cend(),
tablet_schema.sequence_col_idx());
DCHECK(it != missing_cids.cend());
std::size_t seq_col_idx_in_missing_cids = std::distance(missing_cids.cbegin(), it);
it = std::find(missing_cids.cbegin(), missing_cids.cend(),
tablet_schema.field_index(tablet_schema.sequence_map_column()));
DCHECK(it != missing_cids.cend());
std::size_t seq_map_col_idx_in_missing_cids = std::distance(missing_cids.cbegin(), it);
default_values[seq_col_idx_in_missing_cids] =
default_values[seq_map_col_idx_in_missing_cids];
}

CHECK_EQ(missing_cids.size(), default_values.size());
}

Expand Down Expand Up @@ -232,7 +251,7 @@ Status PartialUpdateReadPlan::fill_missing_columns(
old_value_block, default_value_block));
}
auto mutable_default_value_columns = default_value_block.mutate_columns();

bool sequence_col_use_default_value = tablet_schema.sequence_col_use_default_value();
// fill all missing value from mutable_old_columns, need to consider default value and null value
for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
// `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row
Expand All @@ -250,7 +269,7 @@ Status PartialUpdateReadPlan::fill_missing_columns(
const auto& tablet_column = tablet_schema.column(missing_cids[i]);
auto& missing_col = mutable_full_columns[missing_cids[i]];
// clang-format off
if (tablet_column.has_default_value()) {
if (tablet_column.has_default_value() || (tablet_column.name() == SEQUENCE_COL && sequence_col_use_default_value)) {
missing_col->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (tablet_column.is_nullable()) {
auto* nullable_column =
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_db_id(table_schema_param->db_id());
if (table_schema_param->is_partial_update()) {
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
_tablet_schema->set_sequence_map_column(table_schema_param->sequence_map_column());
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,15 @@ class TabletSchema {
_auto_increment_column = auto_increment_column;
}
std::string auto_increment_column() const { return _auto_increment_column; }

void set_sequence_map_column(std::string name) { _sequence_map_column = name; }
std::string sequence_map_column() const { return _sequence_map_column; }
bool sequence_col_use_default_value() const {
if (!_sequence_map_column.empty()) {
auto seq_map_column = *DORIS_TRY(column(_sequence_map_column));
return seq_map_column.has_default_value();
}
return false;
}
void set_table_id(int64_t table_id) { _table_id = table_id; }
int64_t table_id() const { return _table_id; }
void set_db_id(int64_t db_id) { _db_id = db_id; }
Expand Down Expand Up @@ -522,6 +530,7 @@ class TabletSchema {
long _row_store_page_size = segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
size_t _next_column_unique_id = 0;
std::string _auto_increment_column;
std::string _sequence_map_column;

bool _has_bf_fpp = false;
double _bf_fpp = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a
schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId());
}
}
if (table.getSequenceMapCol() != null) {
Column seqMapCol = table.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst().get();
schemaParam.setSequenceMapColName(seqMapCol.getName());
}
}
schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat());
return schemaParam;
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ message POlapTableSchemaParam {
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional string sequence_map_col_name = 14;
};

1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ struct TOlapTableSchemaParam {
11: optional string auto_increment_column
12: optional i32 auto_increment_column_unique_id = -1
13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1
14: optional string sequence_map_col_name;
}

struct TTabletLocation {
Expand Down

0 comments on commit 312ab3b

Please sign in to comment.