Skip to content

Commit

Permalink
[fix](multi table) restrict the multi tables load memory under high c…
Browse files Browse the repository at this point in the history
…oncurrency with a large number of tables (apache#39992) (apache#41131)

pick (apache#39992)

BE node was killed by OOM-killer when use multi table load under high
concurrency with a large number of tables(128 concurrency and every
concurrency load 200 tables).

This pr restricts the multi tables load memory under this issue. If
memory reaches hard limit, new task will be rejected and return
directly.
  • Loading branch information
sollhui authored Sep 24, 2024
1 parent 3ad9dce commit d4c1b39
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
Expand Down Expand Up @@ -540,7 +540,7 @@ void ExecEnv::init_mem_tracker() {
_s3_file_buffer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
_stream_load_pipe_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe");
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe");
}

void ExecEnv::_register_metrics() {
Expand Down
18 changes: 16 additions & 2 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}

Status RoutineLoadTaskExecutor::init() {
Status RoutineLoadTaskExecutor::init(int64_t process_mem_limit) {
_load_mem_limit = process_mem_limit * config::load_process_max_memory_limit_percent / 100;
return ThreadPoolBuilder("routine_load")
.set_min_threads(0)
.set_max_threads(config::max_routine_load_thread_pool_size)
Expand Down Expand Up @@ -210,7 +211,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
return Status::OK();
}

if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) {
LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
<< ", job id: " << task.job_id
<< ", queue size: " << _thread_pool->get_queue_size()
Expand Down Expand Up @@ -305,6 +306,19 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
}
}

bool RoutineLoadTaskExecutor::_reach_memory_limit() {
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
auto current_load_mem_value =
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value();
if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
<< " current_load_mem_value: " << current_load_mem_value
<< " _load_mem_limit: " << _load_mem_limit;
return true;
}
return false;
}

void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
DataConsumerPool* consumer_pool, ExecFinishCallback cb) {
#define HANDLE_ERROR(stmt, err_msg) \
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/routine_load/routine_load_task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RoutineLoadTaskExecutor {

~RoutineLoadTaskExecutor();

Status init();
Status init(int64_t process_mem_limit);

void stop();

Expand Down Expand Up @@ -86,6 +86,7 @@ class RoutineLoadTaskExecutor {
// create a dummy StreamLoadContext for PKafkaMetaProxyRequest
Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
std::shared_ptr<StreamLoadContext> ctx);
bool _reach_memory_limit();

private:
ExecEnv* _exec_env = nullptr;
Expand All @@ -95,6 +96,8 @@ class RoutineLoadTaskExecutor {
std::mutex _lock;
// task id -> load context
std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _task_map;

int64_t _load_mem_limit = -1;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {

RoutineLoadTaskExecutor executor(&_env);
Status st;
st = executor.init();
st = executor.init(1024 * 1024);
EXPECT_TRUE(st.ok());
// submit task
st = executor.submit_task(task);
Expand Down

0 comments on commit d4c1b39

Please sign in to comment.