Skip to content

Commit

Permalink
[chore] remove duplicate code in schema scanner (#40456)
Browse files Browse the repository at this point in the history
## Proposed changes

Issue Number: close #xxx

remove duplicate code in schema scanner
  • Loading branch information
Vallishp authored Sep 6, 2024
1 parent 83e31d9 commit c6f5fcd
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 119 deletions.
53 changes: 21 additions & 32 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/schema_scanner/schema_active_queries_scanner.h"

#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -98,41 +99,29 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
}
}

// todo(wb) reuse this callback function
auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
};
auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
int_val);
nullable_column->get_null_map_data().emplace_back(0);
};

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

insert_string_value(0, row.column_value[0].stringVal, _active_query_block.get());
insert_string_value(1, row.column_value[1].stringVal, _active_query_block.get());
insert_int_value(2, row.column_value[2].longVal, _active_query_block.get());
insert_int_value(3, row.column_value[3].longVal, _active_query_block.get());
insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get());
insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get());
insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get());
insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get());
insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get());
insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get());
SchemaScannerHelper::insert_string_value(0, row.column_value[0].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(1, row.column_value[1].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_int_value(2, row.column_value[2].longVal,
_active_query_block.get());
SchemaScannerHelper::insert_int_value(3, row.column_value[3].longVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(4, row.column_value[4].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(5, row.column_value[5].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(6, row.column_value[6].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(7, row.column_value[7].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(8, row.column_value[8].stringVal,
_active_query_block.get());
SchemaScannerHelper::insert_string_value(9, row.column_value[9].stringVal,
_active_query_block.get());
}
return Status::OK();
}
Expand Down
28 changes: 4 additions & 24 deletions be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/schema_scanner/schema_routine_scanner.h"

#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -99,28 +100,6 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
return Status::InternalError<false>("routine table schema is not match for FE and BE");
}
}
auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
};
auto insert_datetime_value = [&](int col_index, const std::vector<void*>& datas,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
auto data = datas[0];
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
nullable_column->get_null_map_data().emplace_back(0);
};

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];
Expand All @@ -132,9 +111,10 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
src[0].from_date_str(row.column_value[j].stringVal.data(),
row.column_value[j].stringVal.size());
datas[0] = src;
insert_datetime_value(j, datas, _routines_block.get());
SchemaScannerHelper::insert_datetime_value(j, datas, _routines_block.get());
} else {
insert_string_value(j, row.column_value[j].stringVal, _routines_block.get());
SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
_routines_block.get());
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion be/src/exec/schema_scanner/schema_scanner_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "schema_scanner_helper.h"
#include "exec/schema_scanner/schema_scanner_helper.h"

#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -59,4 +59,15 @@ void SchemaScannerHelper::insert_int_value(int col_index, int64_t int_val,
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(int_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_double_value(int col_index, double double_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
double_val);
nullable_column->get_null_map_data().emplace_back(0);
}
} // namespace doris
1 change: 1 addition & 0 deletions be/src/exec/schema_scanner/schema_scanner_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SchemaScannerHelper {
vectorized::Block* block);

static void insert_int_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_double_value(int col_index, double double_val, vectorized::Block* block);
};

} // namespace doris
Expand Down
47 changes: 13 additions & 34 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <vector>

#include "common/logging.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
Expand Down Expand Up @@ -516,51 +517,29 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;

auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
int_val);
nullable_column->get_null_map_data().emplace_back(0);
};

auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
};

// block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
TQueryStatistics tqs;
qs_ctx_ptr->collect_query_statistics(&tqs);
insert_int_value(0, be_id, block);
insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
insert_string_value(2, query_id, block);
SchemaScannerHelper::insert_int_value(0, be_id, block);
SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
SchemaScannerHelper::insert_string_value(2, query_id, block);

int64_t task_time = qs_ctx_ptr->_is_query_finished
? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time
: MonotonicMillis() - qs_ctx_ptr->_query_start_time;
insert_int_value(3, task_time, block);
insert_int_value(4, tqs.cpu_ms, block);
insert_int_value(5, tqs.scan_rows, block);
insert_int_value(6, tqs.scan_bytes, block);
insert_int_value(7, tqs.max_peak_memory_bytes, block);
insert_int_value(8, tqs.current_used_memory_bytes, block);
insert_int_value(9, tqs.shuffle_send_bytes, block);
insert_int_value(10, tqs.shuffle_send_rows, block);
SchemaScannerHelper::insert_int_value(3, task_time, block);
SchemaScannerHelper::insert_int_value(4, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int_value(5, tqs.scan_rows, block);
SchemaScannerHelper::insert_int_value(6, tqs.scan_bytes, block);
SchemaScannerHelper::insert_int_value(7, tqs.max_peak_memory_bytes, block);
SchemaScannerHelper::insert_int_value(8, tqs.current_used_memory_bytes, block);
SchemaScannerHelper::insert_int_value(9, tqs.shuffle_send_bytes, block);
SchemaScannerHelper::insert_int_value(10, tqs.shuffle_send_rows, block);

std::stringstream ss;
ss << qs_ctx_ptr->_query_type;
insert_string_value(11, ss.str(), block);
SchemaScannerHelper::insert_string_value(11, ss.str(), block);
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>

#include "common/logging.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "io/fs/local_file_reader.h"
#include "olap/storage_engine.h"
#include "pipeline/task_queue.h"
Expand Down
35 changes: 7 additions & 28 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <mutex>
#include <unordered_map>

#include "exec/schema_scanner/schema_scanner_helper.h"
#include "pipeline/task_scheduler.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
Expand Down Expand Up @@ -246,28 +247,6 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
}

void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
int_val);
nullable_column->get_null_map_data().emplace_back(0);
};

auto insert_double_value = [&](int col_index, double double_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
double_val);
nullable_column->get_null_map_data().emplace_back(0);
};

int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
int cpu_num = CpuInfo::num_cores();
cpu_num = cpu_num <= 0 ? 1 : cpu_num;
Expand All @@ -276,18 +255,18 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
block->reserve(_workload_groups.size());
for (const auto& [id, wg] : _workload_groups) {
insert_int_value(0, be_id, block);
insert_int_value(1, wg->id(), block);
insert_int_value(2, wg->get_mem_used(), block);
SchemaScannerHelper::insert_int_value(0, be_id, block);
SchemaScannerHelper::insert_int_value(1, wg->id(), block);
SchemaScannerHelper::insert_int_value(2, wg->get_mem_used(), block);

double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;

insert_double_value(3, cpu_usage_p, block);
SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block);

insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
}
}

Expand Down

0 comments on commit c6f5fcd

Please sign in to comment.