Skip to content

Commit

Permalink
[Fix](partial update) Fix __DORIS_SEQUENCE_COL__ is not set for new…
Browse files Browse the repository at this point in the history
…ly inserted rows in partial update (#40272)

## Proposed changes

### 1. Fix `__DORIS_SEQUENCE_COL__` not being set for newly inserted rows
in partial update
before:
```sql
MySQL [email protected]:d1> CREATE TABLE IF NOT EXISTS t3 (
                  ->   `k` BIGINT NOT NULL,
                  ->   `c1` int,
                  ->   `c2` datetime default current_timestamp,
                  ->  ) UNIQUE KEY(`k`)
                  ->  DISTRIBUTED BY HASH(`k`) BUCKETS 1
                  ->  PROPERTIES (
                  ->  "replication_allocation" = "tag.location.default: 1",
                  ->  "enable_unique_key_merge_on_write" = "true",
                  ->  "function_column.sequence_col" = "c2"
                  ->  );
Query OK, 0 rows affected
MySQL [email protected]:d1> set enable_insert_strict=false;
MySQL [email protected]:d1> set enable_unique_key_partial_update=true;
MySQL [email protected]:d1> insert into t3(k,c1) values(99,99);
MySQL [email protected]:d1> set show_hidden_columns=true;
MySQL [email protected]:d1> select * from t3;
+----+----+---------------------+-----------------------+-----------------------+------------------------+
| k  | c1 | c2                  | __DORIS_DELETE_SIGN__ | __DORIS_VERSION_COL__ | __DORIS_SEQUENCE_COL__ |
+----+----+---------------------+-----------------------+-----------------------+------------------------+
| 99 | 99 | 2024-09-02 11:03:09 | 0                     | 2                     | <null>                 |
+----+----+---------------------+-----------------------+-----------------------+------------------------+
```
after:
```sql
MySQL [email protected]:d1> CREATE TABLE IF NOT EXISTS t3 (
                  ->   `k` BIGINT NOT NULL,
                  ->   `c1` int,
                  ->   `c2` datetime default current_timestamp,
                  ->  ) UNIQUE KEY(`k`)
                  ->  DISTRIBUTED BY HASH(`k`) BUCKETS 1
                  ->  PROPERTIES (
                  ->  "replication_allocation" = "tag.location.default: 1",
                  ->  "enable_unique_key_merge_on_write" = "true",
                  ->  "function_column.sequence_col" = "c2"
                  ->  );
Query OK, 0 rows affected
MySQL [email protected]:d1> set enable_insert_strict=false;
MySQL [email protected]:d1> set enable_unique_key_partial_update=true;
MySQL [email protected]:d1> insert into t3(k,c1) values(1,10);
MySQL [email protected]:d1> set show_hidden_columns=true;
MySQL [email protected]:d1> select * from t3;
+---+----+---------------------+-----------------------+-----------------------+------------------------+
| k | c1 | c2                  | __DORIS_DELETE_SIGN__ | __DORIS_VERSION_COL__ | __DORIS_SEQUENCE_COL__ |
+---+----+---------------------+-----------------------+-----------------------+------------------------+
| 1 | 10 | 2024-09-02 16:49:50 | 0                     | 2                     | 2024-09-02 16:49:50    |
+---+----+---------------------+-----------------------+-----------------------+------------------------+
```
### 2. Fix `current_timestamp()` precision loss for newly inserted rows
in partial update
before:
```sql
MySQL [email protected]:d1> CREATE TABLE IF NOT EXISTS t3 (
                  ->   `k` BIGINT NOT NULL,
                  ->   `c1` int,
                  ->   `c2` datetime(6) default current_timestamp(6),
                  ->  ) UNIQUE KEY(`k`)
                  ->  DISTRIBUTED BY HASH(`k`) BUCKETS 1
                  ->  PROPERTIES (
                  ->  "replication_allocation" = "tag.location.default: 1",
                  ->  "enable_unique_key_merge_on_write" = "true",
                  ->  "function_column.sequence_col" = "c2"
                  ->  );
Query OK, 0 rows affected
MySQL [email protected]:d1> set enable_unique_key_partial_update=true;
MySQL [email protected]:d1> set enable_insert_strict=false;
MySQL [email protected]:d1> insert into t3(k,c1) values(3,10);
MySQL [email protected]:d1> set show_hidden_columns=true;
MySQL [email protected]:d1> select * from t3;
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
| k | c1 | c2                         | __DORIS_DELETE_SIGN__ | __DORIS_VERSION_COL__ | __DORIS_SEQUENCE_COL__     |
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
| 3 | 10 | 2024-09-02 19:04:55        | 0                     | 2                     | <null>                     |
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
```
after:
```sql
MySQL [email protected]:d1> CREATE TABLE IF NOT EXISTS t3 (
                  ->   `k` BIGINT NOT NULL,
                  ->   `c1` int,
                  ->   `c2` datetime(6) default current_timestamp(6),
                  ->  ) UNIQUE KEY(`k`)
                  ->  DISTRIBUTED BY HASH(`k`) BUCKETS 1
                  ->  PROPERTIES (
                  ->  "replication_allocation" = "tag.location.default: 1",
                  ->  "enable_unique_key_merge_on_write" = "true",
                  ->  "function_column.sequence_col" = "c2"
                  ->  );
Query OK, 0 rows affected
MySQL [email protected]:d1> set enable_unique_key_partial_update=true;
MySQL [email protected]:d1> set enable_insert_strict=false;
MySQL [email protected]:d1> insert into t3(k,c1) values(3,10);
MySQL [email protected]:d1> set show_hidden_columns=true;
MySQL [email protected]:d1> select * from t3;
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
| k | c1 | c2                         | __DORIS_DELETE_SIGN__ | __DORIS_VERSION_COL__ | __DORIS_SEQUENCE_COL__     |
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
| 3 | 10 | 2024-09-02 19:04:55.464438 | 0                     | 2                     | 2024-09-02 19:04:55.464438 |
+---+----+----------------------------+-----------------------+-----------------------+----------------------------+
```
  • Loading branch information
bobhan1 authored and dataroaring committed Oct 5, 2024
1 parent 79c3338 commit a6e57d1
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 16 deletions.
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();

for (const auto& col : pschema.partial_update_input_columns()) {
Expand Down Expand Up @@ -273,6 +276,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class OlapTableSchemaParam {
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
int32_t nano_seconds() const { return _nano_seconds; }
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
Expand All @@ -116,6 +118,7 @@ class OlapTableSchemaParam {
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
};

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn());
}

Expand Down
25 changes: 20 additions & 5 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "vec/common/assert_cast.h"
Expand All @@ -32,12 +33,14 @@ namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, const std::string& auto_increment_column,
int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->nano_seconds = nano_seconds;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
Expand Down Expand Up @@ -78,6 +81,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
is_input_columns_contains_auto_inc_column);
Expand Down Expand Up @@ -114,6 +118,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
is_schema_contains_auto_inc_column =
partial_update_info_pb->is_schema_contains_auto_inc_column();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
Expand Down Expand Up @@ -156,9 +163,17 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
auto pos = to_lower(column.default_value()).find('(');
if (pos == std::string::npos) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else {
int precision = std::stoi(column.default_value().substr(pos + 1));
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.debug_string();
}
} else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
std::string::npos)) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct RowsetId;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Expand All @@ -60,6 +60,7 @@ struct PartialUpdateInfo {
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
int32_t nano_seconds {0};
std::string timezone;
bool is_input_columns_contains_auto_inc_column = false;
bool is_schema_contains_auto_inc_column = false;
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(),
_max_version_in_flush_phase);
_partial_update_info->init(
*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase);
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -1181,4 +1181,10 @@ public GeneratedColumnInfo getGeneratedColumnInfo() {
public Set<String> getGeneratedColumnsThatReferToThis() {
return generatedColumnsThatReferToThis;
}

/**
 * Copies the default-value information of another column onto this column.
 *
 * <p>Three fields are taken over from {@code refColumn}: the default value,
 * the default-value expression definition and the real default value.
 * No other attribute of this column is modified.
 *
 * @param refColumn the column whose default-value information is copied;
 *                  it is dereferenced unconditionally, so it must not be null
 */
public void setDefaultValueInfo(Column refColumn) {
realDefaultValue = refColumn.realDefaultValue;
defaultValueExprDef = refColumn.defaultValueExprDef;
defaultValue = refColumn.defaultValue;
}
}
16 changes: 15 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ public void setSequenceMapCol(String colName) {
getOrCreatTableProperty().setSequenceMapCol(colName);
}

public void setSequenceInfo(Type type) {
public void setSequenceInfo(Type type, Column refColumn) {
this.hasSequenceCol = true;
this.sequenceType = type;

Expand All @@ -1339,6 +1339,9 @@ public void setSequenceInfo(Type type) {
// unique key table
sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn();
}
if (refColumn != null) {
sequenceCol.setDefaultValueInfo(refColumn);
}
// add sequence column at last
fullSchema.add(sequenceCol);
nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
Expand Down Expand Up @@ -1763,6 +1766,17 @@ public void gsonPostProcess() throws IOException {
if (isAutoBucket()) {
defaultDistributionInfo.markAutoBucket();
}
if (isUniqKeyMergeOnWrite() && getSequenceMapCol() != null) {
// set the hidden sequence column's default value to be the same as
// the sequence map column's, for partial update
String seqMapColName = getSequenceMapCol();
Column seqMapCol = getBaseSchema().stream().filter(col -> col.getName().equalsIgnoreCase(seqMapColName))
.findFirst().orElse(null);
Column hiddenSeqCol = getSequenceCol();
if (seqMapCol != null && hiddenSeqCol != null) {
hiddenSeqCol.setDefaultValueInfo(seqMapCol);
}
}
RangePartitionInfo tempRangeInfo = tempPartitions.getPartitionInfo();
if (tempRangeInfo != null) {
for (long partitionId : tempRangeInfo.getIdToItem(false).keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2951,7 +2951,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("Sequence type only support integer types and date types");
}
olapTable.setSequenceMapCol(col.getName());
olapTable.setSequenceInfo(col.getType());
olapTable.setSequenceInfo(col.getType(), col);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand All @@ -2965,7 +2965,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
}
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType);
olapTable.setSequenceInfo(sequenceColType, null);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ message POlapTableSchemaParam {
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional int32 nano_seconds = 14 [default = 0];
};

1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -624,4 +624,5 @@ message PartialUpdateInfoPB {
optional bool is_schema_contains_auto_inc_column = 10 [default = false];
repeated string default_values = 11;
optional int64 max_version_in_flush_phase = 12 [default = -1];
optional int32 nano_seconds = 13 [default = 0];
}
Loading

0 comments on commit a6e57d1

Please sign in to comment.