Skip to content

Commit

Permalink
Add workload group resource usage
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Aug 9, 2024
1 parent eea9446 commit f2037d0
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 48 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "exec/schema_scanner/schema_variables_scanner.h"
#include "exec/schema_scanner/schema_views_scanner.h"
#include "exec/schema_scanner/schema_workload_group_privileges.h"
#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h"
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
Expand Down Expand Up @@ -230,6 +231,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaTableOptionsScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_GROUP_PRIVILEGES:
return SchemaWorkloadGroupPrivilegesScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE:
return SchemaBackendWorkloadGroupResourceUsage::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace doris {
std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(StringRef), false},
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h"

#include <iomanip>
#include <iostream>

#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"MEMORY_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CPU_USAGE", TYPE_VARCHAR, sizeof(StringRef), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE) {}

SchemaBackendWorkloadGroupResourceUsage::~SchemaBackendWorkloadGroupResourceUsage() {}

Status SchemaBackendWorkloadGroupResourceUsage::start(RuntimeState* state) {
_block_rows_limit = state->batch_size();
return Status::OK();
}

Status SchemaBackendWorkloadGroupResourceUsage::get_next_block_internal(vectorized::Block* block,
bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}

if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

if (_block == nullptr) {
_block = vectorized::Block::create_unique();

for (int i = 0; i < _s_tbls_columns.size(); ++i) {
TypeDescriptor descriptor(_s_tbls_columns[i].type);
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
_s_tbls_columns[i].name));
}

ExecEnv::GetInstance()->workload_group_mgr()->get_wg_resource_usage(_block.get());
_total_rows = _block->rows();
}

if (_row_idx == _total_rows) {
*eos = true;
return Status::OK();
}

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
return Status::OK();
}

} // namespace doris
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <vector>

#include "common/status.h"
#include "exec/schema_scanner.h"

namespace doris {
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized

class SchemaBackendWorkloadGroupResourceUsage : public SchemaScanner {
ENABLE_FACTORY_CREATOR(SchemaBackendWorkloadGroupResourceUsage);

public:
SchemaBackendWorkloadGroupResourceUsage();
~SchemaBackendWorkloadGroupResourceUsage() override;

Status start(RuntimeState* state) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

private:
int _block_rows_limit = 4096;
int _row_idx = 0;
int _total_rows = 0;
std::unique_ptr<vectorized::Block> _block = nullptr;
};
}; // namespace doris
3 changes: 3 additions & 0 deletions be/src/io/fs/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
data_dir_info.path = store_paths[i].path;
data_dir_info.storage_medium = store_paths[i].storage_medium;
data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR;
data_dir_info.bvar_name = "local_data_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}

Expand All @@ -75,6 +76,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
data_dir_info.path = spill_store_paths[i].path;
data_dir_info.storage_medium = spill_store_paths[i].storage_medium;
data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR;
data_dir_info.bvar_name = "spill_data_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}

Expand All @@ -83,6 +85,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
data_dir_info.path = cache_paths[i].path;
data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE;
data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR;
data_dir_info.bvar_name = "local_cache_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui
}
}
SCOPED_RAW_TIMER(&_write_memtable_time);
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (_state && _state->get_query_ctx()) {
_state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
}
}};
return _memtable_writer->write(block, row_idxs);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct DataDirInfo {
bool is_used = false; // whether available mark
TStorageMedium::type storage_medium = TStorageMedium::HDD; // Storage medium type: SSD|HDD
DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
std::string bvar_name;
};
struct PredicateFilterInfo {
int type = 0;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ Status PipelineTask::execute(bool* eos) {
if (cpu_qs) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
query_context()->update_wg_cpu_adder(delta_cpu_time);
}};
if (_wait_to_start()) {
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ class QueryContext {
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;

void update_wg_cpu_adder(int64_t delta_cpu_time) {
if (_workload_group != nullptr) {
_workload_group->update_cpu_adder(delta_cpu_time);
}
}

private:
int _timeout_second;
TUniqueId _query_id;
Expand Down
36 changes: 22 additions & 14 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,22 @@
__VA_ARGS__; \
} while (0)

#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
if (auto* t_ctx = doris::thread_context(true)) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
} \
} \
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io_adder(*bytes_read); \
} \
} \
}

#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
Expand Down Expand Up @@ -276,6 +278,12 @@ class ThreadContext {
return nullptr;
}

void update_total_local_scan_io_adder(size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_local_scan_io_adder(bytes_read);
}
}

int thread_local_handle_count = 0;
int skip_memory_check = 0;
int skip_large_memory_check = 0;
Expand Down
28 changes: 25 additions & 3 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,18 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>();
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>();
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
_mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
_cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
_cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
_name, "cpu_usage", _cpu_usage_adder.get(), 10);
_total_local_scan_io_adder =
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
}

std::string WorkloadGroup::debug_string() const {
Expand Down Expand Up @@ -136,6 +145,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
}
}
refresh_memory(used_memory);
_mem_used_status->set_value(used_memory);
return used_memory;
}

Expand Down Expand Up @@ -585,6 +595,18 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
return _remote_scan_io_throttle;
}

void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
(*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
}

void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
(*_total_local_scan_io_adder) << scan_bytes;
}

int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
return _remote_scan_io_throttle->get_bvar_io_per_second();
}

void WorkloadGroup::try_stop_schedulers() {
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
if (_task_sched) {
Expand Down
19 changes: 19 additions & 0 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <bvar/bvar.h>
#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
Expand Down Expand Up @@ -195,6 +196,17 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);

void update_cpu_adder(int64_t delta_cpu_time);

void update_total_local_scan_io_adder(size_t scan_bytes);

int64_t get_mem_used() { return _mem_used_status->get_value(); }
uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
int64_t get_local_scan_bytes_per_second() {
return _total_local_scan_io_per_second->get_value();
}
int64_t get_remote_scan_bytes_per_second();

private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
Expand Down Expand Up @@ -234,6 +246,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};

// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
};

using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
Expand Down
Loading

0 comments on commit f2037d0

Please sign in to comment.