Skip to content

Commit

Permalink
[improvement](partial-update) make partial-update on agg table use le…
Browse files Browse the repository at this point in the history
…ss memory
  • Loading branch information
hust-hhb committed Mar 28, 2024
1 parent 6d66196 commit 9c273d8
Show file tree
Hide file tree
Showing 13 changed files with 493 additions and 23 deletions.
6 changes: 5 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
function = vectorized::AggregateFunctionSimpleFactory::instance().get(
"replace_load", {block->get_data_type(cid)},
block->get_data_type(cid)->is_nullable());
} else if (_keys_type == KeysType::AGG_KEYS && _is_partial_update) {
function = vectorized::AggregateFunctionSimpleFactory::instance().get(
"replace_if_not_null_load", {block->get_data_type(cid)},
block->get_data_type(cid)->is_nullable());
} else {
function =
_tablet_schema->column(cid).get_aggregate_function(vectorized::AGG_LOAD_SUFFIX);
Expand Down Expand Up @@ -485,7 +489,7 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_is_partial_update) {
if (_is_partial_update && _keys_type == KeysType::UNIQUE_KEYS) {
auto update_columns_size = _num_columns;
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand Down
27 changes: 23 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,22 +766,41 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
!_opts.rowset_ctx->is_transient_rowset_writer &&
_tablet_schema->keys_type() == UNIQUE_KEYS) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
return Status::OK();
}
CHECK(block->columns() >= _column_writers.size())
bool is_agg_partial_update = _opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_tablet_schema->keys_type() == AGG_KEYS;
std::shared_ptr<vectorized::Block> full_block_ptr = nullptr;
if (is_agg_partial_update) {
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InternalError(fmt::format(
"illegal partial update block columns: {}, num key columns: {}, total "
"schema columns: {}",
block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns()));
}
RETURN_IF_ERROR(_tablet_schema->make_full_block(
full_block_ptr, block, num_rows, _opts.rowset_ctx->partial_update_info->update_cids,
_opts.rowset_ctx->partial_update_info->missing_cids));
}
const vectorized::Block* full_block = is_agg_partial_update ? full_block_ptr.get() : block;
CHECK(full_block->columns() >= _column_writers.size())
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size();
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_tablet_schema->store_row_column() &&
(_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(block));
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(full_block));
}

_olap_data_convertor->set_source_content(block, row_pos, num_rows);
_olap_data_convertor->set_source_content(full_block, row_pos, num_rows);

// find all row pos for short key indexes
std::vector<size_t> short_key_pos;
Expand Down
52 changes: 41 additions & 11 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,13 +694,18 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
if (_tablet_schema->keys_type() != UNIQUE_KEYS && _tablet_schema->keys_type() != AGG_KEYS) {
return Status::InternalError(fmt::format("illegal partial update key type: {}",
_tablet_schema->keys_type()));
}
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InternalError(fmt::format(
"illegal partial update block columns: {}, num key columns: {}, total "
"schema columns: {}",
block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns()));
return Status::InternalError(
fmt::format("illegal partial update key type {}, block columns: {}, num key "
"columns: {}, total "
"schema columns: {}",
_tablet_schema->keys_type(), block->columns(),
_tablet_schema->num_key_columns(), _tablet_schema->num_columns()));
}
} else if (block->columns() != _tablet_schema->num_columns()) {
return Status::InternalError(
Expand All @@ -715,7 +720,8 @@ Status VerticalSegmentWriter::write_batch() {
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
!_opts.rowset_ctx->is_transient_rowset_writer &&
_tablet_schema->keys_type() == UNIQUE_KEYS) {
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
}
Expand All @@ -728,24 +734,47 @@ Status VerticalSegmentWriter::write_batch() {
}
return Status::OK();
}
bool is_agg_partial_update = _opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_tablet_schema->keys_type() == AGG_KEYS;
std::vector<std::shared_ptr<vectorized::Block>> full_blocks;
if (is_agg_partial_update) {
for (auto& data : _batched_blocks) {
std::shared_ptr<vectorized::Block> full_block_ptr = nullptr;
RETURN_IF_ERROR(_tablet_schema->make_full_block(
full_block_ptr, data.block, data.num_rows,
_opts.rowset_ctx->partial_update_info->update_cids,
_opts.rowset_ctx->partial_update_info->missing_cids));
full_blocks.emplace_back(full_block_ptr);
}
}

// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_tablet_schema->store_row_column() &&
(_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
int index = 0;
for (auto& data : _batched_blocks) {
// TODO: maybe we should pass range to this method
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
if (is_agg_partial_update) {
_serialize_block_to_row_column(
*const_cast<vectorized::Block*>(full_blocks[index++].get()));
} else {
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
}
}
}

std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
int index = 0;
for (auto& data : _batched_blocks) {
_olap_data_convertor->set_source_content_with_specifid_columns(
data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid});
is_agg_partial_update ? full_blocks[index++].get() : data.block, data.row_pos,
data.num_rows, std::vector<uint32_t> {cid});

// convert column data from engine format to storage layer format
auto [status, column] = _olap_data_convertor->convert_column_data(cid);
Expand All @@ -770,9 +799,11 @@ Status VerticalSegmentWriter::write_batch() {
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());
}

int index = 0;
for (auto& data : _batched_blocks) {
_olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows);
_olap_data_convertor->set_source_content(
is_agg_partial_update ? full_blocks[index++].get() : data.block, data.row_pos,
data.num_rows);
// find all row pos for short key indexes
std::vector<size_t> short_key_pos;
// We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
Expand Down Expand Up @@ -814,7 +845,6 @@ Status VerticalSegmentWriter::write_batch() {
_olap_data_convertor->clear_source_content();
_num_rows_written += data.num_rows;
}

_batched_blocks.clear();
return Status::OK();
}
Expand Down
50 changes: 50 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1470,4 +1470,54 @@ std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& s
return output;
}

Status TabletSchema::make_full_block(std::shared_ptr<vectorized::Block>& full_block_ptr,
const vectorized::Block* block, size_t num_rows,
std::vector<uint32_t>& update_cids,
std::vector<uint32_t>& missing_cids) {
full_block_ptr = std::make_shared<vectorized::Block>(this->create_block());
size_t input_id = 0;
for (auto i : update_cids) {
full_block_ptr->replace_by_position(i, block->get_by_position(input_id++).column);
}
auto mutable_full_columns = full_block_ptr->mutate_columns();
auto old_value_block = create_block_by_cids(missing_cids);
CHECK(missing_cids.size() == old_value_block.columns());

// build default value columns
auto default_value_block = old_value_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();

for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& missing_column = this->column(missing_cids[i]);
if (missing_column.has_default_value()) {
auto default_value = this->column(missing_cids[i]).default_value();
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get()));
}
}

for (auto idx = 0; idx < num_rows; idx++) {
for (auto i = 0; i < missing_cids.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null value
const auto& tablet_column = this->column(missing_cids[i]);
if (tablet_column.is_nullable()) {
auto nullable_column = assert_cast<vectorized::ColumnNullable*>(
mutable_full_columns[missing_cids[i]].get());
nullable_column->insert_null_elements(1);
} else if (tablet_column.has_default_value()) {
mutable_full_columns[missing_cids[i]]->insert_from(
*mutable_default_value_columns[i].get(), 0);
} else {
return Status::InternalError(
"missing column {} is not null but don't have default value",
tablet_column.name());
}
}
}
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ class TabletSchema {

void update_tablet_columns(const TabletSchema& tablet_schema,
const std::vector<TColumn>& t_columns);
Status make_full_block(std::shared_ptr<vectorized::Block>& full_block_ptr,
const vectorized::Block* block, size_t num_rows,
std::vector<uint32_t>& update_cids, std::vector<uint32_t>& missing_cids);

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) {
partitionSpec.second, // partition names
isAutoDetect,
isOverwrite,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate()
|| ConnectContext.get().getSessionVariable().isEnableAggregateKeyPartialUpdate(),
DMLCommandType.INSERT,
plan);
LogicalPlan command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
Expand Down Expand Up @@ -102,7 +101,7 @@ private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType().isAggregationFamily();

LogicalPlan child = ((LogicalPlan) sink.child());
boolean childHasSeqCol = child.getOutput().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
boolean negative = taskInfo.getNegative();
// get partial update related info
boolean isPartialUpdate = taskInfo.isPartialUpdate();
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
if (isPartialUpdate && (!destTable.getKeysType().isAggregationFamily() || (
destTable.getKeysType() == KeysType.UNIQUE_KEYS && !destTable.getEnableUniqueKeyMergeOnWrite()))) {
throw new UserException("Only unique key merge on write or agg key support partial update");
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
Expand Down Expand Up @@ -371,8 +372,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
boolean negative = taskInfo.getNegative();
// get partial update related info
boolean isPartialUpdate = taskInfo.isPartialUpdate();
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
if (isPartialUpdate && (!destTable.getKeysType().isAggregationFamily() || (
destTable.getKeysType() == KeysType.UNIQUE_KEYS && !destTable.getEnableUniqueKeyMergeOnWrite()))) {
throw new UserException("Only unique key merge on write or agg key support partial update");
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update";

public static final String ENABLE_AGG_KEY_PARTIAL_UPDATE = "enable_agg_key_partial_update";

public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold";
public static final String INVERTED_INDEX_MAX_EXPANSIONS = "inverted_index_max_expansions";

Expand Down Expand Up @@ -1493,6 +1495,9 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true)
public boolean enableUniqueKeyPartialUpdate = false;

@VariableMgr.VarAttr(name = ENABLE_AGG_KEY_PARTIAL_UPDATE, needForward = true)
public boolean enableAggregateKeyPartialUpdate = false;

@VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = {
"用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错",
"Used to test whether the query cache is hit. "
Expand Down Expand Up @@ -2955,6 +2960,14 @@ public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate
this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
}

public boolean isEnableAggregateKeyPartialUpdate() {
return enableAggregateKeyPartialUpdate;
}

public void setEnableAggregateKeyPartialUpdate(boolean enableAggregateKeyPartialUpdate) {
this.enableAggregateKeyPartialUpdate = enableAggregateKeyPartialUpdate;
}

public int getLoadStreamPerNode() {
return loadStreamPerNode;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
1 doris 1000 123 1
2 doris2 2000 223 1

-- !2 --
1 doris 200 123 1
2 doris2 400 223 1
4 \N 400 \N \N

-- !3 --
1 1 3 4
2 2 4 5
3 3 2 3
4 4 1 2

-- !4 --
1 2 3 4
2 3 4 5
3 4 2 3
4 5 1 2

-- !5 --
1 1 1 3 4
2 2 2 4 5
3 3 3 2 3
4 4 4 1 2

-- !6 --
1 1 1 3 4
2 2 2 4 5
3 3 3 2 3
4 4 4 1 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !basic --
1 doris 200 123 1
2 doris2 400 223 1

-- !basic_with_duplicate --
1 doris 444 123 1
2 doris2 555 223 1

-- !basic_with_duplicate2 --
1 doris 1111 123 1
2 doris2 2222 223 1

-- !basic_with_new_keys --
1 doris 1111 123 1
2 doris2 2222 223 1
3 "stranger" 500 \N \N
4 "foreigner" 600 \N \N

-- !basic_with_new_keys2 --
1 doris 1111 123 1
2 doris2 2222 223 1
3 "stranger" 500 \N 4321
4 "foreigner" 600 \N 4321

-- !basic_with_new_keys_and_invalid --
1 doris 1111 123 1
2 doris2 2222 223 1
3 "stranger" 500 \N 4321
4 "foreigner" 600 \N 4321
5 "stranger" \N \N \N
6 "foreigner" \N \N \N

-- !basic_invalid --
1 doris 1111 123 1
2 doris2 2222 223 1
3 "stranger" 500 \N 4321
4 "foreigner" 600 \N 4321
5 "stranger" \N \N \N
6 "foreigner" \N \N \N

Loading

0 comments on commit 9c273d8

Please sign in to comment.