-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature](executor) support Workload Move Action in BE #28918
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include "agent/workload_move_action_listener.h" | ||
|
||
namespace doris { | ||
|
||
void WorkloadMoveActionListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) { | ||
for (const TopicInfo& topic_info : topic_info_list) { | ||
if (!topic_info.__isset.move_action) { | ||
continue; | ||
} | ||
FragmentMgr* fmgr = _exec_env->fragment_mgr(); | ||
|
||
TUniqueId query_id = topic_info.move_action.query_id; | ||
uint64_t dst_group_id = topic_info.move_action.workload_group_id; | ||
|
||
std::shared_ptr<QueryContext> query_ctx_ptr = nullptr; | ||
fmgr->get_query_ctx_by_query_id(query_id, &query_ctx_ptr); | ||
if (!query_ctx_ptr) { | ||
continue; | ||
} | ||
|
||
if (query_ctx_ptr->is_cancelled()) { | ||
continue; | ||
} | ||
|
||
// 1 move memory tracker | ||
std::shared_ptr<taskgroup::TaskGroup> current_group = | ||
query_ctx_ptr->get_task_group_shared_ptr(); | ||
if (!current_group) { | ||
continue; | ||
} | ||
std::shared_ptr<taskgroup::TaskGroup> dst_group_ptr = nullptr; | ||
bool move_mem_tracker_ret = | ||
_exec_env->task_group_manager()->migrate_memory_tracker_to_group( | ||
query_ctx_ptr->query_mem_tracker, current_group->id(), dst_group_id, | ||
&dst_group_ptr); | ||
if (move_mem_tracker_ret) { | ||
query_ctx_ptr->set_task_group(dst_group_ptr); | ||
} | ||
|
||
// 2 move exec/scan task | ||
bool move_task_ret = _exec_env->task_group_manager()->set_cg_task_sche_for_query_ctx( | ||
dst_group_id, query_ctx_ptr.get()); | ||
|
||
LOG(INFO) << "try move query " << print_id(query_id) << " to group " << dst_group_id | ||
<< " , move memory result=" << ((int)move_mem_tracker_ret) | ||
<< ", move cpu result=" << ((int)move_task_ret); | ||
} | ||
} | ||
|
||
} // namespace doris |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,40 @@ | ||||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||||
// or more contributor license agreements. See the NOTICE file | ||||||
// distributed with this work for additional information | ||||||
// regarding copyright ownership. The ASF licenses this file | ||||||
// to you under the Apache License, Version 2.0 (the | ||||||
// "License"); you may not use this file except in compliance | ||||||
// with the License. You may obtain a copy of the License at | ||||||
// | ||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||
// | ||||||
// Unless required by applicable law or agreed to in writing, | ||||||
// software distributed under the License is distributed on an | ||||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
// KIND, either express or implied. See the License for the | ||||||
// specific language governing permissions and limitations | ||||||
// under the License. | ||||||
|
||||||
#pragma once | ||||||
|
||||||
#include <glog/logging.h> | ||||||
|
||||||
#include "agent/topic_listener.h" | ||||||
#include "pipeline/pipeline_fragment_context.h" | ||||||
#include "pipeline/task_queue.h" | ||||||
#include "runtime/exec_env.h" | ||||||
#include "runtime/fragment_mgr.h" | ||||||
#include "runtime/task_group/task_group_manager.h" | ||||||
|
||||||
namespace doris { | ||||||
class WorkloadMoveActionListener : public TopicListener { | ||||||
public: | ||||||
~WorkloadMoveActionListener() {} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
Suggested change
|
||||||
WorkloadMoveActionListener(ExecEnv* exec_env) : _exec_env(exec_env) {} | ||||||
|
||||||
void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override; | ||||||
|
||||||
private: | ||||||
ExecEnv* _exec_env = nullptr; | ||||||
}; | ||||||
} // namespace doris |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -93,6 +93,10 @@ class TaskScheduler { | |||||
|
||||||
TaskQueue* task_queue() const { return _task_queue.get(); } | ||||||
|
||||||
void set_wg_id(uint64_t wg_id) { this->_wg_id = wg_id; } | ||||||
|
||||||
uint64_t get_wg_id() { return _wg_id; } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_wg_id' can be made const [readability-make-member-function-const]
Suggested change
|
||||||
|
||||||
private: | ||||||
std::unique_ptr<ThreadPool> _fix_thread_pool; | ||||||
std::shared_ptr<TaskQueue> _task_queue; | ||||||
|
@@ -101,6 +105,7 @@ class TaskScheduler { | |||||
std::atomic<bool> _shutdown; | ||||||
std::string _name; | ||||||
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; | ||||||
uint64_t _wg_id = -1; | ||||||
|
||||||
void _do_work(size_t index); | ||||||
// after _try_close_task, task maybe destructed. | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -153,6 +153,15 @@ class FragmentMgr : public RestMonitorIface { | |||||
|
||||||
std::string dump_pipeline_tasks(); | ||||||
|
||||||
void get_query_ctx_by_query_id(TUniqueId query_id, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_query_ctx_by_query_id' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
std::shared_ptr<QueryContext>* query_ctx_ptr) { | ||||||
// todo(wb) use shard lock | ||||||
std::unique_lock<std::mutex> ctx_lock(_lock); | ||||||
if (_query_ctx_map.find(query_id) != _query_ctx_map.end()) { | ||||||
*query_ctx_ptr = _query_ctx_map[query_id]; | ||||||
} | ||||||
} | ||||||
|
||||||
private: | ||||||
void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, | ||||||
const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,6 +43,7 @@ namespace doris { | |||||
|
||||||
namespace pipeline { | ||||||
class PipelineFragmentContext; | ||||||
class TaskQueue; | ||||||
} // namespace pipeline | ||||||
|
||||||
struct ReportStatusRequest { | ||||||
|
@@ -149,9 +150,20 @@ class QueryContext { | |||||
|
||||||
vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; } | ||||||
|
||||||
void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; } | ||||||
void set_task_group(taskgroup::TaskGroupPtr& tg) { | ||||||
std::lock_guard<std::shared_mutex> write_lock(_task_group_lock); | ||||||
_task_group = tg; | ||||||
} | ||||||
|
||||||
taskgroup::TaskGroup* get_task_group() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_task_group' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock); | ||||||
return _task_group.get(); | ||||||
} | ||||||
|
||||||
taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); } | ||||||
taskgroup::TaskGroupPtr get_task_group_shared_ptr() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_task_group_shared_ptr' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock); | ||||||
return _task_group; | ||||||
} | ||||||
|
||||||
int execution_timeout() const { | ||||||
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout | ||||||
|
@@ -192,17 +204,31 @@ class QueryContext { | |||||
|
||||||
TUniqueId query_id() const { return _query_id; } | ||||||
|
||||||
void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) { | ||||||
_task_scheduler = task_scheduler; | ||||||
void set_task_scheduler(std::shared_ptr<pipeline::TaskScheduler>* task_scheduler) { | ||||||
std::lock_guard<std::shared_mutex> write_lock(_exec_task_sched_mutex); | ||||||
_task_scheduler = *task_scheduler; | ||||||
} | ||||||
|
||||||
pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; } | ||||||
pipeline::TaskScheduler* get_task_scheduler() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_task_scheduler' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex); | ||||||
if (_task_scheduler) { | ||||||
return _task_scheduler.get(); | ||||||
} | ||||||
return nullptr; | ||||||
} | ||||||
|
||||||
void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) { | ||||||
_scan_task_scheduler = scan_task_scheduler; | ||||||
pipeline::TaskQueue* get_exec_task_queue(); | ||||||
|
||||||
void set_scan_task_scheduler( | ||||||
std::shared_ptr<vectorized::SimplifiedScanScheduler>* scan_task_scheduler) { | ||||||
std::lock_guard<std::shared_mutex> write_lock(_scan_task_sched_mutex); | ||||||
_scan_task_scheduler = *scan_task_scheduler; | ||||||
} | ||||||
|
||||||
vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; } | ||||||
vectorized::SimplifiedScanScheduler* get_scan_scheduler() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_scan_scheduler' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
std::shared_lock<std::shared_mutex> read_lock(_scan_task_sched_mutex); | ||||||
return _scan_task_scheduler.get(); | ||||||
} | ||||||
|
||||||
pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } | ||||||
|
||||||
|
@@ -260,7 +286,8 @@ class QueryContext { | |||||
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; | ||||||
vectorized::RuntimePredicate _runtime_predicate; | ||||||
|
||||||
taskgroup::TaskGroupPtr _task_group; | ||||||
std::shared_mutex _task_group_lock; | ||||||
taskgroup::TaskGroupPtr _task_group = nullptr; | ||||||
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; | ||||||
const TQueryOptions _query_options; | ||||||
|
||||||
|
@@ -269,8 +296,10 @@ class QueryContext { | |||||
// to report the real message if failed. | ||||||
Status _exec_status = Status::OK(); | ||||||
|
||||||
pipeline::TaskScheduler* _task_scheduler = nullptr; | ||||||
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; | ||||||
std::shared_mutex _exec_task_sched_mutex; | ||||||
std::shared_ptr<pipeline::TaskScheduler> _task_scheduler = nullptr; | ||||||
std::shared_mutex _scan_task_sched_mutex; | ||||||
std::shared_ptr<vectorized::SimplifiedScanScheduler> _scan_task_scheduler = nullptr; | ||||||
std::unique_ptr<pipeline::Dependency> _execution_dependency; | ||||||
}; | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: 'glog/logging.h' file not found [clang-diagnostic-error]