Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](load) move generate_delete_bitmap from memtable to beta rowset writer #21329

Merged
merged 9 commits into from
Jul 1, 2023
15 changes: 4 additions & 11 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
#include "common/logging.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gutil/integral_types.h"
#include "gutil/strings/numbers.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/olap_define.h"
Expand All @@ -47,8 +45,6 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
Expand Down Expand Up @@ -95,7 +91,7 @@ void DeltaWriter::_init_profile(RuntimeProfile* profile) {
_segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
_wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
_put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
_delete_bitmap_timer = ADD_TIMER(_profile, "MemTableDeleteBitmapTime");
_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
_close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime");
_sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
_agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
Expand Down Expand Up @@ -206,7 +202,6 @@ Status DeltaWriter::init() {
_delete_bitmap);
RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));

_schema.reset(new Schema(_tablet_schema));
_reset_mem_table();

// create flush handler
Expand Down Expand Up @@ -346,10 +341,8 @@ void DeltaWriter::_reset_mem_table() {
_mem_table_insert_trackers.push_back(mem_table_insert_tracker);
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
_delete_bitmap);
_mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(), mow_context,
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_rowset_writer.get(), _tablet->enable_unique_key_merge_on_write(),
mem_table_insert_tracker, mem_table_flush_tracker));

COUNTER_UPDATE(_segment_num, 1);
Expand All @@ -359,7 +352,6 @@ void DeltaWriter::_reset_mem_table() {
COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
COUNTER_SET(_sort_times, _memtable_stat.sort_times);
COUNTER_SET(_agg_times, _memtable_stat.agg_times);
Expand Down Expand Up @@ -502,6 +494,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
}
}
COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ namespace doris {
class FlushToken;
class MemTable;
class MemTracker;
class Schema;
class StorageEngine;
class TupleDescriptor;
class SlotDescriptor;
Expand Down Expand Up @@ -158,7 +157,6 @@ class DeltaWriter {
std::unique_ptr<RowsetWriter> _rowset_writer;
// TODO: Recheck the lifetime of _mem_table; it looks like it should use unique_ptr
std::unique_ptr<MemTable> _mem_table;
std::unique_ptr<Schema> _schema;
//const TabletSchema* _tablet_schema;
// tablet schema owned by delta writer; all writes will use this tablet schema.
// it's built from tablet_schema (stored when the tablet is created) and OlapTableSchema
Expand Down
144 changes: 30 additions & 114 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
#include <pdqsort.h>

#include <algorithm>
#include <cstddef>
#include <limits>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -33,11 +31,7 @@
#include "common/consts.h"
#include "common/logging.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
Expand All @@ -46,52 +40,45 @@
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/column.h"
#include "vec/columns/column_object.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/json/path_in_data.h"
#include "vec/jsonb/serialize.h"

namespace doris {
using namespace ErrorCode;

MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* tablet_schema,
MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, std::shared_ptr<MowContext> mow_context,
RowsetWriter* rowset_writer, bool enable_unique_key_mow,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet(std::move(tablet)),
_keys_type(_tablet->keys_type()),
_schema(schema),
: _tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
_flush_mem_tracker(flush_mem_tracker),
_schema_size(_schema->schema_size()),
_rowset_writer(rowset_writer),
_is_first_insertion(true),
_agg_functions(schema->num_columns()),
_offsets_of_aggregate_states(schema->num_columns()),
_agg_functions(tablet_schema->num_columns()),
_offsets_of_aggregate_states(tablet_schema->num_columns()),
_total_size_of_aggregate_states(0),
_mem_usage(0),
_mow_context(mow_context) {
_mem_usage(0) {
#ifndef BE_TEST
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())),
fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id)),
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
#else
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())));
fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id)));
#endif
_arena = std::make_unique<vectorized::Arena>();
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
Expand All @@ -113,9 +100,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript
}

void MemTable::_init_agg_functions(const vectorized::Block* block) {
for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) {
for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) {
vectorized::AggregateFunctionPtr function;
if (_keys_type == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) {
if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) {
// In such table, non-key column's aggregation type is NONE, so we need to construct
// the aggregate function manually.
function = vectorized::AggregateFunctionSimpleFactory::instance().get(
Expand All @@ -130,7 +117,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
_agg_functions[cid] = function;
}

for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) {
for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) {
_offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states;
_total_size_of_aggregate_states += _agg_functions[cid]->size_of_data();

Expand All @@ -155,7 +142,7 @@ MemTable::~MemTable() {
}
// We should release agg_places here, because they are not released when a
// load is canceled.
for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
auto function = _agg_functions[i];
DCHECK(function != nullptr);
function->destroy((*it)->agg_places(i));
Expand All @@ -172,7 +159,7 @@ MemTable::~MemTable() {
}

int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const {
return _pblock->compare_at(left->_row_pos, right->_row_pos, _schema->num_key_columns(),
return _pblock->compare_at(left->_row_pos, right->_row_pos, _tablet_schema->num_key_columns(),
*_pblock, -1);
}

Expand Down Expand Up @@ -217,26 +204,26 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
}

void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
RowInBlock* new_row, RowInBlock* row_in_skiplist) {
RowInBlock* src_row, RowInBlock* dst_row) {
if (_tablet_schema->has_sequence_col()) {
auto sequence_idx = _tablet_schema->sequence_col_idx();
DCHECK_LT(sequence_idx, mutable_block.columns());
auto col_ptr = mutable_block.mutable_columns()[sequence_idx].get();
auto res = col_ptr->compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, *col_ptr, -1);
auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1);
// dst sequence column larger than src, don't need to update
if (res > 0) {
return;
}
// need to update the row pos in skiplist to the new row pos when has
// need to update the row pos in dst row to the src row pos when has
// sequence column
row_in_skiplist->_row_pos = new_row->_row_pos;
dst_row->_row_pos = src_row->_row_pos;
}
// dst is non-sequence row, or dst sequence is smaller
for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) {
for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) {
auto col_ptr = mutable_block.mutable_columns()[cid].get();
_agg_functions[cid]->add(row_in_skiplist->agg_places(cid),
_agg_functions[cid]->add(dst_row->agg_places(cid),
const_cast<const doris::vectorized::IColumn**>(&col_ptr),
new_row->_row_pos, _arena.get());
src_row->_row_pos, _arena.get());
}
}
void MemTable::_put_into_output(vectorized::Block& in_block) {
Expand All @@ -257,7 +244,7 @@ size_t MemTable::_sort() {
size_t same_keys_num = 0;
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
for (size_t i = 0; i < _schema->num_key_columns(); i++) {
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
};
Expand Down Expand Up @@ -311,13 +298,13 @@ void MemTable::_finalize_one_row(RowInBlock* row,
const vectorized::ColumnsWithTypeAndName& block_data,
int row_pos) {
// move key columns
for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) {
_output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(),
row->_row_pos);
}
if (row->has_init_agg()) {
// get value columns from agg_places
for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
auto function = _agg_functions[i];
auto agg_place = row->agg_places(i);
auto col_ptr = _output_mutable_block.get_column_by_position(i).get();
Expand All @@ -335,7 +322,7 @@ void MemTable::_finalize_one_row(RowInBlock* row,
}
} else {
// move columns for rows do not need agg
for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
_output_mutable_block.get_column_by_position(i)->insert_from(
*block_data[i].column.get(), row->_row_pos);
}
Expand Down Expand Up @@ -366,7 +353,8 @@ void MemTable::_aggregate() {
prev_row->init_agg_places(
_arena->aligned_alloc(_total_size_of_aggregate_states, 16),
_offsets_of_aggregate_states.data());
for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) {
for (auto cid = _tablet_schema->num_key_columns();
cid < _tablet_schema->num_columns(); cid++) {
auto col_ptr = mutable_block.mutable_columns()[cid].get();
auto data = prev_row->agg_places(cid);
_agg_functions[cid]->create(data);
Expand Down Expand Up @@ -443,41 +431,6 @@ bool MemTable::need_agg() const {
return false;
}

// Computes the delete bitmap for the segment that was just flushed.
//
// Builds a temporary rowset from the rowset writer, loads only the segment
// identified by `segment_id`, and asks the tablet to calculate the delete bitmap
// of that segment against the rowsets recorded in `_mow_context->rowset_ids`.
// The time spent in this function is accumulated into `_stat.delete_bitmap_ns`.
//
// @param segment_id id of the freshly written segment to load from the tmp rowset.
// @return Status::OK() when merge-on-write is disabled for the tablet, when the
//         tablet is TABLET_NOTREADY and being converted by schema change, or when
//         the bitmap was calculated; otherwise the error returned by
//         load_segments() / calc_delete_bitmap().
Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
    SCOPED_RAW_TIMER(&_stat.delete_bitmap_ns);
    // Only merge-on-write unique-key tablets need a delete bitmap.
    if (!_tablet->enable_unique_key_merge_on_write()) {
        return Status::OK();
    }
    // Build a tmp rowset and load the most recent segment from it.
    auto rowset = _rowset_writer->build_tmp();
    // NOTE(review): assumes the rowset produced by build_tmp() is a BetaRowset;
    // a static_cast would be the safer downcast if that relationship holds -- confirm.
    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
    std::vector<RowsetSharedPtr> specified_rowsets;
    {
        // Hold the header lock (shared) while inspecting tablet state and rowsets.
        std::shared_lock meta_rlock(_tablet->get_header_lock());
        // tablet is under alter process. The delete bitmap will be calculated after conversion.
        if (_tablet->tablet_state() == TABLET_NOTREADY &&
            SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
            return Status::OK();
        }
        specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
    }
    OlapStopWatch watch;
    RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
                                                _mow_context->delete_bitmap,
                                                _mow_context->max_version));
    // Seed the accumulation with a size_t: with a plain `0` the running total is an
    // int and would be truncated on every step for large row counts.
    size_t total_rows = std::accumulate(
            segments.begin(), segments.end(), size_t {0},
            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });
    LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << tablet_id()
              << ", rowset_ids: " << _mow_context->rowset_ids.size()
              << ", cur max_version: " << _mow_context->max_version
              << ", transaction_id: " << _mow_context->txn_id
              << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
    return Status::OK();
}

Status MemTable::flush() {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
<< ", memsize: " << memory_usage() << ", rows: " << _stat.raw_rows;
Expand All @@ -501,7 +454,7 @@ Status MemTable::_do_flush() {
SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
_output_mutable_block.swap(_input_mutable_block);
} else {
vectorized::Block in_block = _input_mutable_block.to_block();
Expand All @@ -517,11 +470,6 @@ Status MemTable::_do_flush() {
// Unfold variant column
RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
}
if (!_tablet_schema->is_partial_update()) {
ctx.generate_delete_bitmap = [this](size_t segment_id) {
return _generate_delete_bitmap(segment_id);
};
}
ctx.segment_id = _segment_id;
SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx));
Expand Down Expand Up @@ -609,36 +557,4 @@ Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* c
return Status::OK();
}

// Re-populates the row-store column of `block` with the JSONB serialization of
// the block's rows.
//
// Does nothing when the block has no rows, or when no row-store column is found
// at an index greater than zero (index 0 is used as the "not found" marker).
void MemTable::serialize_block_to_row_column(vectorized::Block& block) {
    // An empty block has nothing to serialize.
    if (block.rows() == 0) {
        return;
    }
    MonotonicStopWatch timer;
    timer.start();

    // Pick the first column the tablet schema marks as the row-store column.
    int row_store_idx = 0;
    for (int cid = 0; cid < _num_columns; ++cid) {
        if (!_tablet_schema->column(cid).is_row_store_column()) {
            continue;
        }
        row_store_idx = cid;
        break;
    }
    if (row_store_idx == 0) {
        return;
    }

    // Obtain a mutable view of the row-store column and drop its current content
    // before it is rewritten below.
    auto* row_store_column =
            static_cast<vectorized::ColumnString*>(block.get_by_position(row_store_idx)
                                                           .column->assume_mutable_ref()
                                                           .assume_mutable()
                                                           .get());
    row_store_column->clear();

    // One serde per block column, derived from the block's data types.
    vectorized::DataTypeSerDeSPtrs serdes =
            vectorized::create_data_type_serdes(block.get_data_types());
    vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
                                                   _tablet_schema->num_columns(), serdes);
    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_store_idx
               << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
               << timer.elapsed_time() / 1000;
}

} // namespace doris
Loading
Loading