Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](executor) support Workload Move Action in BE #28918

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "agent/topic_subscriber.h"
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "agent/workload_move_action_listener.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -72,6 +73,12 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
LOG(INFO) << "Register workload group listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
std::move(wg_listener));

std::unique_ptr<TopicListener> ma_listener =
std::make_unique<WorkloadMoveActionListener>(exec_env);
LOG(INFO) << "Register workload move action listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::MOVE_QUERY_TO_GROUP,
std::move(ma_listener));
#endif
}

Expand Down
67 changes: 67 additions & 0 deletions be/src/agent/workload_move_action_listener.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "agent/workload_move_action_listener.h"

namespace doris {

void WorkloadMoveActionListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.move_action) {
continue;
}
FragmentMgr* fmgr = _exec_env->fragment_mgr();

TUniqueId query_id = topic_info.move_action.query_id;
uint64_t dst_group_id = topic_info.move_action.workload_group_id;

std::shared_ptr<QueryContext> query_ctx_ptr = nullptr;
fmgr->get_query_ctx_by_query_id(query_id, &query_ctx_ptr);
if (!query_ctx_ptr) {
continue;
}

if (query_ctx_ptr->is_cancelled()) {
continue;
}

// 1 move memory tracker
std::shared_ptr<taskgroup::TaskGroup> current_group =
query_ctx_ptr->get_task_group_shared_ptr();
if (!current_group) {
continue;
}
std::shared_ptr<taskgroup::TaskGroup> dst_group_ptr = nullptr;
bool move_mem_tracker_ret =
_exec_env->task_group_manager()->migrate_memory_tracker_to_group(
query_ctx_ptr->query_mem_tracker, current_group->id(), dst_group_id,
&dst_group_ptr);
if (move_mem_tracker_ret) {
query_ctx_ptr->set_task_group(dst_group_ptr);
}

// 2 move exec/scan task
bool move_task_ret = _exec_env->task_group_manager()->set_cg_task_sche_for_query_ctx(
dst_group_id, query_ctx_ptr.get());

LOG(INFO) << "try move query " << print_id(query_id) << " to group " << dst_group_id
<< " , move memory result=" << ((int)move_mem_tracker_ret)
<< ", move cpu result=" << ((int)move_task_ret);
}
}

} // namespace doris
40 changes: 40 additions & 0 deletions be/src/agent/workload_move_action_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <glog/logging.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'glog/logging.h' file not found [clang-diagnostic-error]

#include <glog/logging.h>
         ^


#include "agent/topic_listener.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/task_queue.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/task_group/task_group_manager.h"

namespace doris {
class WorkloadMoveActionListener : public TopicListener {
public:
~WorkloadMoveActionListener() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use '= default' to define a trivial destructor [modernize-use-equals-default]

Suggested change
~WorkloadMoveActionListener() {}
~WorkloadMoveActionListener() = default;

WorkloadMoveActionListener(ExecEnv* exec_env) : _exec_env(exec_env) {}

void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override;

private:
ExecEnv* _exec_env = nullptr;
};
} // namespace doris
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ Status PipelineTask::_open() {
return Status::OK();
}

void PipelineTask::set_task_queue(TaskQueue* task_queue) {
_task_queue = task_queue;
}

Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class PipelineTask {

taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;

void set_task_queue(TaskQueue* task_queue);
void set_task_queue(TaskQueue* task_queue) { this->_task_queue = task_queue; }
TaskQueue* get_task_queue() { return _task_queue; }

static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
Expand Down Expand Up @@ -288,6 +288,7 @@ class PipelineTask {
SourceState _data_state;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;

TaskQueue* _task_queue = nullptr;

// used for priority queue
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,10 @@ std::string PipelineXTask::debug_string() {

void PipelineXTask::wake_up() {
// call by dependency
static_cast<void>(get_task_queue()->push_back(this));
Status ret = query_context()->get_exec_task_queue()->push_back(this);
if (!ret.ok()) {
LOG(ERROR) << "submit to task queue failed";
}
}

} // namespace doris::pipeline
24 changes: 20 additions & 4 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
auto task = *task_itr;
task->set_state(t_state);
local_tasks.erase(task_itr++);
static_cast<void>(task->get_task_queue()->push_back(task));
Status ret = task->query_context()->get_exec_task_queue()->push_back(task);
if (!ret.ok()) {
LOG(ERROR) << "push task to queue failed";
}
}

TaskScheduler::~TaskScheduler() {
Expand Down Expand Up @@ -232,6 +235,16 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
continue;
}

// query may be migrated between scheduler
if (task->query_context()->get_task_scheduler()->get_wg_id() != this->get_wg_id()) {
Status ret = task->query_context()->get_exec_task_queue()->push_back(task, index);
if (!ret.ok()) {
LOG(ERROR) << "push task to queue failed";
}
continue;
}

if (task->is_pipelineX() && task->is_running()) {
static_cast<void>(_task_queue->push_back(task, index));
continue;
Expand Down Expand Up @@ -334,10 +347,13 @@ void TaskScheduler::_do_work(size_t index) {
case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
break;
case PipelineTaskState::RUNNABLE:
case PipelineTaskState::RUNNABLE: {
task->set_running(false);
static_cast<void>(_task_queue->push_back(task, index));
break;
Status ret = task->query_context()->get_exec_task_queue()->push_back(task, index);
if (!ret.ok()) {
LOG(ERROR) << "push task to queue failed";
}
} break;
default:
DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
<< " task: " << task->debug_string();
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class TaskScheduler {

TaskQueue* task_queue() const { return _task_queue.get(); }

void set_wg_id(uint64_t wg_id) { this->_wg_id = wg_id; }

uint64_t get_wg_id() { return _wg_id; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_wg_id' can be made const [readability-make-member-function-const]

Suggested change
uint64_t get_wg_id() { return _wg_id; }
uint64_t get_wg_id() const { return _wg_id; }


private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
Expand All @@ -101,6 +105,7 @@ class TaskScheduler {
std::atomic<bool> _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
uint64_t _wg_id = -1;

void _do_work(size_t index);
// after _try_close_task, task maybe destructed.
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ class FragmentMgr : public RestMonitorIface {

std::string dump_pipeline_tasks();

void get_query_ctx_by_query_id(TUniqueId query_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_query_ctx_by_query_id' can be made static [readability-convert-member-functions-to-static]

Suggested change
void get_query_ctx_by_query_id(TUniqueId query_id,
static void get_query_ctx_by_query_id(TUniqueId query_id,

std::shared_ptr<QueryContext>* query_ctx_ptr) {
// todo(wb) use shard lock
std::unique_lock<std::mutex> ctx_lock(_lock);
if (_query_ctx_map.find(query_id) != _query_ctx_map.end()) {
*query_ctx_ptr = _query_ctx_map[query_id];
}
}

private:
void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock, bool is_pipeline,
Expand Down
14 changes: 14 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/task_scheduler.h"

namespace doris {

Expand Down Expand Up @@ -116,4 +117,17 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
}
return true;
}

pipeline::TaskQueue* QueryContext::get_exec_task_queue() {
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
if (_task_scheduler) {
return _task_scheduler->task_queue();
} else if (use_task_group_for_cpu_limit.load()) {
return _exec_env->pipeline_task_group_scheduler()->task_queue();
} else {
// no workload group's task queue found, then we rollback to a common scheduler
return _exec_env->pipeline_task_scheduler()->task_queue();
}
}

} // namespace doris
51 changes: 40 additions & 11 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace doris {

namespace pipeline {
class PipelineFragmentContext;
class TaskQueue;
} // namespace pipeline

struct ReportStatusRequest {
Expand Down Expand Up @@ -149,9 +150,20 @@ class QueryContext {

vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }

void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
void set_task_group(taskgroup::TaskGroupPtr& tg) {
std::lock_guard<std::shared_mutex> write_lock(_task_group_lock);
_task_group = tg;
}

taskgroup::TaskGroup* get_task_group() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_task_group' can be made static [readability-convert-member-functions-to-static]

Suggested change
taskgroup::TaskGroup* get_task_group() {
static taskgroup::TaskGroup* get_task_group() {

std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group.get();
}

taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
taskgroup::TaskGroupPtr get_task_group_shared_ptr() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_task_group_shared_ptr' can be made static [readability-convert-member-functions-to-static]

Suggested change
taskgroup::TaskGroupPtr get_task_group_shared_ptr() {
static taskgroup::TaskGroupPtr get_task_group_shared_ptr() {

std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group;
}

int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
Expand Down Expand Up @@ -192,17 +204,31 @@ class QueryContext {

TUniqueId query_id() const { return _query_id; }

void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
_task_scheduler = task_scheduler;
void set_task_scheduler(std::shared_ptr<pipeline::TaskScheduler>* task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_exec_task_sched_mutex);
_task_scheduler = *task_scheduler;
}

pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
pipeline::TaskScheduler* get_task_scheduler() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_task_scheduler' can be made static [readability-convert-member-functions-to-static]

Suggested change
pipeline::TaskScheduler* get_task_scheduler() {
static pipeline::TaskScheduler* get_task_scheduler() {

std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
if (_task_scheduler) {
return _task_scheduler.get();
}
return nullptr;
}

void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) {
_scan_task_scheduler = scan_task_scheduler;
pipeline::TaskQueue* get_exec_task_queue();

void set_scan_task_scheduler(
std::shared_ptr<vectorized::SimplifiedScanScheduler>* scan_task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_scan_task_sched_mutex);
_scan_task_scheduler = *scan_task_scheduler;
}

vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
vectorized::SimplifiedScanScheduler* get_scan_scheduler() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_scan_scheduler' can be made static [readability-convert-member-functions-to-static]

Suggested change
vectorized::SimplifiedScanScheduler* get_scan_scheduler() {
static vectorized::SimplifiedScanScheduler* get_scan_scheduler() {

std::shared_lock<std::shared_mutex> read_lock(_scan_task_sched_mutex);
return _scan_task_scheduler.get();
}

pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }

Expand Down Expand Up @@ -260,7 +286,8 @@ class QueryContext {
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
vectorized::RuntimePredicate _runtime_predicate;

taskgroup::TaskGroupPtr _task_group;
std::shared_mutex _task_group_lock;
taskgroup::TaskGroupPtr _task_group = nullptr;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;

Expand All @@ -269,8 +296,10 @@ class QueryContext {
// to report the real message if failed.
Status _exec_status = Status::OK();

pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
std::shared_mutex _exec_task_sched_mutex;
std::shared_ptr<pipeline::TaskScheduler> _task_scheduler = nullptr;
std::shared_mutex _scan_task_sched_mutex;
std::shared_ptr<vectorized::SimplifiedScanScheduler> _scan_task_scheduler = nullptr;
std::unique_ptr<pipeline::Dependency> _execution_dependency;
};

Expand Down
Loading
Loading