Skip to content

Commit

Permalink
+1
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Aug 10, 2024
1 parent f2037d0 commit f3f42f0
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"MEMORY_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CPU_USAGE", TYPE_VARCHAR, sizeof(StringRef), false},
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
};
Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,6 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui
}
}
SCOPED_RAW_TIMER(&_write_memtable_time);
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (_state && _state->get_query_ctx()) {
_state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
}
}};
return _memtable_writer->write(block, row_idxs);
}

Expand Down
11 changes: 5 additions & 6 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
nullable_column->get_null_map_data().emplace_back(0);
};

auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
auto insert_double_value = [&](int col_index, double double_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
double_val);
nullable_column->get_null_map_data().emplace_back(0);
};

Expand All @@ -295,10 +295,9 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {

double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
std::stringstream cpu_usage_ss;
cpu_usage_ss << std::fixed << std::setprecision(2) << cpu_usage_p << "%";
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;

insert_string_value(3, cpu_usage_ss.str(), block);
insert_double_value(3, cpu_usage_p, block);

insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime/workload_management/io_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "runtime/workload_management/io_throttle.h"

#include "util/defer_op.h"
#include "util/time.h"

namespace doris {
Expand Down Expand Up @@ -56,6 +57,11 @@ bool IOThrottle::try_acquire() {
}

void IOThrottle::update_next_io_time(int64_t io_bytes) {
Defer defer {[&]() {
if (io_bytes > 0) {
(*_io_adder) << io_bytes;
}
}};
if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) {
return;
}
Expand All @@ -72,7 +78,6 @@ void IOThrottle::update_next_io_time(int64_t io_bytes) {
}
_next_io_time_micros += ret < 1 ? static_cast<int64_t>(1) : static_cast<int64_t>(ret);
}
(*_io_adder) << io_bytes;
}

void IOThrottle::set_io_bytes_per_second(int64_t io_bytes_per_second) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
DCHECK(_dependency);
if (_writer_status.ok()) {
while (true) {
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (state && state->get_query_ctx()) {
state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
}
}};
if (!_eos && _data_queue.empty() && _writer_status.ok()) {
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ public class SchemaTable extends Table {
builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("MEMORY_USAGE_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("CPU_USAGE", ScalarType.createVarchar(256))
.column("CPU_USAGE_PERCENT", ScalarType.createType(PrimitiveType.DOUBLE))
.column("LOCAL_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ triggers
user_privileges
views
workload_group_privileges
workload_group_resource_usage
workload_groups
workload_policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ triggers
user_privileges
views
workload_group_privileges
workload_group_resource_usage
workload_groups
workload_policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ triggers
user_privileges
views
workload_group_privileges
workload_group_resource_usage
workload_groups
workload_policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ triggers
user_privileges
views
workload_group_privileges
workload_group_resource_usage
workload_groups
workload_policy

Expand Down

0 comments on commit f3f42f0

Please sign in to comment.