Skip to content

Commit

Permalink
[refactor](wg&memtracker) using weak ptr to delete memtracker and query context automatically (#41549)
Browse files Browse the repository at this point in the history

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Oct 9, 2024
1 parent 2957b19 commit 7229aea
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 59 deletions.
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
// TODO replace memory_gc_thread.

// step 6. Refresh weighted memory ratio of workload groups.
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
Expand Down
7 changes: 0 additions & 7 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
_need_release_memtracker = true;
}
}
}
Expand All @@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
<< ", num_rows_filtered: " << entry.second.second;
}
if (_need_release_memtracker) {
WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
if (wg_ptr) {
wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
}
}
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" << _is_high_priority
<< ", sender_ip=" << _sender_ip << rows_str.str();
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class LoadChannel {
int64_t _backend_id;

bool _enable_profile;
bool _need_release_memtracker = false;
};

inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ class MemTrackerLimiter final {
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }

// Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;

/*
* Part 3, Memory tracking method (use carefully!)
*
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
uint64_t group_id = 0;
if (_workload_group) {
group_id = _workload_group->id(); // before remove
_workload_group->remove_mem_tracker_limiter(query_mem_tracker);
_workload_group->remove_query(_query_id);
}

_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
Expand Down
84 changes: 51 additions & 33 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
}
}

// MemTrackerLimiter is not removed when its query context is released, so the
// pool holds weak_ptrs: while taking the snapshot we lazily erase entries whose
// owning tracker has already been destroyed.
int64_t WorkloadGroup::make_memory_tracker_snapshots(
        std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
    int64_t used_memory = 0;
    for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
        std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
        for (auto trackerWptr = mem_tracker_group.trackers.begin();
             trackerWptr != mem_tracker_group.trackers.end();) {
            auto tracker = trackerWptr->lock();
            if (tracker == nullptr) {
                // Owning query/load is gone; drop the dangling entry in place.
                trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
            } else {
                if (tracker_snapshots != nullptr) {
                    tracker_snapshots->insert(tracker_snapshots->end(), tracker);
                }
                used_memory += tracker->consumption();
                ++trackerWptr;
            }
        }
    }
    // refresh total memory used.
    _total_mem_used = used_memory;
    // Reserve memory is recorded in the query mem tracker and _total_mem_used
    // already contains all the current reserve memory, so after refreshing
    // _total_mem_used, reset _wg_refresh_interval_memory_growth.
    _wg_refresh_interval_memory_growth.store(0.0);
    _mem_used_status->set_value(used_memory);
    return used_memory;
}
Expand All @@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}

// Publishes a freshly computed memory usage figure for this workload group and
// restarts the per-refresh-interval growth accounting. NOTE: the order of the
// two updates below is deliberate — the growth counter must only be reset
// after _total_mem_used reflects the new value.
void WorkloadGroup::refresh_memory(int64_t used_memory) {
// refresh total memory used.
_total_mem_used = used_memory;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
}
void WorkloadGroup::do_sweep() {
// Clear memtracker limiter that is registered during query or load.
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (auto trackerWptr = mem_tracker_group.trackers.begin();
trackerWptr != mem_tracker_group.trackers.end();) {
auto tracker = trackerWptr->lock();
if (tracker == nullptr) {
trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
} else {
++trackerWptr;
}
}
}

void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
// Clear query context that is registered during query context ctor
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.insert(
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
if (iter->second.lock() == nullptr) {
iter = _query_ctxs.erase(iter);
} else {
iter++;
}
}
}

void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
_mem_tracker_limiter_pool[group_num].trackers.end()) {
_mem_tracker_limiter_pool[group_num].trackers.erase(
mem_tracker_ptr->wg_tracker_limiter_group_it);
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.end();
}
_mem_tracker_limiter_pool[group_num].trackers.insert(
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}

int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
Expand Down Expand Up @@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
const std::string& label) {
return fmt::format(
"{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute "
"{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, "
"Execute "
"again after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
};
auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
return fmt::format(
"{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again "
"{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute "
"again "
"after enough memory, details see be.INFO.",
cancel_str, label, MemCounter::print_bytes(mem_consumption),
GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
Expand All @@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
_id, _name, _memory_limit, used_memory, need_free_mem);
Defer defer {[&]() {
LOG(INFO) << fmt::format(
"[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: "
"[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, "
"used: "
"{}, need_free_mem: {}, freed memory: {}.",
_id, _name, _memory_limit, used_memory, need_free_mem, freed_mem);
}};
Expand Down Expand Up @@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
"illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
}
} else {
Expand Down
15 changes: 2 additions & 13 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();
void refresh_memory(int64_t used_memory);

void do_sweep();

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -132,8 +133,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);

void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);

// when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
// because mem_limit is not a required property
bool is_mem_limit_valid() {
Expand All @@ -154,11 +153,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return Status::OK();
}

// Drops the bookkeeping entry for a finished query from this workload group.
void remove_query(TUniqueId query_id) {
    std::unique_lock<std::shared_mutex> write_guard(_mutex);
    _query_ctxs.erase(query_id);
}

void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
Expand All @@ -169,11 +163,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _is_shutdown && _query_ctxs.empty();
}

// Number of query contexts currently registered with this workload group.
int query_num() {
    std::shared_lock<std::shared_mutex> r_lock(_mutex);
    // size() returns size_t; cast explicitly to the int interface to avoid an
    // implicit narrowing-conversion warning.
    return static_cast<int>(_query_ctxs.size());
}

int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);

void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}

// Asks every registered workload group to sweep its expired bookkeeping.
// A shared lock suffices: the group map itself is only read here.
void WorkloadGroupMgr::do_sweep() {
    std::shared_lock<std::shared_mutex> read_guard(_group_mutex);
    for (auto& entry : _workload_groups) {
        entry.second->do_sweep();
    }
}

struct WorkloadGroupMemInfo {
int64_t total_mem_used = 0;
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class WorkloadGroupMgr {

WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);

void do_sweep();

void stop();

std::atomic<bool> _enable_cpu_hard_limit = false;
Expand Down

0 comments on commit 7229aea

Please sign in to comment.