Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](merge-on-write) Skip the alignment process of some rowsets in partial update #38487

Merged
merged 9 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,9 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf

std::unique_ptr<RowsetWriter> transient_rs_writer;
DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
bool is_partial_update =
txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update;
if (is_partial_update) {
transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer(
*rowset, txn_info->partial_update_info, txn_expiration));
// Partial update might generate new segments when there is conflicts while publish, and mark
Expand Down Expand Up @@ -1242,6 +1244,37 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
}
auto t3 = watch.get_elapse_time_us();

// If a rowset is produced by compaction before the commit phase of the partial update load
// and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset
// because data remains the same before and after compaction. But we still need to calculate the
// the delete bitmap for that rowset.
std::vector<RowsetSharedPtr> rowsets_skip_alignment;
if (is_partial_update) {
int64_t max_version_in_flush_phase =
txn_info->partial_update_info->max_version_in_flush_phase;
DCHECK(max_version_in_flush_phase != -1);
std::vector<RowsetSharedPtr> remained_rowsets;
for (const auto& rowset : specified_rowsets) {
if (rowset->end_version() <= max_version_in_flush_phase &&
rowset->produced_by_compaction()) {
rowsets_skip_alignment.emplace_back(rowset);
} else {
remained_rowsets.emplace_back(rowset);
}
}
if (!rowsets_skip_alignment.empty()) {
specified_rowsets = std::move(remained_rowsets);
}
}

if (!rowsets_skip_alignment.empty()) {
auto token = self->calc_delete_bitmap_executor()->create_token();
// set rowset_writer to nullptr to skip the alignment process
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, rowsets_skip_alignment,
delete_bitmap, cur_version - 1, token.get(), nullptr));
RETURN_IF_ERROR(token->wait());
}

// When there is only one segment, it will be calculated in the current thread.
// Otherwise, it will be submitted to the thread pool for calculation.
if (segments.size() <= 1) {
Expand Down Expand Up @@ -1433,7 +1466,8 @@ Status BaseTablet::update_delete_bitmap_without_lock(
return Status::InternalError(
"debug tablet update delete bitmap without lock random failed");
} else {
LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not triggered"
LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not "
"triggered"
<< ", rnd:" << rnd << ", percent: " << percent;
}
});
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column) {
const std::string& auto_increment_column, int64_t cur_max_version = -1) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;

max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
Expand Down Expand Up @@ -91,6 +91,7 @@ struct PartialUpdateInfo {

public:
bool is_partial_update {false};
int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
std::vector<uint32_t> update_cids;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }

// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ class RowsetMeta {
return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING;
}

bool produced_by_compaction() const {
return has_version() &&
(start_version() < end_version() ||
(start_version() == end_version() && segments_overlap() == NONOVERLAPPING));
}

// get the compaction score of this rowset.
// if segments are overlapping, the score equals to the number of segments,
// otherwise, score is 1.
Expand Down
13 changes: 8 additions & 5 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/calc_delete_bitmap_executor.h"
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/pending_rowset_helper.h"
Expand Down Expand Up @@ -123,7 +124,7 @@ void RowsetBuilder::_garbage_collection() {

Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) {
std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
int64_t cur_max_version = tablet()->max_version_unlocked();
_max_version_in_flush_phase = tablet()->max_version_unlocked();
std::vector<RowsetSharedPtr> rowset_ptrs;
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (tablet()->tablet_state() == TABLET_NOTREADY) {
Expand All @@ -135,12 +136,13 @@ Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_cont
}
_rowset_ids.clear();
} else {
RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids));
RETURN_IF_ERROR(
tablet()->get_all_rs_id_unlocked(_max_version_in_flush_phase, &_rowset_ids));
rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids);
}
_delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids,
rowset_ptrs, _delete_bitmap);
mow_context = std::make_shared<MowContext>(_max_version_in_flush_phase, _req.txn_id,
_rowset_ids, rowset_ptrs, _delete_bitmap);
return Status::OK();
}

Expand Down Expand Up @@ -408,7 +410,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn());
table_schema_param->auto_increment_coulumn(),
_max_version_in_flush_phase);
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/rowset_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class BaseRowsetBuilder {
std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;
int64_t _max_version_in_flush_phase {-1};

std::shared_ptr<PartialUpdateInfo> _partial_update_info;

Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ Status EnginePublishVersionTask::execute() {
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", {
auto token = dp->param<std::string>("token", "invalid_token");
while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) {
auto block_dp = DebugPoints::instance()->get_debug_point(
"EnginePublishVersionTask::execute.block");
if (block_dp) {
auto pass_token = block_dp->param<std::string>("pass_token", "");
if (pass_token == token) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});
std::unique_ptr<ThreadPoolToken> token = _engine.tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,26 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo
LOG.info("error ", e);
}
}
if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")) {
LOG.info("debug point: block at CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait");
DebugPoint debugPoint = DebugPointUtil.getDebugPoint(
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait");
String token = debugPoint.param("token", "invalid_token");
while (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) {
DebugPoint blockDebugPoint = DebugPointUtil.getDebugPoint(
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block");
String passToken = blockDebugPoint.param("pass_token", "");
if (token.equals(passToken)) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.info("error ", e);
}
}
LOG.info("debug point: leave CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait");
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
int totalRetryTime = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 666 666
2 888 888 2 2
3 777 777 555 555

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 666 666
2 888 888 2 2
3 777 777 555 555

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 1 1
2 888 888 2 2
3 777 777 3 3

Loading
Loading