Skip to content

Commit

Permalink
support BE schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Jul 4, 2023
1 parent e0f16fe commit 5baebdb
Show file tree
Hide file tree
Showing 25 changed files with 580 additions and 123 deletions.
6 changes: 4 additions & 2 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Status DeltaWriter::init() {
context.segments_overlap = OVERLAPPING;
context.tablet_schema = _tablet_schema;
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = _tablet->table_id();
context.tablet_id = _tablet->tablet_id();
context.tablet = _tablet;
context.write_type = DataWriteType::TYPE_DIRECT;
context.mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
Expand Down Expand Up @@ -352,7 +352,8 @@ void DeltaWriter::_reset_mem_table() {
_delete_bitmap);
_mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(), mow_context,
mem_table_insert_tracker, mem_table_flush_tracker));
mem_table_insert_tracker, mem_table_flush_tracker,
_req.index_id));

COUNTER_UPDATE(_segment_num, 1);
_mem_table->set_callback([this](MemTableStat& stat) {
Expand Down Expand Up @@ -470,6 +471,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
vectorized::schema_util::get_least_common_schema(
{_tablet->tablet_schema(), rw_ctx.tablet_schema}, update_schema);
_tablet->update_by_least_common_schema(update_schema);
VLOG_DEBUG << "dump updated tablet schema: " << update_schema->dump_structure();
}

Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
Expand Down
107 changes: 93 additions & 14 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/get_least_supertype.h"
#include "vec/json/path_in_data.h"
#include "vec/jsonb/serialize.h"

Expand All @@ -67,7 +68,7 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, std::shared_ptr<MowContext> mow_context,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t index_id)
: _tablet(std::move(tablet)),
_keys_type(_tablet->keys_type()),
_schema(schema),
Expand All @@ -81,7 +82,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
_offsets_of_aggregate_states(schema->num_columns()),
_total_size_of_aggregate_states(0),
_mem_usage(0),
_mow_context(mow_context) {
_mow_context(mow_context),
_index_id(index_id) {
#ifndef BE_TEST
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())),
Expand Down Expand Up @@ -527,7 +529,6 @@ Status MemTable::close() {
return flush();
}

// This function could throw an exception; it is not exception safe
Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* ctx) {
if (block.rows() == 0 || _tablet_schema->num_variant_columns() == 0) {
return Status::OK();
Expand Down Expand Up @@ -562,9 +563,69 @@ Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* c
TabletSchemaSPtr flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
vectorized::Block flush_block(std::move(block));

// Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
// Get subcolumns name->column map
phmap::flat_hash_map<StringRef, const TabletColumn*, StringRefHash> subcolumn_map;
for (size_t i = 0; i < variant_column_pos.size(); ++i) {
size_t variant_pos = variant_column_pos[i];
const TabletColumn& variant_col = _tablet_schema->columns()[variant_pos];
std::for_each(
variant_col.get_sub_columns().begin(), variant_col.get_sub_columns().end(),
[&](const TabletColumn& subcolumn) {
subcolumn_map[StringRef(subcolumn.name().data(), subcolumn.name().size())] =
&subcolumn;
});
}

// column positions in flush schema
std::vector<int> modifying_columns_pos;
std::vector<int> new_columns_pos;

// Integrates one extracted subcolumn of a variant column into flush_schema and flush_block.
//
// parent_variant: the variant TabletColumn the entry was extracted from.
// column_entry_from_object: the entry being flattened; only its `path` and `data`
// members are used here.
//
// The flat column is named "<parent name>.<entry path>" and looked up in subcolumn_map:
// - found: a common type of the existing column type and the entry's type is computed.
//   If it differs from the existing type, a column of the common type is built, keeps the
//   existing unique id, and its position is recorded in modifying_columns_pos; otherwise
//   the existing column is copied unchanged.
// - not found: a column is built from the entry's type and its position is recorded in
//   new_columns_pos, so that it can be added to the frontend.
// In both cases the column is appended to flush_schema and the entry's finalized data is
// inserted into flush_block.
auto column_integrate = [&](const TabletColumn& parent_variant,
auto& column_entry_from_object) {
// The concatenation yields a temporary std::string; binding it to a const reference
// keeps it alive until the end of this lambda body.
const std::string& column_name =
parent_variant.name() + "." + column_entry_from_object->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
column_entry_from_object->data.get_least_common_type();
auto it = subcolumn_map.find(column_name);
TabletColumn tablet_column;
if (it != subcolumn_map.end()) {
// The column already exists in the original tablet schema: check type equality and
// modify the meta directly.
// If the tablet column type differs from the entry's column type, a cast is performed later.
const TabletColumn* original = it->second;
vectorized::DataTypePtr original_type = it->second->get_vec_type();
vectorized::DataTypePtr common_type;
// NOTE(review): LeastSupertypeOnError::Jsonb presumably makes incompatible types resolve
// to JSONB instead of failing -- confirm against get_least_supertype.
vectorized::get_least_supertype<vectorized::LeastSupertypeOnError::Jsonb>(
vectorized::DataTypes {original_type, final_data_type_from_object},
&common_type);
if (!original_type->equals(*common_type)) {
// Update to the common type. The position is read before append_column() below, so it is
// presumably the index this column gets once appended -- confirm append_column semantics.
modifying_columns_pos.push_back(flush_schema->num_columns());
vectorized::schema_util::get_column_by_type(common_type, column_name,
tablet_column);
// Keep the unique id of the column that is being modified.
tablet_column.set_unique_id(original->unique_id());
} else {
tablet_column = *original;
}
} else {
vectorized::schema_util::get_column_by_type(final_data_type_from_object, column_name,
tablet_column);
// New column: add it to the meta directly. Its unique id is not assigned here and still
// needs to be set.
new_columns_pos.push_back(flush_schema->num_columns());
}
tablet_column.set_parent_unique_id(parent_variant.unique_id());
tablet_column.set_path_info(column_entry_from_object->path);
flush_schema->append_column(std::move(tablet_column));
// The block column is inserted with the entry's own type, which may differ from the type
// of the schema column appended above; the "Cast to expected type" pass later in this
// function reconciles the two.
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
final_data_type_from_object, column_name});
};

// 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
// those columns are extracted columns, leave none extracted columns remain in original variant column, which is
// JSONB format at present
// JSONB format at present.
// 2. Collect columns that need to be added or modified when data type changes or new columns encountered
for (size_t i = 0; i < variant_column_pos.size(); ++i) {
size_t variant_pos = variant_column_pos[i];
vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
Expand All @@ -578,21 +639,27 @@ Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* c
root = entry;
continue;
}
const std::string& column_name = parent_column.name() + "." + entry->path.get_path();
TabletColumn tablet_column;
vectorized::schema_util::get_column_by_type(entry->data.get_least_common_type(),
column_name, tablet_column);
tablet_column.set_path_info(entry->path);
tablet_column.set_parent_unique_id(_tablet_schema->columns()[variant_pos].unique_id());
flush_schema->append_column(std::move(tablet_column));
flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), column_name});
column_integrate(parent_column, entry);
}
// Handle root column
flush_block.get_by_position(variant_pos).column = root->data.get_finalized_column_ptr();
flush_block.get_by_position(variant_pos).type = root->data.get_least_common_type();
}

// Collect the new and modified columns and send a FrontendService::modifyColumns RPC to:
// 1. Get a unique id for each new sub column, and record the related column in FE meta.
// 2. Notify the frontend meta that the types of some columns changed.
// These operations are based on the light weight schema change feature.
vectorized::schema_util::UpdateSchemaRequest request;
request.from_schema = flush_schema;
request.new_columns_pos = new_columns_pos;
request.modifying_columns = modifying_columns_pos;
request.tablet_id = _tablet->table_id();
request.index_id = _index_id;
// For CAS
request.schema_version = _tablet_schema->schema_version();
RETURN_IF_ERROR(vectorized::schema_util::update_front_end_schema(request));

{
// Update rowset schema, tablet's tablet schema will be updated when build Rowset
// Eg. flush schema: A(int), B(float), C(int), D(int)
Expand All @@ -610,6 +677,18 @@ Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* c
VLOG_DEBUG << "dump rs schema: " << rw_ctx.tablet_schema->dump_structure();
}

// Cast to expected type
for (size_t i = 0; i < flush_block.columns(); ++i) {
auto expected_type = flush_schema->columns()[i].get_vec_type();
if (!expected_type->equals(*flush_block.get_by_position(i).type)) {
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
{flush_block.get_by_position(i).column, flush_block.get_by_position(i).type,
""},
expected_type, &flush_block.get_by_position(i).column));
flush_block.get_by_position(i).type = expected_type;
}
}

ctx->flush_schema = flush_schema;
block.swap(flush_block);
VLOG_DEBUG << "dump block: " << block.dump_data();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class MemTable {
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, std::shared_ptr<MowContext> mow_context,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t index_id);
~MemTable();

int64_t tablet_id() const { return _tablet->tablet_id(); }
Expand Down Expand Up @@ -301,6 +301,7 @@ class MemTable {

std::shared_ptr<MowContext> _mow_context;
size_t _num_columns;
int64_t _index_id;
}; // class MemTable

inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
// TODO(lhy) handle variant
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
Expand All @@ -163,6 +164,7 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
tablet_schema->copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema->clear_columns();
// TODO(lhy) handle variant
for (const auto& column_desc : request.columns_desc) {
tablet_schema->append_column(TabletColumn(column_desc));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,7 @@ void SegmentIterator::_init_current_block(
i >= block->columns()) { //todo(wb) maybe we can release it after output block
current_columns[cid]->clear();
} else { // non-predicate column
// if (!block->get_by_position(i).type->equals())
current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();

if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DATE) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
TabletSchemaSPtr base_tablet_schema = std::make_shared<TabletSchema>();
base_tablet_schema->copy_from(*base_tablet->tablet_schema());
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
// TODO(lhy) handle variant
base_tablet_schema->clear_columns();
for (const auto& column : request.columns) {
base_tablet_schema->append_column(TabletColumn(column));
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco
}
for (size_t i = 0; i < tcolumn.children_column.size(); i++) {
ColumnPB* children_column = column->add_children_columns();
init_column_from_tcolumn(i, tcolumn.children_column[i], children_column);
init_column_from_tcolumn(tcolumn.children_column[i].col_unique_id,
tcolumn.children_column[i], children_column);
}
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ void TabletColumn::set_path_info(const vectorized::PathInData& path) {
_column_path = path;
}

// Returns the vectorized data type for this tablet column, created by the
// DataTypeFactory instance from this column's own definition.
vectorized::DataTypePtr TabletColumn::get_vec_type() const {
    const TabletColumn& self = *this;
    return vectorized::DataTypeFactory::instance().create_data_type(self);
}

void TabletIndex::init_from_thrift(const TOlapTableIndex& index,
const TabletSchema& tablet_schema) {
_index_id = index.index_id;
Expand Down Expand Up @@ -917,6 +921,10 @@ const std::vector<TabletColumn>& TabletSchema::columns() const {
return _cols;
}

std::vector<TabletColumn>& TabletSchema::mutable_columns() {
return _cols;
}

const TabletColumn& TabletSchema::column(size_t ordinal) const {
DCHECK(ordinal < _num_columns) << "ordinal:" << ordinal << ", _num_columns:" << _num_columns;
return _cols[ordinal];
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace doris {
namespace vectorized {
class Block;
class PathInData;
class IDataType;
} // namespace vectorized

struct OlapTableIndexSchema;
Expand All @@ -63,7 +64,7 @@ class TabletColumn {

int32_t unique_id() const { return _unique_id; }
void set_unique_id(int32_t id) { _unique_id = id; }
std::string name() const { return _col_name; }
const std::string& name() const { return _col_name; }
void set_name(std::string col_name) { _col_name = col_name; }
FieldType type() const { return _type; }
void set_type(FieldType type) { _type = type; }
Expand Down Expand Up @@ -113,6 +114,7 @@ class TabletColumn {

uint32_t get_subtype_count() const { return _sub_column_count; }
const TabletColumn& get_sub_column(uint32_t i) const { return _sub_columns[i]; }
const std::vector<TabletColumn>& get_sub_columns() const { return _sub_columns; }

friend bool operator==(const TabletColumn& a, const TabletColumn& b);
friend bool operator!=(const TabletColumn& a, const TabletColumn& b);
Expand All @@ -130,6 +132,7 @@ class TabletColumn {
bool is_extracted_column() const { return !_column_path.empty(); };
bool parent_unique_d() const { return _parent_col_unique_id; }
void set_parent_unique_id(int32_t col_unique_id) { _parent_col_unique_id = col_unique_id; }
std::shared_ptr<const vectorized::IDataType> get_vec_type() const;

private:
int32_t _unique_id;
Expand Down Expand Up @@ -234,6 +237,7 @@ class TabletSchema {
Status have_column(const std::string& field_name) const;
const TabletColumn& column_by_uid(int32_t col_unique_id) const;
const std::vector<TabletColumn>& columns() const;
std::vector<TabletColumn>& mutable_columns();
size_t num_columns() const { return _num_columns; }
size_t num_key_columns() const { return _num_key_columns; }
size_t num_null_columns() const { return _num_null_columns; }
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/jsonb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class JsonbParserT {
explicit JsonbParserT(OS_TYPE& os) : writer_(os), stream_pos_(0), err_(JsonbErrType::E_NONE) {}

// parse a UTF-8 JSON string
bool parse(const std::string& str, hDictInsert handler = nullptr) {
bool(parseconst std::string& str, hDictInsert handler = nullptr) {
return parse(str.c_str(), (unsigned int)str.size(), handler);
}

Expand Down
Loading

0 comments on commit 5baebdb

Please sign in to comment.