diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index 46522a36242fc1..2b516fc6fdac2b 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -17,6 +17,7 @@ #include "exec/schema_scanner/schema_active_queries_scanner.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -98,41 +99,29 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { } } - // todo(wb) reuse this callback function - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - for (int i = 0; i < result_data.size(); i++) { TRow row = result_data[i]; - insert_string_value(0, row.column_value[0].stringVal, _active_query_block.get()); - insert_string_value(1, row.column_value[1].stringVal, _active_query_block.get()); - insert_int_value(2, row.column_value[2].longVal, _active_query_block.get()); - insert_int_value(3, row.column_value[3].longVal, _active_query_block.get()); - insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get()); - insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get()); - insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get()); - insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get()); - insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get()); - insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get()); + SchemaScannerHelper::insert_string_value(0, row.column_value[0].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(1, row.column_value[1].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_int_value(2, row.column_value[2].longVal, + _active_query_block.get()); + SchemaScannerHelper::insert_int_value(3, row.column_value[3].longVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(4, row.column_value[4].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(5, row.column_value[5].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(6, row.column_value[6].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(7, row.column_value[7].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(8, row.column_value[8].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(9, row.column_value[9].stringVal, + _active_query_block.get()); } return Status::OK(); } diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp index 8c263c99d2d6c8..adb18450f26490 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -17,6 +17,7 @@ #include "exec/schema_scanner/schema_routine_scanner.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -99,28 +100,6 @@ Status SchemaRoutinesScanner::get_block_from_fe() { return Status::InternalError("routine table schema is not match for FE and BE"); } } - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - auto insert_datetime_value = [&](int col_index, const std::vector& datas, - vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - auto data = datas[0]; - reinterpret_cast*>(col_ptr)->insert_data( - reinterpret_cast(data), 0); - nullable_column->get_null_map_data().emplace_back(0); - }; for (int i = 0; i < result_data.size(); i++) { TRow row = result_data[i]; @@ -132,9 +111,10 @@ Status SchemaRoutinesScanner::get_block_from_fe() { src[0].from_date_str(row.column_value[j].stringVal.data(), row.column_value[j].stringVal.size()); datas[0] = src; - insert_datetime_value(j, datas, _routines_block.get()); + SchemaScannerHelper::insert_datetime_value(j, datas, _routines_block.get()); } else { - insert_string_value(j, row.column_value[j].stringVal, _routines_block.get()); + SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal, + _routines_block.get()); } } } diff --git a/be/src/exec/schema_scanner/schema_scanner_helper.cpp b/be/src/exec/schema_scanner/schema_scanner_helper.cpp index b7d7b085f78cdc..fc42044a29c63f 100644 --- a/be/src/exec/schema_scanner/schema_scanner_helper.cpp +++ b/be/src/exec/schema_scanner/schema_scanner_helper.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "schema_scanner_helper.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -59,4 +59,15 @@ void SchemaScannerHelper::insert_int_value(int col_index, int64_t int_val, reinterpret_cast*>(col_ptr)->insert_value(int_val); nullable_column->get_null_map_data().emplace_back(0); } + +void SchemaScannerHelper::insert_double_value(int col_index, double double_val, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value( + double_val); + nullable_column->get_null_map_data().emplace_back(0); +} } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_scanner_helper.h b/be/src/exec/schema_scanner/schema_scanner_helper.h index 126ad33e5f28a3..c9fe8881ddb06e 100644 --- a/be/src/exec/schema_scanner/schema_scanner_helper.h +++ b/be/src/exec/schema_scanner/schema_scanner_helper.h @@ -36,6 +36,7 @@ class SchemaScannerHelper { vectorized::Block* block); static void insert_int_value(int col_index, int64_t int_val, vectorized::Block* block); + static void insert_double_value(int col_index, double double_val, vectorized::Block* block); }; } // namespace doris diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 5c83a6d8041a74..84a552e45173c4 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -35,6 +35,7 @@ #include #include "common/logging.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" @@ -516,51 +517,29 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo std::shared_lock read_lock(_qs_ctx_map_lock); int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { TQueryStatistics tqs; qs_ctx_ptr->collect_query_statistics(&tqs); - insert_int_value(0, be_id, block); - insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); - insert_string_value(2, query_id, block); + SchemaScannerHelper::insert_int_value(0, be_id, block); + SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); + SchemaScannerHelper::insert_string_value(2, query_id, block); int64_t task_time = qs_ctx_ptr->_is_query_finished ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time : MonotonicMillis() - qs_ctx_ptr->_query_start_time; - insert_int_value(3, task_time, block); - insert_int_value(4, tqs.cpu_ms, block); - insert_int_value(5, tqs.scan_rows, block); - insert_int_value(6, tqs.scan_bytes, block); - insert_int_value(7, tqs.max_peak_memory_bytes, block); - insert_int_value(8, tqs.current_used_memory_bytes, block); - insert_int_value(9, tqs.shuffle_send_bytes, block); - insert_int_value(10, tqs.shuffle_send_rows, block); + SchemaScannerHelper::insert_int_value(3, task_time, block); + SchemaScannerHelper::insert_int_value(4, tqs.cpu_ms, block); + SchemaScannerHelper::insert_int_value(5, tqs.scan_rows, block); + SchemaScannerHelper::insert_int_value(6, tqs.scan_bytes, block); + SchemaScannerHelper::insert_int_value(7, tqs.max_peak_memory_bytes, block); + SchemaScannerHelper::insert_int_value(8, tqs.current_used_memory_bytes, block); + SchemaScannerHelper::insert_int_value(9, tqs.shuffle_send_bytes, block); + SchemaScannerHelper::insert_int_value(10, tqs.shuffle_send_rows, block); std::stringstream ss; ss << qs_ctx_ptr->_query_type; - insert_string_value(11, ss.str(), block); + SchemaScannerHelper::insert_string_value(11, ss.str(), block); } } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index e263685d07f47e..60a17cc1f439ba 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -27,6 +27,7 @@ #include #include "common/logging.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "io/fs/local_file_reader.h" #include "olap/storage_engine.h" #include "pipeline/task_queue.h" diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 314a2b87841e1d..393d327e7c42ca 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -22,6 +22,7 @@ #include #include +#include "exec/schema_scanner/schema_scanner_helper.h" #include "pipeline/task_scheduler.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/workload_group/workload_group.h" @@ -246,28 +247,6 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { } void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - - auto insert_double_value = [&](int col_index, double double_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - double_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; int cpu_num = CpuInfo::num_cores(); cpu_num = cpu_num <= 0 ? 1 : cpu_num; @@ -276,18 +255,18 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { std::shared_lock r_lock(_group_mutex); block->reserve(_workload_groups.size()); for (const auto& [id, wg] : _workload_groups) { - insert_int_value(0, be_id, block); - insert_int_value(1, wg->id(), block); - insert_int_value(2, wg->get_mem_used(), block); + SchemaScannerHelper::insert_int_value(0, be_id, block); + SchemaScannerHelper::insert_int_value(1, wg->id(), block); + SchemaScannerHelper::insert_int_value(2, wg->get_mem_used(), block); double cpu_usage_p = (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100; cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0; - insert_double_value(3, cpu_usage_p, block); + SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block); - insert_int_value(4, wg->get_local_scan_bytes_per_second(), block); - insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int_value(4, wg->get_local_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block); } }