Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](partial update) Fix __DORIS_SEQUENCE_COL__ is not set for newly inserted rows in partial update #40272

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();

for (const auto& col : pschema.partial_update_input_columns()) {
Expand Down Expand Up @@ -273,6 +276,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class OlapTableSchemaParam {
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
int32_t nano_seconds() const { return _nano_seconds; }
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
Expand All @@ -116,6 +118,7 @@ class OlapTableSchemaParam {
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
};

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn());
}

Expand Down
25 changes: 20 additions & 5 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "vec/common/assert_cast.h"
Expand All @@ -32,12 +33,14 @@ namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, const std::string& auto_increment_column,
int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->nano_seconds = nano_seconds;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
Expand Down Expand Up @@ -78,6 +81,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
is_input_columns_contains_auto_inc_column);
Expand Down Expand Up @@ -114,6 +118,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
is_schema_contains_auto_inc_column =
partial_update_info_pb->is_schema_contains_auto_inc_column();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
Expand Down Expand Up @@ -156,9 +163,17 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
auto pos = to_lower(column.default_value()).find('(');
if (pos == std::string::npos) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else {
int precision = std::stoi(column.default_value().substr(pos + 1));
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.debug_string();
}
} else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
std::string::npos)) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct RowsetId;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Expand All @@ -60,6 +60,7 @@ struct PartialUpdateInfo {
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
int32_t nano_seconds {0};
std::string timezone;
bool is_input_columns_contains_auto_inc_column = false;
bool is_schema_contains_auto_inc_column = false;
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(),
_max_version_in_flush_phase);
_partial_update_info->init(
*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase);
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -1181,4 +1181,10 @@ public GeneratedColumnInfo getGeneratedColumnInfo() {
public Set<String> getGeneratedColumnsThatReferToThis() {
return generatedColumnsThatReferToThis;
}

public void setDefaultValueInfo(Column refColumn) {
this.defaultValue = refColumn.defaultValue;
this.defaultValueExprDef = refColumn.defaultValueExprDef;
this.realDefaultValue = refColumn.realDefaultValue;
}
}
16 changes: 15 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,7 @@ public void setSequenceMapCol(String colName) {
getOrCreatTableProperty().setSequenceMapCol(colName);
}

public void setSequenceInfo(Type type) {
public void setSequenceInfo(Type type, Column refColumn) {
this.hasSequenceCol = true;
this.sequenceType = type;

Expand All @@ -1435,6 +1435,9 @@ public void setSequenceInfo(Type type) {
// unique key table
sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn();
}
if (refColumn != null) {
sequenceCol.setDefaultValueInfo(refColumn);
}
// add sequence column at last
fullSchema.add(sequenceCol);
nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
Expand Down Expand Up @@ -1862,6 +1865,17 @@ public void gsonPostProcess() throws IOException {
if (isAutoBucket()) {
defaultDistributionInfo.markAutoBucket();
}
if (isUniqKeyMergeOnWrite() && getSequenceMapCol() != null) {
// set the hidden sequence column's default value the same with
// the sequence map column's for partial update
String seqMapColName = getSequenceMapCol();
Column seqMapCol = getBaseSchema().stream().filter(col -> col.getName().equalsIgnoreCase(seqMapColName))
.findFirst().orElse(null);
Column hiddenSeqCol = getSequenceCol();
if (seqMapCol != null && hiddenSeqCol != null) {
hiddenSeqCol.setDefaultValueInfo(seqMapCol);
}
}
RangePartitionInfo tempRangeInfo = tempPartitions.getPartitionInfo();
if (tempRangeInfo != null) {
for (long partitionId : tempRangeInfo.getIdToItem(false).keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2981,7 +2981,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("Sequence type only support integer types and date types");
}
olapTable.setSequenceMapCol(col.getName());
olapTable.setSequenceInfo(col.getType());
olapTable.setSequenceInfo(col.getType(), col);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand All @@ -2995,7 +2995,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
}
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType);
olapTable.setSequenceInfo(sequenceColType, null);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ message POlapTableSchemaParam {
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional int32 nano_seconds = 14 [default = 0];
};

1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -624,4 +624,5 @@ message PartialUpdateInfoPB {
optional bool is_schema_contains_auto_inc_column = 10 [default = false];
repeated string default_values = 11;
optional int64 max_version_in_flush_phase = 12 [default = -1];
optional int32 nano_seconds = 13 [default = 0];
}
Loading
Loading