Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](workloadgroup) use slot num to control memory distribution among one workload group #38237

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
<< print_id(query_ctx->query_id());
}
return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), print_bytes(limit()),
label(), type_string(_type), print_bytes(_limit),
print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()),
BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
Expand Down Expand Up @@ -694,7 +694,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
seek_num++;
// 32M small query does not cancel
if (tracker->consumption() <= 33554432 ||
tracker->consumption() < tracker->limit()) {
tracker->consumption() < tracker->_limit) {
small_num++;
continue;
}
Expand All @@ -704,7 +704,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
continue;
}
auto overcommit_ratio = int64_t(
(static_cast<double>(tracker->consumption()) / tracker->limit()) *
(static_cast<double>(tracker->consumption()) / tracker->_limit) *
10000);
max_pq.emplace(overcommit_ratio, tracker->label());
query_consumption[tracker->label()] = tracker->consumption();
Expand Down
15 changes: 7 additions & 8 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,16 @@ class MemTrackerLimiter final : public MemTracker {

void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; }
int64_t group_num() const { return _group_num; }
bool has_limit() const { return _limit >= 0; }
int64_t limit() const { return _limit; }
bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }

bool try_consume(int64_t bytes) const {
bool try_consume(int64_t bytes, bool overcommit) const {
if (UNLIKELY(bytes == 0)) {
return true;
}
bool st = true;
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
st = _consumption->try_add(bytes, _limit);
} else {
if (_limit <= 0 || bytes <= 0 || overcommit) {
_consumption->add(bytes);
} else {
st = _consumption->try_add(bytes, _limit);
}
if (st && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
Expand All @@ -122,6 +119,8 @@ class MemTrackerLimiter final : public MemTracker {
return st;
}

void reset_mem_limit(int64_t new_limit) { _limit = new_limit; }

Status check_limit(int64_t bytes = 0);
bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; }

Expand Down Expand Up @@ -236,7 +235,7 @@ class MemTrackerLimiter final : public MemTracker {

private:
// Limit on memory consumption, in bytes.
int64_t _limit;
std::atomic<int64_t> _limit;

// Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
int64_t _group_num;
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,13 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
auto wg_ptr = _wg_wptr.lock();
bool overcommit = wg_ptr == nullptr ? false : wg_ptr->enable_memory_overcommit();
// If the related wg is enable overcommit, then not check query memlimit.
// Only process limit affects.
if (!_limiter_tracker_raw->try_consume(size, overcommit)) {
return false;
}
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
_limiter_tracker_raw->release(size); // rollback
Expand Down
19 changes: 9 additions & 10 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,26 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,

void QueryContext::_init_query_mem_tracker() {
bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
int64_t _bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
if (_bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
if (bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
_bytes_limit = MemInfo::mem_limit();
bytes_limit = MemInfo::mem_limit();
}
if (_query_options.query_type == TQueryType::SELECT) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
} else if (_query_options.query_type == TQueryType::LOAD) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
} else { // EXTERNAL
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
}
if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
Expand All @@ -150,10 +150,9 @@ QueryContext::~QueryContext() {
std::string mem_tracker_msg;
if (query_mem_tracker->peak_consumption() != 0) {
mem_tracker_msg = fmt::format(
", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
", deregister query/load memory tracker, queryId={}, CurrUsed={}, "
"PeakUsed={}",
print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
MemTracker::print_bytes(query_mem_tracker->consumption()),
print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->consumption()),
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
uint64_t group_id = 0;
Expand Down
23 changes: 19 additions & 4 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ class QueryContext {

std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }

int64_t mem_limit() const { return _bytes_limit; }

void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
Expand All @@ -240,8 +238,26 @@ class QueryContext {
return _running_big_mem_op_num.load(std::memory_order_relaxed);
}

void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
void set_mem_limit(int64_t new_mem_limit) {
// Temp logic need spill threshold, but it is useless and will remove it.
_spill_threshold = new_mem_limit;
query_mem_tracker->reset_mem_limit(new_mem_limit);
}

std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; }

// Should remove this method when using reserve logic
int64_t spill_threshold() { return _spill_threshold; }

int32_t get_slot_count() {
return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
}

bool enable_query_slot_hard_limit() {
return _query_options.__isset.enable_query_slot_hard_limit
? _query_options.enable_query_slot_hard_limit
: false;
}
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
Expand Down Expand Up @@ -271,7 +287,6 @@ class QueryContext {
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
MonotonicStopWatch _query_watcher;
int64_t _bytes_limit = 0;
bool _is_pipeline = false;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ int64_t RuntimeState::get_load_mem_limit() {
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) {
return _query_options.load_mem_limit;
} else {
return _query_mem_tracker->limit();
return 0;
}
}

Expand Down
16 changes: 12 additions & 4 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {

AttachTask::AttachTask(RuntimeState* runtime_state) {
signal::set_signal_is_nereids(runtime_state->is_nereids());
QueryThreadContext query_thread_context = {runtime_state->query_id(),
runtime_state->query_mem_tracker(),
runtime_state->get_query_ctx()->workload_group()};
init(query_thread_context);
// RuntimeState not always has query ctx.
// For example during push handler or schema change
if (runtime_state->get_query_ctx() == nullptr) {
QueryThreadContext query_thread_context = {runtime_state->query_id(),
runtime_state->query_mem_tracker()};
init(query_thread_context);
} else {
QueryThreadContext query_thread_context = {
runtime_state->query_id(), runtime_state->query_mem_tracker(),
runtime_state->get_query_ctx()->workload_group()};
init(query_thread_context);
}
}

AttachTask::AttachTask(const QueryThreadContext& query_thread_context) {
Expand Down
12 changes: 11 additions & 1 deletion be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
// This is a invalid value, and should ignore this value during usage
const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;

WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
: _id(tg_info.id),
Expand Down Expand Up @@ -122,6 +124,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_spill_high_watermark = tg_info.spill_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
_total_query_slot_count = tg_info.total_query_slot_count;
} else {
return;
}
Expand Down Expand Up @@ -384,6 +387,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second;
}

// 16 total slots
int total_query_slot_count = TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.total_query_slot_count) {
total_query_slot_count = tworkload_group_info.total_query_slot_count;
}

return {.id = tg_id,
.name = name,
.cpu_share = cpu_share,
Expand All @@ -398,7 +407,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
.remote_read_bytes_per_second = remote_read_bytes_per_second,
.total_query_slot_count = total_query_slot_count};
}

void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
Expand Down
16 changes: 14 additions & 2 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,22 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
}
int spill_threashold_high_water_mark() const {

int spill_threshold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_ratio(double ratio);
int total_query_slot_count() const {
return _total_query_slot_count.load(std::memory_order_relaxed);
}

bool add_wg_refresh_interval_memory_growth(int64_t size) {
// If a group is enable memory overcommit, then not need check the limit
// It is always true, and it will only fail when process memory is not
// enough.
if (_enable_memory_overcommit) {
return true;
}
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
Expand Down Expand Up @@ -230,6 +240,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _spill_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
std::atomic<int> _total_query_slot_count = 0;

// means workload group is mark dropped
// new query can not submit
Expand Down Expand Up @@ -273,6 +284,7 @@ struct WorkloadGroupInfo {
const int spill_high_watermark = 0;
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
Expand Down
Loading
Loading