Skip to content

Commit

Permalink
+1
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Sep 12, 2024
1 parent d6e2018 commit 90ee340
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 28 deletions.
31 changes: 22 additions & 9 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,29 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
std::shared_ptr<QueryContext> query_context =
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
_load_id.to_thrift());
std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
WorkloadGroupPtr wg_ptr = nullptr;

if (query_context != nullptr) {
_query_thread_context = {_load_id.to_thrift(), query_context->query_mem_tracker,
query_context->workload_group()};
mem_tracker = query_context->query_mem_tracker;
wg_ptr = query_context->workload_group();
} else {
// disable memtale on sink
_query_thread_context = {
_load_id.to_thrift(),
MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()))};
// when memtable on sink is not enabled, load can not find queryctx
mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
if (wg_id > 0) {
WorkloadGroupPtr workload_group_ptr =
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(wg_id);
_query_thread_context.set_workload_group_for_load_channel(workload_group_ptr);
if (workload_group_ptr) {
wg_ptr = workload_group_ptr;
wg_ptr->add_mem_tracker_limiter(mem_tracker);
_need_release_memtracker = true;
}
}
}
_query_thread_context = {_load_id.to_thrift(), mem_tracker, wg_ptr};

g_loadchannel_cnt << 1;
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
Expand All @@ -78,6 +85,12 @@ LoadChannel::~LoadChannel() {
rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
<< ", num_rows_filtered: " << entry.second.second;
}
if (_need_release_memtracker) {
WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
if (wg_ptr) {
wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
}
}
LOG(INFO) << "load channel removed"
<< " load_id=" << _load_id << ", is high priority=" << _is_high_priority
<< ", sender_ip=" << _sender_ip << rows_str.str();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class LoadChannel {
int64_t _backend_id;

bool _enable_profile;
bool _need_release_memtracker = false;
};

inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
Expand Down
22 changes: 3 additions & 19 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,15 +388,6 @@ class QueryThreadContext {
const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: query_id(query_id), query_mem_tracker(mem_tracker) {}

~QueryThreadContext() {
if (used_for_load_channel) {
WorkloadGroupPtr wg_shared_ptr = wg_wptr.lock();
if (wg_shared_ptr) {
wg_shared_ptr->remove_mem_tracker_limiter(query_mem_tracker);
}
}
}

// Not thread safe, generally be called in class constructor, shared_ptr use_count may be
// wrong when called by multiple threads, cause crash after object be destroyed prematurely.
void init_unlocked() {
Expand All @@ -411,20 +402,13 @@ class QueryThreadContext {
#endif
}

void set_workload_group_for_load_channel(WorkloadGroupPtr wg_shared_ptr) {
if (wg_shared_ptr) {
wg_shared_ptr->add_mem_tracker_limiter(query_mem_tracker);
wg_wptr = wg_shared_ptr;
used_for_load_channel = true;
}
}
std::shared_ptr<MemTrackerLimiter> get_memory_tracker() { return query_mem_tracker; }

WorkloadGroupPtr get_workload_group_ptr() { return wg_wptr.lock(); }

TUniqueId query_id;
std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
std::weak_ptr<WorkloadGroup> wg_wptr;
//NOTE(wb): currently if load channel is used for load, then worklaod group should be set manually,
// and load channel's memtracker should also be added and removed in QueryThreadContext.
bool used_for_load_channel = false;
};

class ScopedPeakMem {
Expand Down

0 comments on commit 90ee340

Please sign in to comment.