diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 2ddb3db295b487..a1021b616cc9ac 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -49,6 +49,7 @@
 #include "exec/schema_scanner/schema_variables_scanner.h"
 #include "exec/schema_scanner/schema_views_scanner.h"
 #include "exec/schema_scanner/schema_workload_group_privileges.h"
+#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h"
 #include "exec/schema_scanner/schema_workload_groups_scanner.h"
 #include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
 #include "olap/hll.h"
@@ -230,6 +231,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaTableOptionsScanner::create_unique();
     case TSchemaTableType::SCH_WORKLOAD_GROUP_PRIVILEGES:
         return SchemaWorkloadGroupPrivilegesScanner::create_unique();
+    case TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE:
+        return SchemaBackendWorkloadGroupResourceUsage::create_unique();
     default:
         return SchemaDummyScanner::create_unique();
         break;
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index b35e84a9f9c9f4..74e95f4203217c 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -27,7 +27,7 @@ namespace doris {
 std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
         // name, type, size
-        {"BE_ID", TYPE_BIGINT, sizeof(StringRef), false},
+        {"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
         {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
         {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
         {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
diff --git a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
new file mode 100644
index 00000000000000..5f9814383eae4e
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h" + +#include +#include + +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "runtime/workload_group/workload_group_manager.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector SchemaBackendWorkloadGroupResourceUsage::_s_tbls_columns = { + // name, type, size + {"BE_ID", TYPE_BIGINT, sizeof(int64_t), false}, + {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false}, + {"MEMORY_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"CPU_USAGE", TYPE_VARCHAR, sizeof(StringRef), false}, + {"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false}, + {"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false}, +}; + +SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE) {} + +SchemaBackendWorkloadGroupResourceUsage::~SchemaBackendWorkloadGroupResourceUsage() {} + +Status SchemaBackendWorkloadGroupResourceUsage::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + return Status::OK(); +} + +Status SchemaBackendWorkloadGroupResourceUsage::get_next_block_internal(vectorized::Block* block, + bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_block == nullptr) { + _block = vectorized::Block::create_unique(); + + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = + vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type, + _s_tbls_columns[i].name)); + } + + ExecEnv::GetInstance()->workload_group_mgr()->get_wg_resource_usage(_block.get()); + _total_rows = _block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows)); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h new file mode 100644 index 00000000000000..236dd69999fbb3 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
diff --git a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h
new file mode 100644
index 00000000000000..236dd69999fbb3
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaBackendWorkloadGroupResourceUsage : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaBackendWorkloadGroupResourceUsage);
+
+public:
+    SchemaBackendWorkloadGroupResourceUsage();
+    ~SchemaBackendWorkloadGroupResourceUsage() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
+
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+    int _block_rows_limit = 4096;
+    int _row_idx = 0;
+    int _total_rows = 0;
+    std::unique_ptr<vectorized::Block> _block = nullptr;
+};
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp
index 35cf2cc627e5e7..b4f144a633048e 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -67,6 +67,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
         data_dir_info.path = store_paths[i].path;
         data_dir_info.storage_medium = store_paths[i].storage_medium;
         data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR;
+        data_dir_info.bvar_name = "local_data_dir_" + std::to_string(i);
         be_config_data_dir_list.push_back(data_dir_info);
     }
@@ -75,6 +76,7 @@
         data_dir_info.path = spill_store_paths[i].path;
         data_dir_info.storage_medium = spill_store_paths[i].storage_medium;
         data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR;
+        data_dir_info.bvar_name = "spill_data_dir_" + std::to_string(i);
         be_config_data_dir_list.push_back(data_dir_info);
     }
@@ -83,6 +85,7 @@
         data_dir_info.path = cache_paths[i].path;
         data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE;
         data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR;
+        data_dir_info.bvar_name = "local_cache_dir_" + std::to_string(i);
         be_config_data_dir_list.push_back(data_dir_info);
     }
 }
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 4b9f4231bf1745..92261e767aaf11 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -163,6 +163,13 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
+        if (_state->get_query_ctx()) {
+            _state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
+        }
+    }};
     return _memtable_writer->write(block, row_idxs);
 }
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index df2c3b3e145c57..b6e336722f3eeb 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -71,6 +71,7 @@ struct DataDirInfo {
     bool is_used = false;                                      // whether available mark
     TStorageMedium::type storage_medium = TStorageMedium::HDD; // Storage medium type: SSD|HDD
     DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
+    std::string bvar_name;
 };
 struct PredicateFilterInfo {
     int type = 0;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index da581629e21940..8692075622a906 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -302,6 +302,7 @@ Status PipelineTask::execute(bool* eos) {
         if (cpu_qs) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
+        query_context()->update_wg_cpu_adder(delta_cpu_time);
     }};
     if (_wait_to_start()) {
         return Status::OK();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 1bbccfd33b7403..3241010c20ede5 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -250,6 +250,12 @@ class QueryContext {
     // only for file scan node
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
+    void update_wg_cpu_adder(int64_t delta_cpu_time) {
+        if (_workload_group != nullptr) {
+            _workload_group->update_cpu_adder(delta_cpu_time);
+        }
+    }
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 8cf6892dd39e8a..c54b1a6892bd9e 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -114,20 +114,22 @@
         __VA_ARGS__; \
     } while (0)
 
-#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)          \
-    std::shared_ptr<IOThrottle> iot = nullptr;             \
-    if (auto* t_ctx = doris::thread_context(true)) {       \
-        iot = t_ctx->get_local_scan_io_throttle(data_dir); \
-    }                                                      \
-    if (iot) {                                             \
-        iot->acquire(-1);                                  \
-    }                                                      \
-    Defer defer {                                          \
-        [&]() {                                            \
-            if (iot) {                                     \
-                iot->update_next_io_time(*bytes_read);     \
-            }                                              \
-        }                                                  \
+#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)                     \
+    std::shared_ptr<IOThrottle> iot = nullptr;                        \
+    auto* t_ctx = doris::thread_context(true);                        \
+    if (t_ctx) {                                                      \
+        iot = t_ctx->get_local_scan_io_throttle(data_dir);            \
+    }                                                                 \
+    if (iot) {                                                        \
+        iot->acquire(-1);                                             \
+    }                                                                 \
+    Defer defer {                                                     \
+        [&]() {                                                       \
+            if (iot) {                                                \
+                iot->update_next_io_time(*bytes_read);                \
+                t_ctx->update_total_local_scan_io_adder(*bytes_read); \
+            }                                                         \
+        }                                                             \
     }
 
 #define LIMIT_REMOTE_SCAN_IO(bytes_read) \
@@ -276,6 +278,12 @@ class ThreadContext {
         return nullptr;
     }
 
+    void update_total_local_scan_io_adder(size_t bytes_read) {
+        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
+            wg_ptr->update_total_local_scan_io_adder(bytes_read);
+        }
+    }
+
     int thread_local_handle_count = 0;
     int skip_memory_check = 0;
     int skip_large_memory_check = 0;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index dffaf3a940c68c..b2f9541231a30c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -69,9 +69,18 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
-        _scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>();
-    }
-    _remote_scan_io_throttle = std::make_shared<IOThrottle>();
+        _scan_io_throttle_map[data_dir.path] =
+                std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
+    }
+    _remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
+    _mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
+    _cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
+    _cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
+            _name, "cpu_usage", _cpu_usage_adder.get(), 10);
+    _total_local_scan_io_adder =
+            std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
+    _total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
+            _name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
 }
 std::string WorkloadGroup::debug_string() const {
@@ -136,6 +145,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
         }
     }
     refresh_memory(used_memory);
+    _mem_used_status->set_value(used_memory);
     return used_memory;
 }
@@ -585,6 +595,18 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
     return _remote_scan_io_throttle;
 }
 
+void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
+    (*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
+}
+
+void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
+    (*_total_local_scan_io_adder) << scan_bytes;
+}
+
+int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
+    return _remote_scan_io_throttle->get_bvar_io_per_second();
+}
+
 void WorkloadGroup::try_stop_schedulers() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
     if (_task_sched) {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 0cb8355400815c..3561098b6ce29c 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -17,6 +17,7 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -195,6 +196,17 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);
 
+    void update_cpu_adder(int64_t delta_cpu_time);
+
+    void update_total_local_scan_io_adder(size_t scan_bytes);
+
+    int64_t get_mem_used() { return _mem_used_status->get_value(); }
+    uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
+    int64_t get_local_scan_bytes_per_second() {
+        return _total_local_scan_io_per_second->get_value();
+    }
+    int64_t get_remote_scan_bytes_per_second();
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
@@ -234,6 +246,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
     std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
+
+    // bvar metric
+    std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
+    std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
+    std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
+    std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
+    std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
 };
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
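The new bvar members give each workload group a raw counter plus a windowed rate: update_cpu_adder() feeds CPU-time deltas (pushed from PipelineTask::execute, VScanner::update_scan_cpu_timer, and DeltaWriterV2::write via QueryContext::update_wg_cpu_adder) into _cpu_usage_adder, and _cpu_usage_per_second exposes that as nanoseconds of CPU consumed per wall-clock second, averaged over the 10-second window passed to bvar::PerSecond; the local-scan adder works the same way with a 1-second window. get_wg_resource_usage() in the manager below turns the CPU rate into the CPU_USAGE percentage string by dividing by the backend's total capacity (cores x 10^9 ns per second). Here is a stand-alone sketch of just that arithmetic, with the bvar plumbing omitted and an illustrative helper name:

#include <cstdint>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>

// cpu_ns_per_second: CPU nanoseconds the group consumed per wall-clock second,
// i.e. the per-second rate reported over the adder of raw CPU-time deltas.
// cpu_num: number of cores on the backend; the denominator is the machine's
// total capacity, so 100% means the whole backend, not a single core.
std::string cpu_usage_percent(uint64_t cpu_ns_per_second, int cpu_num) {
    cpu_num = cpu_num <= 0 ? 1 : cpu_num;
    uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ULL;
    double pct = static_cast<double>(cpu_ns_per_second) /
                 static_cast<double>(total_cpu_time_ns_per_second) * 100;
    std::stringstream ss;
    ss << std::fixed << std::setprecision(2) << pct << "%";
    return ss.str();
}

int main() {
    // 4 cores, 2e9 ns of CPU consumed per wall-clock second -> half the machine.
    std::cout << cpu_usage_percent(2000000000ULL, 4) << "\n"; // prints "50.00%"
    return 0;
}

So a 4-core backend that spends 2 x 10^9 ns of CPU per second on a group reports 50.00%, and 100.00% means the group is saturating every core on that backend.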
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 6a196497e724a8..96ef36b1da5fc0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -28,6 +28,7 @@
 #include "util/mem_info.h"
 #include "util/threadpool.h"
 #include "util/time.h"
+#include "vec/core/block.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 namespace doris {
@@ -257,6 +258,53 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
     }
 }
 
+void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
+    auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
+                int_val);
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+                                                                          str_val.size());
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+    int cpu_num = CpuInfo::num_cores();
+    cpu_num = cpu_num <= 0 ? 1 : cpu_num;
+    uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll;
+
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    block->reserve(_workload_groups.size());
+    for (const auto& [id, wg] : _workload_groups) {
+        insert_int_value(0, be_id, block);
+        insert_int_value(1, wg->id(), block);
+        insert_int_value(2, wg->get_mem_used(), block);
+
+        double cpu_usage_p =
+                (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
+        std::stringstream cpu_usage_ss;
+        cpu_usage_ss << std::fixed << std::setprecision(2) << cpu_usage_p << "%";
+
+        insert_string_value(3, cpu_usage_ss.str(), block);
+
+        insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
+        insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
+    }
+}
+
 void WorkloadGroupMgr::stop() {
     for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
         iter->second->try_stop_schedulers();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index f7f02bf63e6997..15740d061adc94 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -27,6 +27,10 @@ namespace doris {
 class CgroupCpuCtl;
 
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
 namespace pipeline {
 class TaskScheduler;
 class MultiCoreTaskQueue;
@@ -56,6 +60,8 @@ class WorkloadGroupMgr {
     void refresh_wg_weighted_memory_limit();
 
+    void get_wg_resource_usage(vectorized::Block* block);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/runtime/workload_management/io_throttle.cpp b/be/src/runtime/workload_management/io_throttle.cpp
index 3a8256eee3746d..4833f2ec54cdd2 100644
--- a/be/src/runtime/workload_management/io_throttle.cpp
+++ b/be/src/runtime/workload_management/io_throttle.cpp
@@ -21,8 +21,14 @@ namespace doris {
+IOThrottle::IOThrottle(std::string prefix, std::string name) {
+    _io_adder = std::make_unique<bvar::Adder<size_t>>(prefix, name);
+    _io_adder_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
+            prefix, name + "_per_second", _io_adder.get(), 1);
+}
+
 bool IOThrottle::acquire(int64_t block_timeout_ms) {
-    if (_io_bytes_per_second < 0) {
+    if (_io_bytes_per_second_limit < 0) {
         return true;
     }
@@ -42,7 +48,7 @@ bool IOThrottle::acquire(int64_t block_timeout_ms) {
 }
 bool IOThrottle::try_acquire() {
-    if (_io_bytes_per_second < 0) {
+    if (_io_bytes_per_second_limit < 0) {
         return true;
     }
     std::unique_lock<std::mutex> w_lock(_mutex);
@@ -50,24 +56,27 @@ bool IOThrottle::try_acquire() {
 }
 void IOThrottle::update_next_io_time(int64_t io_bytes) {
-    if (_io_bytes_per_second <= 0 || io_bytes <= 0) {
+    if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) {
         return;
     }
-    int64_t read_bytes_per_second = _io_bytes_per_second;
-    std::unique_lock<std::mutex> w_lock(_mutex);
-    double io_bytes_float = static_cast<double>(io_bytes);
-    double ret = (io_bytes_float / static_cast<double>(read_bytes_per_second)) *
-                 static_cast<double>(MICROS_PER_SEC);
-    int64_t current_time = GetCurrentTimeMicros();
+    int64_t read_bytes_per_second = _io_bytes_per_second_limit;
+    {
+        std::unique_lock<std::mutex> w_lock(_mutex);
+        double io_bytes_float = static_cast<double>(io_bytes);
+        double ret = (io_bytes_float / static_cast<double>(read_bytes_per_second)) *
+                     static_cast<double>(MICROS_PER_SEC);
+        int64_t current_time = GetCurrentTimeMicros();
-    if (current_time > _next_io_time_micros) {
-        _next_io_time_micros = current_time;
+        if (current_time > _next_io_time_micros) {
+            _next_io_time_micros = current_time;
+        }
+        _next_io_time_micros += ret < 1 ? static_cast<int64_t>(1) : static_cast<int64_t>(ret);
     }
-    _next_io_time_micros += ret < 1 ? static_cast<int64_t>(1) : static_cast<int64_t>(ret);
+    (*_io_adder) << io_bytes;
 }
 void IOThrottle::set_io_bytes_per_second(int64_t io_bytes_per_second) {
-    _io_bytes_per_second = io_bytes_per_second;
+    _io_bytes_per_second_limit = io_bytes_per_second;
 }
 }; // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/io_throttle.h b/be/src/runtime/workload_management/io_throttle.h
index 691255d23c48c4..ce62c65d7a9eeb 100644
--- a/be/src/runtime/workload_management/io_throttle.h
+++ b/be/src/runtime/workload_management/io_throttle.h
@@ -17,6 +17,7 @@
 #pragma once
 
+#include
 #include
 #include
@@ -25,16 +26,9 @@
 namespace doris {
-class IOThrottle;
-
-struct IOThrottleCtx {
-    IOThrottle* io_throttle = nullptr;
-    int io_block_timeout;
-};
-
 class IOThrottle {
 public:
-    IOThrottle() = default;
+    IOThrottle(std::string prefix, std::string name);
     ~IOThrottle() = default;
@@ -47,12 +41,16 @@ class IOThrottle {
     void set_io_bytes_per_second(int64_t read_bytes_per_second);
-    int64_t get_io_bytes_per_second() { return _io_bytes_per_second; }
+    size_t get_bvar_io_per_second() { return _io_adder_per_second->get_value(); }
 private:
     std::mutex _mutex;
     std::condition_variable wait_condition;
     int64_t _next_io_time_micros {0};
-    std::atomic<int64_t> _io_bytes_per_second {10485760};
+    std::atomic<int64_t> _io_bytes_per_second_limit {10485760};
+
+    // bvar monitor
+    std::unique_ptr<bvar::Adder<size_t>> _io_adder;
+    std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _io_adder_per_second;
 };
 }; // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index f4210b79ea803e..6232be473026a6 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -262,4 +262,13 @@ void VScanner::_collect_profile_before_close() {
     _state->update_num_rows_load_unselected(_counter.num_rows_unselected);
 }
 
+void VScanner::update_scan_cpu_timer() {
+    int64_t cpu_time = _cpu_watch.elapsed_time();
+    _scan_cpu_timer += cpu_time;
+    _query_statistics->add_cpu_nanos(cpu_time);
+    if (_state && _state->get_query_ctx()) {
+        _state->get_query_ctx()->update_wg_cpu_adder(cpu_time);
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 5eae2a089544cf..19c37f6fc21e55 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -127,11 +127,7 @@ class VScanner {
     int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
 
-    void update_scan_cpu_timer() {
-        int64_t cpu_time = _cpu_watch.elapsed_time();
-        _scan_cpu_timer += cpu_time;
-        _query_statistics->add_cpu_nanos(cpu_time);
-    }
+    void update_scan_cpu_timer();
 
     RuntimeState* runtime_state() { return _state; }
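On the throttling side, update_next_io_time() converts each completed read into a pacing delay: io_bytes / _io_bytes_per_second_limit seconds, expressed in microseconds, is added to _next_io_time_micros, and acquire() does not let the next read start before that timestamp, so sustained throughput converges on the configured limit. The new (*_io_adder) << io_bytes line additionally counts the bytes so get_bvar_io_per_second() can report the rate actually achieved; REMOTE_SCAN_BYTES_PER_SECOND reads that value from the remote throttle, while LOCAL_SCAN_BYTES_PER_SECOND comes from the group-level _total_local_scan_io_per_second fed by the LIMIT_LOCAL_SCAN_IO macro. A minimal sketch of just the pacing math, without the mutex/condition-variable machinery; Pacer, on_read, and may_start are illustrative names, not the IOThrottle API:

#include <cstdint>
#include <iostream>

constexpr int64_t MICROS_PER_SEC = 1000000;

// Minimal pacing model: each completed read pushes the earliest time the next
// read may start forward by (bytes / limit) seconds, expressed in microseconds.
struct Pacer {
    int64_t bytes_per_second_limit;
    int64_t next_io_time_micros = 0;

    // 'now_micros' stands in for the current-time call used by the real throttle.
    void on_read(int64_t io_bytes, int64_t now_micros) {
        if (bytes_per_second_limit <= 0 || io_bytes <= 0) {
            return;
        }
        double delay = static_cast<double>(io_bytes) /
                       static_cast<double>(bytes_per_second_limit) * MICROS_PER_SEC;
        if (now_micros > next_io_time_micros) {
            next_io_time_micros = now_micros;
        }
        next_io_time_micros += delay < 1 ? 1 : static_cast<int64_t>(delay);
    }

    // The real throttle's acquire() waits (on a condition variable) until this holds.
    bool may_start(int64_t now_micros) const { return now_micros >= next_io_time_micros; }
};

int main() {
    Pacer p {10 * 1024 * 1024};                 // 10 MiB/s, the default limit in io_throttle.h
    p.on_read(1024 * 1024, /*now_micros=*/0);   // a 1 MiB read ...
    std::cout << p.next_io_time_micros << "\n"; // ... defers the next read by 100000 us
    std::cout << p.may_start(50000) << " " << p.may_start(200000) << "\n"; // prints "0 1"
    return 0;
}

With the default 10485760-byte limit, a 1 MiB read therefore holds off the next read for roughly 100 ms, which is what keeps the long-run local or remote scan rate at the configured bytes-per-second value.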
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index d0a3a3728b2d6b..7c661861b3fbb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -80,7 +80,10 @@ public enum SchemaTableType {
     SCH_TABLE_OPTIONS("TABLE_OPTIONS", "TABLE_OPTIONS", TSchemaTableType.SCH_TABLE_OPTIONS),
     SCH_WORKLOAD_GROUP_PRIVILEGES("WORKLOAD_GROUP_PRIVILEGES",
-            "WORKLOAD_GROUP_PRIVILEGES", TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES);
+            "WORKLOAD_GROUP_PRIVILEGES", TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES),
+
+    SCH_WORKLOAD_GROUP_RESOURCE_USAGE("WORKLOAD_GROUP_RESOURCE_USAGE",
+            "WORKLOAD_GROUP_RESOURCE_USAGE", TSchemaTableType.SCH_WORKLOAD_GROUP_RESOURCE_USAGE);
     private static final String dbName = "INFORMATION_SCHEMA";
     private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 8802d2665269e7..26db6f67afdf80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -540,6 +540,16 @@ public class SchemaTable extends Table {
                             .column("IS_GRANTABLE", ScalarType.createVarchar(IS_GRANTABLE_LEN))
                             .build())
             )
+            .put("workload_group_resource_usage",
+                    new SchemaTable(SystemIdGenerator.getNextId(), "workload_group_resource_usage", TableType.SCHEMA,
+                            builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("MEMORY_USAGE_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("CPU_USAGE", ScalarType.createVarchar(256))
+                                    .column("LOCAL_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("REMOTE_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
+                                    .build())
+            )
             .build();
     private boolean fetchAllFe = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index ab2798e2ba799a..cf5c85e98b7d85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -65,6 +65,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
         BEACKEND_ID_COLUMN_SET.add("backend_id");
         BACKEND_TABLE.add("backend_active_tasks");
+        BACKEND_TABLE.add("workload_group_resource_usage");
         BEACKEND_ID_COLUMN_SET.add("be_id");
     }
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 20042adc42ebd0..f97d09f68a5d1c 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -134,7 +134,8 @@ enum TSchemaTableType {
     SCH_PROCS_PRIV,
     SCH_WORKLOAD_POLICY,
     SCH_TABLE_OPTIONS,
-    SCH_WORKLOAD_GROUP_PRIVILEGES;
+    SCH_WORKLOAD_GROUP_PRIVILEGES,
+    SCH_WORKLOAD_GROUP_RESOURCE_USAGE;
 }
 enum THdfsCompression {