From 32d8ee064980bf88f98f1ef5d9576b80cd593e94 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 20 Dec 2023 19:51:49 +0800 Subject: [PATCH] [improve](move-memtable) remove heavy work pool from load stream --- be/src/runtime/load_stream.cpp | 58 ++++------------------------ be/src/runtime/load_stream_mgr.cpp | 4 +- be/src/runtime/load_stream_mgr.h | 9 +---- be/src/service/internal_service.cpp | 3 +- be/test/runtime/load_stream_test.cpp | 9 +---- 5 files changed, 13 insertions(+), 70 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 9d05d48f54ceac7..5aa2e6ce2e7bd5d 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -302,37 +302,13 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t return Status::OK(); } - Status st = Status::OK(); - { - bthread::Mutex mutex; - std::unique_lock lock(mutex); - bthread::ConditionVariable cond; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer( - [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() { - signal::set_signal_task_id(_load_id); - for (auto& it : _index_streams_map) { - st = it.second->close(_tablets_to_commit, success_tablet_ids, - failed_tablet_ids); - if (!st.ok()) { - std::unique_lock lock(mutex); - cond.notify_one(); - return; - } - } - LOG(INFO) << "close load " << *this - << ", success_tablet_num=" << success_tablet_ids->size() - << ", failed_tablet_num=" << failed_tablet_ids->size(); - std::unique_lock lock(mutex); - cond.notify_one(); - }); - if (ret) { - cond.wait(lock); - } else { - return Status::Error( - "there is not enough thread resource for close load"); - } + signal::set_signal_task_id(_load_id); + for (auto& it : _index_streams_map) { + RETURN_IF_ERROR(it.second->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids)); } - return st; + LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() + << ", failed_tablet_num=" << failed_tablet_ids->size(); + return Status::OK(); } void LoadStream::_report_result(StreamId stream, const Status& st, @@ -418,26 +394,8 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) index_stream = it->second; } - Status st = Status::OK(); - { - bthread::Mutex mutex; - std::unique_lock lock(mutex); - bthread::ConditionVariable cond; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer( - [this, &index_stream, &header, &data, &mutex, &cond, &st] { - signal::set_signal_task_id(_load_id); - st = index_stream->append_data(header, data); - std::unique_lock lock(mutex); - cond.notify_one(); - }); - if (ret) { - cond.wait(lock); - } else { - return Status::Error( - "there is not enough thread resource for append data"); - } - } - return st; + signal::set_signal_task_id(_load_id); + return index_stream->append_data(header, data); } int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index b3553046aec9f89..ded41df0945cedf 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -32,9 +32,7 @@ namespace doris { -LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num, - FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool) - : _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) { +LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) { static_cast(ThreadPoolBuilder("SegmentFileWriterThreadPool") .set_min_threads(segment_file_writer_thread_num) .set_max_threads(segment_file_writer_thread_num) diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index 466a23c8c5cfd34..47f32e8fd7d9a21 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -34,8 +34,7 @@ class POpenStreamSinkRequest; class LoadStreamMgr { public: - LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool, - FifoThreadPool* light_work_pool); + LoadStreamMgr(uint32_t segment_file_writer_thread_num); ~LoadStreamMgr(); Status open_load_stream(const POpenLoadStreamRequest* request, @@ -48,16 +47,10 @@ class LoadStreamMgr { // only used by ut size_t get_load_stream_num() { return _load_streams_map.size(); } - FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; } - FifoThreadPool* light_work_pool() { return _light_work_pool; } - private: std::mutex _lock; std::unordered_map _load_streams_map; std::unique_ptr _file_writer_thread_pool; - - FifoThreadPool* _heavy_work_pool = nullptr; - FifoThreadPool* _light_work_pool = nullptr; }; } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 24152c67089178c..b0cbfaae2eb83b6 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -211,8 +211,7 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : std::max(10240, CpuInfo::num_cores() * 320), "brpc_light"), _load_stream_mgr(new LoadStreamMgr( - exec_env->store_paths().size() * config::flush_thread_num_per_store, - &_heavy_work_pool, &_light_work_pool)) { + exec_env->store_paths().size() * config::flush_thread_num_per_store)) { REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 7cba945dd230c2b..871aebec95be9ce 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -490,9 +490,7 @@ class LoadStreamMgrTest : public testing::Test { Handler _handler; }; - LoadStreamMgrTest() - : _heavy_work_pool(4, 32, "load_stream_test_heavy"), - _light_work_pool(4, 32, "load_stream_test_light") {} + LoadStreamMgrTest() = default; void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) { butil::IOBuf append_buf; @@ -602,7 +600,7 @@ class LoadStreamMgrTest : public testing::Test { static_cast(k_engine->start_bg_threads()); - _load_stream_mgr = std::make_unique(4, &_heavy_work_pool, &_light_work_pool); + _load_stream_mgr = std::make_unique(4); _stream_service = new StreamService(_load_stream_mgr.get()); CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE)); brpc::ServerOptions server_options; @@ -658,9 +656,6 @@ class LoadStreamMgrTest : public testing::Test { brpc::Server* _server; StreamService* _stream_service; - FifoThreadPool _heavy_work_pool; - FifoThreadPool _light_work_pool; - std::unique_ptr _load_stream_mgr; };