diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 5746551a29f746..c5822591d4affc 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -88,11 +88,10 @@ Status AggSinkLocalState::init(RuntimeState* state, RETURN_IF_ERROR( p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i])); } - _memory_usage_counter = ADD_LABEL_COUNTER(Base::profile(), "MemoryUsage"); - _hash_table_memory_usage = - ADD_CHILD_COUNTER(Base::profile(), "HashTable", TUnit::BYTES, "MemoryUsage"); + _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", + TUnit::BYTES, "MemoryUsage", 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage"); + "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime"); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index a7b30d46117cc2..8bdd624ca484b5 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -308,7 +308,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState { RuntimeProfile::Counter* _serialize_data_timer = nullptr; RuntimeProfile::Counter* _deserialize_data_timer = nullptr; RuntimeProfile::Counter* _max_row_size_counter = nullptr; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index d9923a68f24513..e9cc0cb1d06a43 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -34,8 +34,8 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); - _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage"); + _blocks_memory_usage = + _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); size_t agg_size = p._agg_expr_ctxs.size(); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 064d68f189a54e..c291aa9522639f 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -84,7 +84,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalStatecast(); _agg_functions_size = p._agg_functions.size(); - _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); _blocks_memory_usage = - profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage"); + profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); _agg_functions.resize(p._agg_functions.size()); diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index f4e2f10a7190c8..b4acb2fbce503f 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -121,7 +121,6 @@ class AnalyticLocalState final : public PipelineXLocalState _agg_arena_pool; std::vector _agg_functions; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; RuntimeProfile::Counter* _evaluation_timer = nullptr; RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1f9ba3b4203af7..9c26e97591821f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -124,7 +124,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ""); _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime"); _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES); - _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); _wait_queue_timer = diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 3e6486e34fdfb1..5ba8228e502779 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -178,7 +178,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalStateAddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage"); + profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1); // Build phase auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); @@ -271,7 +269,9 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, HashJoinBuildSinkLocalState> hash_table_build_process(rows, raw_ptrs, this, state->batch_size(), state); - return hash_table_build_process.template run< + auto old_hash_table_size = arg.hash_table->get_byte_size(); + auto old_key_size = arg.serialized_keys_size(true); + auto st = hash_table_build_process.template run< JoinOpType::value, has_null_value, short_circuit_for_null_in_build_side, with_other_conjuncts>( arg, @@ -279,6 +279,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, ? &null_map_val->get_data() : nullptr, &_shared_state->_has_null_in_build_side); + _mem_tracker->consume(arg.hash_table->get_byte_size() - + old_hash_table_size); + _mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size); + return st; }}, *_shared_state->hash_table_variants, _shared_state->join_op_variants, vectorized::make_bool_variant(_build_side_ignore_null), @@ -469,6 +473,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state._build_side_merge_block_timer); RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block)); + COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes()); + local_state._mem_tracker->consume(in_block->bytes()); if (local_state._build_side_mutable_block.rows() > std::numeric_limits::max()) { return Status::NotSupported( @@ -483,8 +489,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* DCHECK(!local_state._build_side_mutable_block.empty()); local_state._shared_state->build_block = std::make_shared( local_state._build_side_mutable_block.to_block()); - COUNTER_UPDATE(local_state._build_blocks_memory_usage, - (*local_state._shared_state->build_block).bytes()); const bool use_global_rf = local_state._parent->cast()._use_global_rf; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 7521563ecb8b07..2acc25151abe6c 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -135,7 +135,6 @@ class HashJoinBuildSinkLocalState final RuntimeProfile::Counter* _allocate_resource_timer = nullptr; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index aef2e011fa0483..f7a06655b191c8 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -53,7 +53,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) _construct_mutable_join_block(); _probe_column_disguise_null.reserve(_probe_expr_ctxs.size()); _probe_arena_memory_usage = - profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage"); + profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1); // Probe phase _probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime"); @@ -320,6 +320,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc mutable_join_block, &temp_block, local_state._probe_block.rows(), _is_mark_join, _have_other_join_conjunct); + local_state._mem_tracker->set_consumption( + arg.serialized_keys_size(false)); } else { st = Status::InternalError("uninited hash table"); } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 49f3bd2269c43e..88e6c8805687a9 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -28,6 +28,7 @@ #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/operator.h" +#include "util/runtime_profile.h" #include "vec/exec/runtime_filter_consumer.h" #include "vec/exec/scan/pip_scanner_context.h" #include "vec/exec/scan/scanner_context.h" @@ -1286,11 +1287,11 @@ Status ScanLocalState::_init_profile() { _scanner_profile.reset(new RuntimeProfile("VScanner")); profile()->add_child(_scanner_profile.get(), true, nullptr); - _memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage"); - _queued_blocks_memory_usage = - _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage"); + _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1); + _queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter( + "QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1); _free_blocks_memory_usage = - _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage"); + _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1); _newly_create_free_blocks_num = ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); // time of transfer thread to wait for block from scan thread diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 56a81422484bc9..465622be4aa515 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -63,7 +63,8 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true"); - _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); + _sort_blocks_memory_usage = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); return Status::OK(); } @@ -149,16 +150,20 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); + local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size()); + COUNTER_SET(local_state._sort_blocks_memory_usage, + (int64_t)local_state._shared_state->sorter->data_size()); RETURN_IF_CANCELLED(state); // update runtime predicate if (_use_topn_opt) { vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); if (!new_top.is_null() && new_top != local_state.old_top) { - auto& sort_description = local_state._shared_state->sorter->get_sort_description(); + const auto& sort_description = + local_state._shared_state->sorter->get_sort_description(); auto col = in_block->get_by_position(sort_description[0].column_number); bool is_reverse = sort_description[0].direction < 0; - auto query_ctx = state->get_query_ctx(); + auto* query_ctx = state->get_query_ctx(); RETURN_IF_ERROR( query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse)); local_state.old_top = std::move(new_top); diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 7069183f3b255e..d3d85a3e5c9402 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -72,7 +72,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState { // Expressions and parameters used for build _sort_description vectorized::VSortExecExprs _vsort_exec_exprs; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; + RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr; // topn top value vectorized::Field old_top {vectorized::Field::Types::Null}; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index ef17ee70ed6301..4be062b9e8c675 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -403,9 +403,9 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); _mem_tracker = std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); - _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage"); + _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( - "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); + "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); return Status::OK(); } @@ -462,9 +462,9 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); _mem_tracker = std::make_unique(_parent->get_name()); - _memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); + _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1); _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); + _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); return Status::OK(); } diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 38464cda63e609..f5ed7250618ef1 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -364,7 +364,8 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) { #define ADD_COUNTER_IMPL(NAME, T) \ RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit, \ - const std::string& parent_counter_name) { \ + const std::string& parent_counter_name, \ + int64_t level) { \ DCHECK_EQ(_is_averaged_profile, false); \ std::lock_guard l(_counter_map_lock); \ if (_counter_map.find(name) != _counter_map.end()) { \ @@ -372,7 +373,7 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) { } \ DCHECK(parent_counter_name == ROOT_COUNTER || \ _counter_map.find(parent_counter_name) != _counter_map.end()); \ - T* counter = _pool->add(new T(unit)); \ + T* counter = _pool->add(new T(unit, level)); \ _counter_map[name] = counter; \ std::set* child_counters = \ find_or_insert(&_child_counter_map, parent_counter_name, std::set()); \ diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index d5233d40f2c8e4..4cc2c2617ec45f 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -132,7 +132,8 @@ class RuntimeProfile { /// as value()) and the current value. class HighWaterMarkCounter : public Counter { public: - HighWaterMarkCounter(TUnit::type unit) : Counter(unit), current_value_(0) {} + HighWaterMarkCounter(TUnit::type unit, int64_t level = 2) + : Counter(unit, 0, level), current_value_(0) {} virtual void add(int64_t delta) { current_value_.fetch_add(delta, std::memory_order_relaxed); @@ -413,7 +414,8 @@ class RuntimeProfile { /// Adds a high water mark counter to the runtime profile. Otherwise, same behavior /// as AddCounter(). HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit, - const std::string& parent_counter_name = ""); + const std::string& parent_counter_name = "", + int64_t level = 2); // Only for create MemTracker(using profile's counter to calc consumption) std::shared_ptr AddSharedHighWaterMarkCounter( diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 41f3bd52efddd5..1536d48fe7a3a6 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -69,6 +69,8 @@ struct MethodBaseInner { const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) = 0; + virtual size_t serialized_keys_size(bool is_build) const { return 0; } + void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const uint8_t* null_map) { bucket_nums.resize(num_rows); @@ -243,6 +245,10 @@ struct MethodSerialized : public MethodBase { Base::keys = input_keys.data(); } + size_t serialized_keys_size(bool is_build) const override { + return is_build ? build_arena.size() : Base::arena.size(); + } + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) override { @@ -277,6 +283,10 @@ struct MethodStringNoCache : public MethodBase { std::vector stored_keys; + size_t serialized_keys_size(bool is_build) const override { + return stored_keys.size() * sizeof(StringRef); + } + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) override { @@ -430,6 +440,10 @@ struct MethodKeysFixed : public MethodBase { } } + size_t serialized_keys_size(bool is_build) const override { + return (is_build ? build_stored_keys.size() : stored_keys.size()) * + sizeof(typename Base::Key); + } void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) override { diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 9f5167bb555bf2..1939b702c69a8a 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -139,6 +139,8 @@ typename HashTableType::State ProcessHashTableProbe::_init_p false, hash_table_ctx.hash_table->get_bucket_size()); hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums, need_judge_null ? null_map : nullptr); + COUNTER_SET(_parent->_probe_arena_memory_usage, + (int64_t)hash_table_ctx.serialized_keys_size(false)); } return typename HashTableType::State(_parent->_probe_columns); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index fb5bf19015d99c..95c59094ba6837 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -141,8 +141,10 @@ struct ProcessHashTableBuild { hash_table_ctx.bucket_nums.resize(_batch_size); hash_table_ctx.bucket_nums.shrink_to_fit(); - COUNTER_UPDATE(_parent->_hash_table_memory_usage, - hash_table_ctx.hash_table->get_byte_size()); + COUNTER_SET(_parent->_hash_table_memory_usage, + (int64_t)hash_table_ctx.hash_table->get_byte_size()); + COUNTER_SET(_parent->_build_arena_memory_usage, + (int64_t)hash_table_ctx.serialized_keys_size(true)); return Status::OK(); }