Skip to content

Commit

Permalink
[improve](move-memtable) remove heavy work pool from load stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Dec 20, 2023
1 parent 9a5ec43 commit 32d8ee0
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 70 deletions.
58 changes: 8 additions & 50 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,37 +302,13 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
return Status::OK();
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
st = it.second->close(_tablets_to_commit, success_tablet_ids,
failed_tablet_ids);
if (!st.ok()) {
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
return;
}
}
LOG(INFO) << "close load " << *this
<< ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
RETURN_IF_ERROR(it.second->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids));
}
return st;
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& st,
Expand Down Expand Up @@ -418,26 +394,8 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
index_stream = it->second;
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &index_stream, &header, &data, &mutex, &cond, &st] {
signal::set_signal_task_id(_load_id);
st = index_stream->append_data(header, data);
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for append data");
}
}
return st;
signal::set_signal_task_id(_load_id);
return index_stream->append_data(header, data);
}

int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@

namespace doris {

LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool)
: _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) {
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
Expand Down
9 changes: 1 addition & 8 deletions be/src/runtime/load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class POpenStreamSinkRequest;

class LoadStreamMgr {
public:
LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool,
FifoThreadPool* light_work_pool);
LoadStreamMgr(uint32_t segment_file_writer_thread_num);
~LoadStreamMgr();

Status open_load_stream(const POpenLoadStreamRequest* request,
Expand All @@ -48,16 +47,10 @@ class LoadStreamMgr {
// only used by ut
size_t get_load_stream_num() { return _load_streams_map.size(); }

FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
FifoThreadPool* light_work_pool() { return _light_work_pool; }

private:
std::mutex _lock;
std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;

FifoThreadPool* _heavy_work_pool = nullptr;
FifoThreadPool* _light_work_pool = nullptr;
};

} // namespace doris
3 changes: 1 addition & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
: std::max(10240, CpuInfo::num_cores() * 320),
"brpc_light"),
_load_stream_mgr(new LoadStreamMgr(
exec_env->store_paths().size() * config::flush_thread_num_per_store,
&_heavy_work_pool, &_light_work_pool)) {
exec_env->store_paths().size() * config::flush_thread_num_per_store)) {
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
Expand Down
9 changes: 2 additions & 7 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,7 @@ class LoadStreamMgrTest : public testing::Test {
Handler _handler;
};

LoadStreamMgrTest()
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
LoadStreamMgrTest() = default;

void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
Expand Down Expand Up @@ -602,7 +600,7 @@ class LoadStreamMgrTest : public testing::Test {

static_cast<void>(k_engine->start_bg_threads());

_load_stream_mgr = std::make_unique<LoadStreamMgr>(4, &_heavy_work_pool, &_light_work_pool);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
_stream_service = new StreamService(_load_stream_mgr.get());
CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
Expand Down Expand Up @@ -658,9 +656,6 @@ class LoadStreamMgrTest : public testing::Test {
brpc::Server* _server;
StreamService* _stream_service;

FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;

std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
};

Expand Down

0 comments on commit 32d8ee0

Please sign in to comment.