diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 5da49758865c1c..27fbfb71d7f516 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() { // TODO replace memory_gc_thread. // step 6. Refresh weighted memory ratio of workload groups. + doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep(); doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit(); // step 7. Analyze blocking queries. diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index f8c11639719303..1ac7753b19784b 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig if (workload_group_ptr) { wg_ptr = workload_group_ptr; wg_ptr->add_mem_tracker_limiter(mem_tracker); - _need_release_memtracker = true; } } } @@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() { rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first << ", num_rows_filtered: " << entry.second.second; } - if (_need_release_memtracker) { - WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr(); - if (wg_ptr) { - wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker()); - } - } LOG(INFO) << "load channel removed" << " load_id=" << _load_id << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip << rows_str.str(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 6fad8c536ec4fa..6c150ed74d9126 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -127,7 +127,6 @@ class LoadChannel { int64_t _backend_id; bool _enable_profile; - bool _need_release_memtracker = false; }; inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h index faf354cca4cbf3..251a7c25a741fc 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -123,9 +123,6 @@ class MemTrackerLimiter final { bool is_query_cancelled() { return _is_query_cancelled; } void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } - // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it; - /* * Part 3, Memory tracking method (use carefully!) * diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 8931854897e168..c602dc683feddb 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -159,8 +159,6 @@ QueryContext::~QueryContext() { uint64_t group_id = 0; if (_workload_group) { group_id = _workload_group->id(); // before remove - _workload_group->remove_mem_tracker_limiter(query_mem_tracker); - _workload_group->remove_query(_query_id); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6f3b51f09fd1f2..0488e9ec83c6c2 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { } } +// MemtrackerLimiter is not removed during query context release, so that should remove it here. 
int64_t WorkloadGroup::make_memory_tracker_snapshots( std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) { int64_t used_memory = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); - for (const auto& trackerWptr : mem_tracker_group.trackers) { - auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker_snapshots != nullptr) { - tracker_snapshots->insert(tracker_snapshots->end(), tracker); + for (auto trackerWptr = mem_tracker_group.trackers.begin(); + trackerWptr != mem_tracker_group.trackers.end();) { + auto tracker = trackerWptr->lock(); + if (tracker == nullptr) { + trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); + } else { + if (tracker_snapshots != nullptr) { + tracker_snapshots->insert(tracker_snapshots->end(), tracker); + } + used_memory += tracker->consumption(); + ++trackerWptr; } - used_memory += tracker->consumption(); } } - refresh_memory(used_memory); + // refresh total memory used. + _total_mem_used = used_memory; + // reserve memory is recorded in the query mem tracker + // and _total_mem_used already contains all the current reserve memory. + // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. + _wg_refresh_interval_memory_growth.store(0.0); _mem_used_status->set_value(used_memory); return used_memory; } @@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() { return make_memory_tracker_snapshots(nullptr); } -void WorkloadGroup::refresh_memory(int64_t used_memory) { - // refresh total memory used. - _total_mem_used = used_memory; - // reserve memory is recorded in the query mem tracker - // and _total_mem_used already contains all the current reserve memory. - // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. - _wg_refresh_interval_memory_growth.store(0.0); -} +void WorkloadGroup::do_sweep() { + // Clear memtracker limiter that is registered during query or load. 
+ for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { + std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); + for (auto trackerWptr = mem_tracker_group.trackers.begin(); + trackerWptr != mem_tracker_group.trackers.end();) { + auto tracker = trackerWptr->lock(); + if (tracker == nullptr) { + trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); + } else { + ++trackerWptr; + } + } + } -void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { + // Clear query context that is registered during query context ctor std::unique_lock<std::shared_mutex> wlock(_mutex); - auto group_num = mem_tracker_ptr->group_num(); - std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); - mem_tracker_ptr->wg_tracker_limiter_group_it = - _mem_tracker_limiter_pool[group_num].trackers.insert( - _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); + for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) { + if (iter->second.lock() == nullptr) { + iter = _query_ctxs.erase(iter); + } else { + iter++; + } + } } -void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { +void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { std::unique_lock<std::shared_mutex> wlock(_mutex); auto group_num = mem_tracker_ptr->group_num(); std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); - if (mem_tracker_ptr->wg_tracker_limiter_group_it != - _mem_tracker_limiter_pool[group_num].trackers.end()) { - _mem_tracker_limiter_pool[group_num].trackers.erase( - mem_tracker_ptr->wg_tracker_limiter_group_it); - mem_tracker_ptr->wg_tracker_limiter_group_it = - _mem_tracker_limiter_pool[group_num].trackers.end(); - } + _mem_tracker_limiter_pool[group_num].trackers.insert( + _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); } int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { @@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, 
RuntimeProfile* profile, auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( - "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute " + "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, " + "Execute " "again after enough memory, details see be.INFO.", cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); }; auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( - "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again " + "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute " + "again " "after enough memory, details see be.INFO.", cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); @@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, _id, _name, _memory_limit, used_memory, need_free_mem); Defer defer {[&]() { LOG(INFO) << fmt::format( - "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: " + "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, " + "used: " "{}, need_free_mem: {}, freed memory: {}.", _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem); }}; @@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e _cgroup_cpu_ctl->update_cpu_soft_limit( CgroupCpuCtl::cpu_soft_limit_default_value()); } else { - LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: " + LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is " + "illegal: " << cpu_hard_limit << ", gid=" << tg_id; } } else { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 
2fbb4dd303059c..933c5afdb4ebe8 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -89,7 +89,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots); // call make_memory_tracker_snapshots, so also refresh total memory used. int64_t memory_used(); - void refresh_memory(int64_t used_memory); + + void do_sweep(); int spill_threshold_low_water_mark() const { return _spill_low_watermark.load(std::memory_order_relaxed); @@ -132,8 +133,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); - void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); - // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC // because mem_limit is not a required property bool is_mem_limit_valid() { @@ -154,11 +153,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { return Status::OK(); } - void remove_query(TUniqueId query_id) { - std::unique_lock<std::shared_mutex> wlock(_mutex); - _query_ctxs.erase(query_id); - } - void shutdown() { std::unique_lock<std::shared_mutex> wlock(_mutex); _is_shutdown = true; @@ -169,11 +163,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { return _is_shutdown && _query_ctxs.empty(); } - int query_num() { - std::shared_lock<std::shared_mutex> r_lock(_mutex); - return _query_ctxs.size(); - } - int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc); void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 65a8e3685c80ed..003f07f1db0c4a 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i << ", before wg size=" << 
old_wg_size << ", after wg size=" << new_wg_size; } +void WorkloadGroupMgr::do_sweep() { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + wg->do_sweep(); + } +} + struct WorkloadGroupMemInfo { int64_t total_mem_used = 0; std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots = diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index d8547c3383e219..f76e98d26063ba 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -50,6 +50,8 @@ class WorkloadGroupMgr { WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id); + void do_sweep(); + void stop(); std::atomic<bool> _enable_cpu_hard_limit = false;