From aca8406e319530030e065a34f19a496fab86e85b Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 22 Dec 2023 17:05:50 +0800 Subject: [PATCH] [refactor](executor)remove scan group #28847 --- be/src/common/config.cpp | 2 - be/src/common/config.h | 2 - be/src/runtime/task_group/task_group.cpp | 5 - be/src/runtime/task_group/task_group.h | 6 - be/src/vec/exec/scan/scan_task_queue.cpp | 221 --------------------- be/src/vec/exec/scan/scan_task_queue.h | 99 --------- be/src/vec/exec/scan/scanner_context.cpp | 4 - be/src/vec/exec/scan/scanner_context.h | 1 - be/src/vec/exec/scan/scanner_scheduler.cpp | 44 ---- be/src/vec/exec/scan/scanner_scheduler.h | 13 +- 10 files changed, 1 insertion(+), 396 deletions(-) delete mode 100644 be/src/vec/exec/scan/scan_task_queue.cpp delete mode 100644 be/src/vec/exec/scan/scan_task_queue.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 03eaee7b23c6d6..ecc44a08e47c2a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -970,8 +970,6 @@ DEFINE_Bool(enable_fuzzy_mode, "false"); DEFINE_Bool(enable_debug_points, "false"); DEFINE_Int32(pipeline_executor_size, "0"); -DEFINE_Bool(enable_workload_group_for_scan, "false"); -DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000"); // 128 MB DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728"); diff --git a/be/src/common/config.h b/be/src/common/config.h index e011073d44dad5..a9508c6e8af3ff 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1024,8 +1024,6 @@ DECLARE_Bool(enable_fuzzy_mode); DECLARE_Bool(enable_debug_points); DECLARE_Int32(pipeline_executor_size); -DECLARE_Bool(enable_workload_group_for_scan); -DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 137f5ea2345ee6..9e86f8b831b3dc 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -33,7 +33,6 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" #include "util/parse_util.h" -#include "vec/exec/scan/scan_task_queue.h" #include "vec/exec/scan/scanner_scheduler.h" namespace doris { @@ -102,7 +101,6 @@ std::string TaskGroupEntity::debug_string() const { } template class TaskGroupEntity>; -template class TaskGroupEntity; TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) : _id(tg_info.id), @@ -112,7 +110,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) _enable_memory_overcommit(tg_info.enable_memory_overcommit), _cpu_share(tg_info.cpu_share), _task_entity(this, "pipeline task entity"), - _local_scan_entity(this, "local scan entity"), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM), _cpu_hard_limit(tg_info.cpu_hard_limit) {} @@ -150,8 +147,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { } ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share( tg_info, &_task_entity); - ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share( - tg_info, &_local_scan_entity); } int64_t TaskGroup::memory_used() { diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index f1c8523664e273..04dbf518f0d58a 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -43,7 +43,6 @@ namespace taskgroup { class TaskGroup; struct TaskGroupInfo; -class ScanTaskQueue; template class TaskGroupEntity { @@ -88,9 +87,6 @@ class TaskGroupEntity { using TaskGroupPipelineTaskEntity = TaskGroupEntity>; using TGPTEntityPtr = TaskGroupPipelineTaskEntity*; -using TaskGroupScanTaskEntity = TaskGroupEntity; -using TGSTEntityPtr = TaskGroupScanTaskEntity*; - struct TgTrackerLimiterGroup { std::unordered_set> trackers; std::mutex group_lock; @@ -101,7 +97,6 @@ class TaskGroup : public std::enable_shared_from_this { explicit TaskGroup(const TaskGroupInfo& tg_info); TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; } - TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; } int64_t version() const { return _version; } @@ -155,7 +150,6 @@ class TaskGroup : public std::enable_shared_from_this { bool _enable_memory_overcommit; std::atomic _cpu_share; TaskGroupPipelineTaskEntity _task_entity; - TaskGroupScanTaskEntity _local_scan_entity; std::vector _mem_tracker_limiter_pool; std::atomic _cpu_hard_limit; }; diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp deleted file mode 100644 index 7c2068e5715718..00000000000000 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ /dev/null @@ -1,221 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "scan_task_queue.h" - -#include "pipeline/pipeline_task.h" -#include "runtime/task_group/task_group.h" -#include "vec/exec/scan/scanner_context.h" - -namespace doris { -namespace taskgroup { -static void empty_function() {} -ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {} - -ScanTask::ScanTask(WorkFunction scan_func, - std::shared_ptr scanner_context, - TGSTEntityPtr scan_entity, int priority) - : scan_func(std::move(scan_func)), - scanner_context(scanner_context), - scan_entity(scan_entity), - priority(priority) {} - -ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {} - -Status ScanTaskQueue::try_push_back(ScanTask scan_task) { - if (_queue.try_put(std::move(scan_task))) { - VLOG_DEBUG << "try_push_back scan task " << scan_task.scanner_context->ctx_id << " " - << scan_task.priority; - return Status::OK(); - } else { - return Status::InternalError("failed to submit scan task to ScanTaskQueue"); - } -} -bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) { - auto r = _queue.blocking_get(scan_task, timeout_ms); - if (r) { - VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " " - << scan_task->priority; - } - return r; -} - -ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {} -ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default; - -void ScanTaskTaskGroupQueue::close() { - std::unique_lock lock(_rs_mutex); - _closed = true; - _wait_task.notify_all(); -} - -bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { - std::unique_lock lock(_rs_mutex); - taskgroup::TGSTEntityPtr entity = nullptr; - while (entity == nullptr) { - if (_closed) { - return false; - } - if (_group_entities.empty()) { - _wait_task.wait_for(lock, std::chrono::milliseconds( - config::workload_group_scan_task_wait_timeout_ms)); - } else { - entity = _next_tg_entity(); - if (!entity) { - _wait_task.wait_for(lock, - std::chrono::milliseconds( - config::workload_group_scan_task_wait_timeout_ms)); - } - } - } - DCHECK(entity->task_size() > 0); - if (entity->task_size() == 1) { - _dequeue_task_group(entity); - } - return entity->task_queue()->try_get( - scan_task, config::workload_group_scan_task_wait_timeout_ms /* timeout_ms */); -} - -bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { - auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); - std::unique_lock lock(_rs_mutex); - auto status = entity->task_queue()->try_push_back(scan_task); - if (!status.ok()) { - LOG(WARNING) << "try_push_back scan task fail: " << status; - return false; - } - if (_group_entities.find(entity) == _group_entities.end()) { - _enqueue_task_group(entity); - } - _wait_task.notify_one(); - return true; -} - -void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) { - auto* entity = scan_task.scan_entity; - std::unique_lock lock(_rs_mutex); - auto find_entity = _group_entities.find(entity); - bool is_in_queue = find_entity != _group_entities.end(); - VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string() - << ", in queue:" << is_in_queue << ", time_spent: " << time_spent; - if (is_in_queue) { - _group_entities.erase(entity); - } - entity->incr_runtime_ns(time_spent); - if (is_in_queue) { - _group_entities.emplace(entity); - _update_min_tg(); - } -} - -void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TGSTEntityPtr entity) { - std::unique_lock lock(_rs_mutex); - bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); - if (is_in_queue) { - _group_entities.erase(entity); - _total_cpu_share -= entity->cpu_share(); - } - entity->check_and_update_cpu_share(task_group_info); - if (is_in_queue) { - _group_entities.emplace(entity); - _total_cpu_share += entity->cpu_share(); - } -} - -void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) { - _total_cpu_share += tg_entity->cpu_share(); - // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns` - // should be avoided. - /** - * If a task group entity leaves task queue for a long time, its v runtime will be very - * small. This can cause it to preempt too many execution time. So, in order to avoid this - * situation, it is necessary to adjust the task group's v runtime. - * */ - auto old_v_ns = tg_entity->vruntime_ns(); - auto* min_entity = _min_tg_entity.load(); - if (min_entity) { - auto min_tg_v = min_entity->vruntime_ns(); - auto ideal_r = _ideal_runtime_ns(tg_entity) / 2; - uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v; - if (new_vruntime_ns > old_v_ns) { - VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << new_vruntime_ns; - tg_entity->adjust_vruntime_ns(new_vruntime_ns); - } - } else if (old_v_ns < _min_tg_v_runtime_ns) { - VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << _min_tg_v_runtime_ns; - tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns); - } - _group_entities.emplace(tg_entity); - VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string() - << ", group entity size: " << _group_entities.size(); - _update_min_tg(); -} - -void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) { - _total_cpu_share -= tg_entity->cpu_share(); - _group_entities.erase(tg_entity); - VLOG_DEBUG << "scan task group queue dequeue tg " << tg_entity->debug_string() - << ", group entity size: " << _group_entities.size(); - _update_min_tg(); -} - -TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() { - taskgroup::TGSTEntityPtr res = nullptr; - for (auto* entity : _group_entities) { - res = entity; - break; - } - return res; -} - -uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) const { - // Scan task does not have time slice, so we use pipeline task's instead. - return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / - _total_cpu_share; -} - -void ScanTaskTaskGroupQueue::_update_min_tg() { - auto* min_entity = _next_tg_entity(); - _min_tg_entity = min_entity; - if (min_entity) { - auto min_v_runtime = min_entity->vruntime_ns(); - if (min_v_runtime > _min_tg_v_runtime_ns) { - _min_tg_v_runtime_ns = min_v_runtime; - } - } -} - -bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()( - const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const { - auto lhs_val = lhs_ptr->vruntime_ns(); - auto rhs_val = rhs_ptr->vruntime_ns(); - if (lhs_val != rhs_val) { - return lhs_val < rhs_val; - } else { - auto l_share = lhs_ptr->cpu_share(); - auto r_share = rhs_ptr->cpu_share(); - if (l_share != r_share) { - return l_share < r_share; - } else { - return lhs_ptr->task_group_id() < rhs_ptr->task_group_id(); - } - } -} - -} // namespace taskgroup -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h deleted file mode 100644 index 18ef18872edbc6..00000000000000 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ /dev/null @@ -1,99 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#pragma once - -#include "olap/tablet.h" -#include "runtime/task_group/task_group.h" -#include "util/blocking_priority_queue.hpp" - -namespace doris { -namespace vectorized { -class ScannerContext; -}; - -namespace taskgroup { - -using WorkFunction = std::function; - -// Like PriorityThreadPool::Task -struct ScanTask { - ScanTask(); - ScanTask(WorkFunction scan_func, std::shared_ptr scanner_context, - TGSTEntityPtr scan_entity, int priority); - bool operator<(const ScanTask& o) const { return priority < o.priority; } - ScanTask& operator++() { - priority += 2; - return *this; - } - - WorkFunction scan_func; - std::shared_ptr scanner_context = nullptr; - TGSTEntityPtr scan_entity; - int priority; -}; - -// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly? -class ScanTaskQueue { -public: - ScanTaskQueue(); - Status try_push_back(ScanTask); - bool try_get(ScanTask* scan_task, uint32_t timeout_ms); - int size() { return _queue.get_size(); } - -private: - BlockingPriorityQueue _queue; -}; - -// Like TaskGroupTaskQueue -class ScanTaskTaskGroupQueue { -public: - explicit ScanTaskTaskGroupQueue(size_t core_size); - ~ScanTaskTaskGroupQueue(); - - void close(); - bool take(ScanTask* scan_task); - bool push_back(ScanTask); - - void update_statistics(ScanTask task, int64_t time_spent); - - void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr); - -private: - TGSTEntityPtr _task_entity(ScanTask& scan_task); - void _enqueue_task_group(TGSTEntityPtr); - void _dequeue_task_group(TGSTEntityPtr); - TGSTEntityPtr _next_tg_entity(); - uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const; - void _update_min_tg(); - - // Like cfs rb tree in sched_entity - struct TaskGroupSchedEntityComparator { - bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const; - }; - using ResouceGroupSet = std::set; - ResouceGroupSet _group_entities; - std::condition_variable _wait_task; - std::mutex _rs_mutex; - bool _closed = false; - int _total_cpu_share = 0; - std::atomic _min_tg_entity = nullptr; - uint64_t _min_tg_v_runtime_ns = 0; - size_t _core_size; -}; - -} // namespace taskgroup -} // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 99f645ca9e574b..5ad2dbec5b69fa 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -601,10 +601,6 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current } } -taskgroup::TaskGroup* ScannerContext::get_task_group() const { - return _state->get_query_ctx()->get_task_group(); -} - template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* parent, RuntimeState* state); template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* state); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index a64b544471218c..ba9c1fdee10a5b 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -167,7 +167,6 @@ class ScannerContext : public std::enable_shared_from_this { return blocks_num; } - taskgroup::TaskGroup* get_task_group() const; SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } void reschedule_scanner_ctx(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2e4db75a241b2c..e8d7f8a7139a6d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -35,7 +35,6 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "scan_task_queue.h" #include "util/async_io.h" // IWYU pragma: keep #include "util/blocking_queue.hpp" #include "util/cpu_info.h" @@ -88,18 +87,15 @@ void ScannerScheduler::stop() { _is_closed = true; - _task_group_local_scan_queue->close(); _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); _limited_scan_thread_pool->shutdown(); - _group_local_scan_thread_pool->shutdown(); _scheduler_pool->wait(); _local_scan_thread_pool->join(); _remote_scan_thread_pool->join(); _limited_scan_thread_pool->wait(); - _group_local_scan_thread_pool->wait(); LOG(INFO) << "ScannerScheduler stopped"; } @@ -136,19 +132,6 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_threads(config::doris_scanner_thread_pool_thread_num) .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool)); - - // 5. task group local scan - _task_group_local_scan_queue = std::make_unique( - config::doris_scanner_thread_pool_thread_num); - static_cast(ThreadPoolBuilder("local_scan_group") - .set_min_threads(config::doris_scanner_thread_pool_thread_num) - .set_max_threads(config::doris_scanner_thread_pool_thread_num) - .build(&_group_local_scan_thread_pool)); - for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) { - static_cast(_group_local_scan_thread_pool->submit_func([this] { - this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get()); - })); - } _register_metrics(); _is_init = true; return Status::OK(); @@ -251,13 +234,6 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { }; SimplifiedScanTask simple_scan_task = {work_func, ctx}; ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); - } else if (ctx->get_task_group() && config::enable_workload_group_for_scan) { - auto work_func = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - taskgroup::ScanTask scan_task = { - work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), nice}; - ret = _task_group_local_scan_queue->push_back(scan_task); } else { PriorityThreadPool::Task task; task.work_function = [this, scanner = *iter, ctx] { @@ -427,22 +403,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ctx->push_back_scanner_and_reschedule(scanner); } -void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, - taskgroup::ScanTaskTaskGroupQueue* scan_queue) { - while (!_is_closed) { - taskgroup::ScanTask scan_task; - auto success = scan_queue->take(&scan_task); - if (success) { - int64_t time_spent = 0; - { - SCOPED_RAW_TIMER(&time_spent); - scan_task.scan_func(); - } - scan_queue->update_statistics(scan_task, time_spent); - } - } -} - void ScannerScheduler::_register_metrics() { REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size, [this]() { return _local_scan_thread_pool->get_queue_size(); }); @@ -456,10 +416,6 @@ void ScannerScheduler::_register_metrics() { [this]() { return _limited_scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, [this]() { return _limited_scan_thread_pool->num_threads(); }); - REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size, - [this]() { return _group_local_scan_thread_pool->get_queue_size(); }) - REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num, - [this]() { return _group_local_scan_thread_pool->num_threads(); }); } void ScannerScheduler::_deregister_metrics() { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 91d341613dfce1..eb4d1380e3947c 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -21,7 +21,6 @@ #include #include "common/status.h" -#include "scan_task_queue.h" #include "util/threadpool.h" #include "vec/exec/scan/vscanner.h" @@ -31,9 +30,7 @@ class ExecEnv; namespace vectorized { class VScanner; } // namespace vectorized -namespace taskgroup { -class ScanTaskTaskGroupQueue; -} + template class BlockingQueue; } // namespace doris @@ -72,9 +69,6 @@ class ScannerScheduler { std::unique_ptr new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, int max_concurrency); - taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() { - return _task_group_local_scan_queue.get(); - } int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; } @@ -87,8 +81,6 @@ class ScannerScheduler { void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr ctx, VScannerSPtr scanner); - void _task_group_scanner_scan(ScannerScheduler* scheduler, - taskgroup::ScanTaskTaskGroupQueue* scan_queue); void _register_metrics(); static void _deregister_metrics(); @@ -115,9 +107,6 @@ class ScannerScheduler { std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; - std::unique_ptr _task_group_local_scan_queue; - std::unique_ptr _group_local_scan_thread_pool; - // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false;