From d4c1b39d039f064d2016d9727ea67ce9d6d482b1 Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Tue, 24 Sep 2024 16:34:32 +0800 Subject: [PATCH] [fix](multi table) restrict the multi tables load memory under high concurrency with a large number of tables (#39992) (#41131) pick (#39992) The BE node was killed by the OOM killer when multi-table load was used under high concurrency with a large number of tables (128 concurrent loads, each loading 200 tables). This PR restricts the memory used by multi-table load in that scenario. If memory reaches the hard limit, new tasks are rejected and the call returns immediately. --- be/src/runtime/exec_env_init.cpp | 4 ++-- .../routine_load_task_executor.cpp | 18 ++++++++++++++++-- .../routine_load/routine_load_task_executor.h | 5 ++++- .../routine_load_task_executor_test.cpp | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 5d2ab598b33538..84d0684f17d8db 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -239,7 +239,7 @@ Status ExecEnv::_init(const std::vector& store_paths, new BrpcClientCache(config::function_service_protocol); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); - RETURN_IF_ERROR(_routine_load_task_executor->init()); + RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); @@ -540,7 +540,7 @@ void ExecEnv::init_mem_tracker() { _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); _stream_load_pipe_tracker = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); } 
void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 9b475ed2133148..e12ef7ff6df831 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -75,7 +75,8 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { _task_map.clear(); } -Status RoutineLoadTaskExecutor::init() { +Status RoutineLoadTaskExecutor::init(int64_t process_mem_limit) { + _load_mem_limit = process_mem_limit * config::load_process_max_memory_limit_percent / 100; return ThreadPoolBuilder("routine_load") .set_min_threads(0) .set_max_threads(config::max_routine_load_thread_pool_size) @@ -210,7 +211,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { return Status::OK(); } - if (_task_map.size() >= config::max_routine_load_thread_pool_size) { + if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) { LOG(INFO) << "too many tasks in thread pool. 
reject task: " << UniqueId(task.id) << ", job id: " << task.job_id << ", queue size: " << _thread_pool->get_queue_size() @@ -305,6 +306,19 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { } } +bool RoutineLoadTaskExecutor::_reach_memory_limit() { + bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); + auto current_load_mem_value = + MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value(); + if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { + LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit + << " current_load_mem_value: " << current_load_mem_value + << " _load_mem_limit: " << _load_mem_limit; + return true; + } + return false; +} + void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, DataConsumerPool* consumer_pool, ExecFinishCallback cb) { #define HANDLE_ERROR(stmt, err_msg) \ diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index f16ef80ef76f8e..0e597d796c9f77 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -51,7 +51,7 @@ class RoutineLoadTaskExecutor { ~RoutineLoadTaskExecutor(); - Status init(); + Status init(int64_t process_mem_limit); void stop(); @@ -86,6 +86,7 @@ class RoutineLoadTaskExecutor { // create a dummy StreamLoadContext for PKafkaMetaProxyRequest Status _prepare_ctx(const PKafkaMetaProxyRequest& request, std::shared_ptr ctx); + bool _reach_memory_limit(); private: ExecEnv* _exec_env = nullptr; @@ -95,6 +96,8 @@ class RoutineLoadTaskExecutor { std::mutex _lock; // task id -> load context std::unordered_map> _task_map; + + int64_t _load_mem_limit = -1; }; } // namespace doris diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index f95fdcfdadfb8a..338b82c6eba3a2 100644 --- 
a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -94,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { RoutineLoadTaskExecutor executor(&_env); Status st; - st = executor.init(); + st = executor.init(1024 * 1024); EXPECT_TRUE(st.ok()); // submit task st = executor.submit_task(task);