Skip to content

Commit

Permalink
[performance](move-memtable) async close tablet streams
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 23, 2024
1 parent 7348e73 commit b141e40
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 39 deletions.
79 changes: 41 additions & 38 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
_close_timer = ADD_TIMER(_profile, "CloseTime");
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}

Expand Down Expand Up @@ -266,63 +267,61 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
return _status;
}

Status TabletStream::close() {
void TabletStream::_close() {
std::lock_guard<std::mutex> lock(_close_lock);
if (_closed == true) {
return;
}
Defer notify {[&]() {
_closed = true;
_close_cv.notify_all();
}};
if (!_status.ok()) {
return _status;
return;
}

SCOPED_TIMER(_close_wait_timer);
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
SCOPED_TIMER(_close_timer);
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
return;
}

// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
if (!_status.ok()) {
return _status;
return;
}
_status = _load_stream_writer->close();
}

auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
void TabletStream::close_async() {
if (_closing.exchange(true)) {
LOG(WARNING) << "skip double closing " << *this;
return;
}
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([&]() { _close(); });
if (!ret) {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
"there is not enough thread resource for close_async");
}
}

Status TabletStream::close_wait() {
SCOPED_TIMER(_close_wait_timer);
std::unique_lock<std::mutex> lock(_close_lock);
if (_closed == true) {
return _status;
}
_close_cv.wait(lock);
return _status;
}

Expand Down Expand Up @@ -392,7 +391,11 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
tablet_stream->close_async();
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close_wait();
if (st.ok()) {
success_tablet_ids->push_back(tablet_stream->id());
} else {
Expand Down
9 changes: 8 additions & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ class TabletStream {
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
Status close();
void close_async();
Status close_wait();
int64_t id() const { return _id; }

friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);

private:
void _close();
int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
Expand All @@ -71,9 +73,14 @@ class TabletStream {
Status _status;
PUniqueId _load_id;
int64_t _txn_id;
std::atomic<bool> _closing = false;
bool _closed = false;
std::mutex _close_lock;
std::condition_variable _close_cv;
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _add_segment_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
};
Expand Down

0 comments on commit b141e40

Please sign in to comment.