diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 0bf3f8c4e0931d..592e08f019938a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _agg_data = Base::_shared_state->agg_data.get();
     _agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
     _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
-                                                            TUnit::BYTES, "MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
     _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
@@ -231,15 +231,16 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
                         _agg_arena_pool->size() +
                         Base::_shared_state->aggregate_data_container->memory_usage() -
                         Base::_shared_state->mem_usage_record.used_in_arena;
-                Base::_mem_tracker->consume(arena_memory_usage);
-                Base::_mem_tracker->consume(
+                auto hash_table_memory_usage =
                         data.get_buffer_size_in_bytes() -
-                        Base::_shared_state->mem_usage_record.used_in_state);
+                        Base::_shared_state->mem_usage_record.used_in_state;
+                Base::_mem_tracker->consume(arena_memory_usage);
+                Base::_mem_tracker->consume(hash_table_memory_usage);
+                Base::_memory_used_counter->set(Base::_mem_tracker->consumption());
+                Base::_peak_memory_usage_counter->set(
+                        Base::_mem_tracker->peak_consumption());
                 _serialize_key_arena_memory_usage->add(arena_memory_usage);
-                COUNTER_UPDATE(
-                        _hash_table_memory_usage,
-                        data.get_buffer_size_in_bytes() -
-                                Base::_shared_state->mem_usage_record.used_in_state);
+                COUNTER_UPDATE(_hash_table_memory_usage, hash_table_memory_usage);
                 Base::_shared_state->mem_usage_record.used_in_state =
                         data.get_buffer_size_in_bytes();
                 Base::_shared_state->mem_usage_record.used_in_arena =
@@ -426,6 +427,8 @@ void AggSinkLocalState::_update_memusage_without_key() {
     auto arena_memory_usage =
             _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
     Base::_mem_tracker->consume(arena_memory_usage);
+    Base::_memory_used_counter->set(Base::_mem_tracker->consumption());
+    Base::_peak_memory_usage_counter->set(Base::_mem_tracker->peak_consumption());
     _serialize_key_arena_memory_usage->add(arena_memory_usage);
     Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
 }
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 3bdda31308ff86..9d0e2bcc58a61d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -460,18 +460,16 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
         if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) {
             vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
             if (auto rows = block->rows()) {
-                _num_rows_returned += rows;
+                add_num_rows_returned(rows);
                 COUNTER_UPDATE(_blocks_returned_counter, 1);
-                COUNTER_SET(_rows_returned_counter, _num_rows_returned);
             }
         } else {
             reached_limit(block, eos);
         }
     } else {
         if (auto rows = block->rows()) {
-            _num_rows_returned += rows;
+            add_num_rows_returned(rows);
             COUNTER_UPDATE(_blocks_returned_counter, 1);
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
         }
     }
 }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index e9276e4fa048c8..602cd0cf6acb43 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -31,7 +31,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _blocks_memory_usage =
-            _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
     _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 73dc5d03d64ffe..f0a2bfc1ada95f 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -161,7 +161,7 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _blocks_memory_usage =
-            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
+            profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
     _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 5aa27b51c45095..7aa9cd39f7badb 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -114,7 +114,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
         return Status::Cancelled("Expected {} {} to be returned by expression {}",
                                  to_string_lambda(_assertion), _desired_num_rows, _subquery_string);
     }
-    COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
     RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
                                                            block->columns()));
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 70b73225f060e8..11a97a532fc629 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -136,7 +136,7 @@ bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                        // were aggregated into it. Exclude passed through rows from this calculation since
                        // they were not in hash tables.
                        const int64_t input_rows = _input_num_rows;
-                        const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
+                        const int64_t aggregated_input_rows = input_rows - num_rows_returned();
                        // TODO chenhao
                        //  const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
                        double current_reduction =
@@ -433,8 +433,8 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc
                                                               in_block, local_state._aggregated_block.get()));
     // set limit and reach limit
     if (_limit != -1 &&
-        (local_state._num_rows_returned + local_state._aggregated_block->rows()) > _limit) {
-        auto limit_rows = _limit - local_state._num_rows_returned;
+        (local_state.num_rows_returned() + local_state._aggregated_block->rows()) > _limit) {
+        auto limit_rows = _limit - local_state.num_rows_returned();
         local_state._aggregated_block->set_num_rows(limit_rows);
         local_state._reach_limit = true;
     }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e0a3725ad65e6d..27cae7a4e08960 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -166,6 +166,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         RETURN_IF_ERROR(
                 BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
     }
+    _parent->memory_used_counter()->update(request.block->ByteSizeLong());
     _instance_to_package_queue[ins_id].emplace(std::move(request));
     _total_queue_size++;
     if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -204,6 +205,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                    request.block_holder->get_block()->be_exec_version()));
        }
+        _parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong());
        _instance_to_broadcast_package_queue[ins_id].emplace(request);
    }
    if (send_now) {
@@ -303,6 +305,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
            }
        }
        if (request.block) {
+            _parent->memory_used_counter()->update(-request.block->ByteSizeLong());
            static_cast<void>(brpc_request->release_block());
        }
        q.pop();
@@ -386,6 +389,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
            }
        }
        if (request.block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    -request.block_holder->get_block()->ByteSizeLong());
            static_cast<void>(brpc_request->release_block());
        }
        broadcast_q.pop();
@@ -435,8 +440,30 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
    _instance_to_receiver_eof[id] = true;
    _turn_off_channel(id, true);
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
-    swap(empty, _instance_to_broadcast_package_queue[id]);
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
+            _instance_to_broadcast_package_queue[id];
+    for (; !broadcast_q.empty(); broadcast_q.pop()) {
+        if (broadcast_q.front().block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+        }
+    }
+    {
+        std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
+        swap(empty, broadcast_q);
+    }
+
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
+    for (; !q.empty(); q.pop()) {
+        if (q.front().block) {
+            _parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
+        }
+    }
+
+    {
+        std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
+        swap(empty, q);
+    }
 }
 
 bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 518620ba6b4d3e..e4206b62ab2157 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -461,6 +461,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
            RETURN_IF_ERROR(
                    local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
        }
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
        if (_part_type == TPartitionType::HASH_PARTITIONED) {
            RETURN_IF_ERROR(channel_add_rows(
                    state, local_state.channels, local_state._partition_count,
@@ -470,7 +474,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                    state, local_state.channel_shared_ptrs, local_state._partition_count,
                    local_state._partitioner->get_channel_ids().get(), rows, block, eos));
        }
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
        // check out of limit
        RETURN_IF_ERROR(local_state._send_new_partition_batch());
        std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
@@ -506,7 +519,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
        // when send data we still use block
        RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
                                                  channel2rows, block, eos));
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
        {
            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
            RETURN_IF_ERROR(
@@ -517,6 +539,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
        RETURN_IF_ERROR(channel_add_rows_with_idx(
                state, local_state.channels, local_state.channels.size(), assignments, block,
                eos));
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
        // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
        // 1. select channel
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index cf2055ec47b071..85696ec68a6d2a 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -62,8 +62,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
-            profile(), p.is_merging());
+            state, this, p.input_row_desc(), state->fragment_instance_id(), p.node_id(),
+            p.num_senders(), profile(), p.is_merging());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
@@ -169,9 +169,8 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
            *eos = true;
            auto limit = _limit - local_state.num_rows_returned();
            block->set_num_rows(limit);
-            local_state.set_num_rows_returned(_limit);
+            local_state.add_num_rows_returned(limit);
        }
-        COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
        COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
    }
    return status;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 93d22850dfcbb1..ba280df65e7269 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -72,11 +72,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     }
 
     _build_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
     _build_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            profile()->AddHighWaterMarkCounter("MemoryUsageBuildKeyArena", TUnit::BYTES, "", 1);
 
     // Build phase
     auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
@@ -321,6 +321,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                        _mem_tracker->consume(arg.hash_table->get_byte_size() -
                                              old_hash_table_size);
                        _mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size);
+                        _memory_used_counter->set(_mem_tracker->consumption());
+                        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
                        return st;
                    }},
            *_shared_state->hash_table_variants, _shared_state->join_op_variants,
@@ -542,7 +544,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
    if (local_state._should_build_hash_table) {
        // If eos or have already met a null value using short-circuit strategy, we do not need to pull
        // data from probe side.
-        local_state._build_side_mem_used += in_block->allocated_bytes();
 
        if (local_state._build_side_mutable_block.empty()) {
            auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -569,12 +570,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                    std::to_string(std::numeric_limits<uint32_t>::max()));
        }
 
-        local_state._mem_tracker->consume(in_block->bytes());
-        COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());
-
+        auto old_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
        SCOPED_TIMER(local_state._build_side_merge_block_timer);
        RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
                std::move(*in_block)));
+        auto new_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
+        local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
+        local_state._memory_used_counter->set(local_state._mem_tracker->consumption());
+        local_state._peak_memory_usage_counter->set(
+                local_state._mem_tracker->peak_consumption());
+        COUNTER_SET(local_state._build_blocks_memory_usage, (int64_t)new_block_mem_usage);
        }
    }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index a544cdcf4563a4..78c063e3b39afc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -74,8 +74,6 @@ class HashJoinBuildSinkLocalState final
     std::vector<vectorized::ColumnPtr> _key_columns_holder;
 
     bool _should_build_hash_table = true;
-    int64_t _build_side_mem_used = 0;
-    int64_t _build_side_last_mem_used = 0;
     size_t _build_side_rows = 0;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index f91e1eaa2a1b17..9e901727e218b6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
     _construct_mutable_join_block();
     _probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
     _probe_arena_memory_usage =
-            profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1);
     // Probe phase
     _probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
     _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
@@ -500,7 +500,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
        RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids));
 
        if (&local_state._probe_block != input_block) {
+            auto old_block_mem_usage = local_state._probe_block.allocated_bytes();
            input_block->swap(local_state._probe_block);
+            auto new_block_mem_usage = local_state._probe_block.allocated_bytes();
+            local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
        }
    }
    return Status::OK();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 965d62192b2fed..bf4a4d5763c02e 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -132,7 +132,6 @@ struct ProcessHashTableProbe {
     bool _need_calculate_build_index_has_zero = true;
     bool* _has_null_in_build_side;
 
-    RuntimeProfile::Counter* _rows_returned_counter = nullptr;
     RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
     RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
     RuntimeProfile::Counter* _build_side_output_timer = nullptr;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 6bb5a2006ab9b0..0e0d88dd396d75 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -51,7 +51,6 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeLocalState
           _left_output_slot_flags(parent->left_output_slot_flags()),
           _right_output_slot_flags(parent->right_output_slot_flags()),
           _has_null_in_build_side(parent->has_null_in_build_side()),
-          _rows_returned_counter(parent->_rows_returned_counter),
           _search_hashtable_timer(parent->_search_hashtable_timer),
           _init_probe_side_timer(parent->_init_probe_side_timer),
           _build_side_output_timer(parent->_build_side_output_timer),
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 4a93bac67fe477..9144d4628c35d7 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -329,8 +329,8 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
 }
 
 void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) {
-    if (_parent->_limit != -1 and _num_rows_returned + block->rows() >= _parent->_limit) {
-        block->set_num_rows(_parent->_limit - _num_rows_returned);
+    if (_parent->_limit != -1 and num_rows_returned() + block->rows() >= _parent->_limit) {
+        block->set_num_rows(_parent->_limit - num_rows_returned());
         *eos = true;
     }
 
@@ -345,9 +345,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
     });
 
     if (auto rows = block->rows()) {
-        _num_rows_returned += rows;
+        COUNTER_UPDATE(_rows_returned_counter, rows);
         COUNTER_UPDATE(_blocks_returned_counter, 1);
-        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     }
 }
 
@@ -425,8 +424,7 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
 }
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
-        : _num_rows_returned(0),
-          _rows_returned_counter(nullptr),
+        : _rows_returned_counter(nullptr),
           _peak_memory_usage_counter(nullptr),
           _parent(parent),
           _state(state) {
@@ -469,9 +467,9 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
     _mem_tracker =
            std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1);
-    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1);
+    _peak_memory_usage_counter =
+            _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
     return Status::OK();
 }
@@ -504,9 +502,6 @@ Status PipelineXLocalState::close(RuntimeState* state) {
     if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
         COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
     }
-    if (_rows_returned_counter != nullptr) {
-        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-    }
     if (_peak_memory_usage_counter) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
@@ -549,9 +544,9 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSink
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
     _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1);
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
     return Status::OK();
 }
@@ -581,6 +576,7 @@ template <typename LocalStateType>
 Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
                                                     bool* eos) {
     auto& local_state = get_local_state(state);
+    auto old_block_mem_usage = local_state._child_block->allocated_bytes();
     if (need_more_input_data(state)) {
         local_state._child_block->clear_column_data(
                 OperatorX<LocalStateType>::_child->row_desc().num_materialized_slots());
@@ -606,6 +602,8 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
            *eos = false;
        }
    }
+    auto new_block_mem_usage = local_state._child_block->allocated_bytes();
+    local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
    return Status::OK();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index eca7d608437b06..1d054ad00740f5 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -156,16 +156,16 @@ class PipelineXLocalStateBase {
     RuntimeProfile* profile() { return _runtime_profile.get(); }
     MemTracker* mem_tracker() { return _mem_tracker.get(); }
-    RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; }
     RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; }
     OperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
     vectorized::VExprContextSPtrs& projections() { return _projections; }
-    [[nodiscard]] int64_t num_rows_returned() const { return _num_rows_returned; }
-    void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
-    void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
+    [[nodiscard]] int64_t num_rows_returned() const { return _rows_returned_counter->value(); }
+    void add_num_rows_returned(int64_t delta) { COUNTER_UPDATE(_rows_returned_counter, delta); }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
@@ -182,7 +182,6 @@ class PipelineXLocalStateBase {
     friend class OperatorXBase;
 
     ObjectPool* _pool = nullptr;
-    int64_t _num_rows_returned {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
@@ -341,6 +340,8 @@ class PipelineXSinkLocalStateBase {
     RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; }
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index fbabdbdc8f85fe..eba9e8472d3158 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -113,9 +113,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     _partition_exprs_num = p._partition_exprs_num;
     _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT);
     _serialize_key_arena_memory_usage =
-            _profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageHashTable", TUnit::BYTES, 1);
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index f2cd8dea0b943c..0a4eb9cf3546e7 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -59,7 +59,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     }
     if (!output_block->empty()) {
         COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
-        local_state._num_rows_returned += output_block->rows();
+        local_state.add_num_rows_returned(output_block->rows());
     }
     return Status::OK();
 }
@@ -81,7 +81,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     }
     if (!output_block->empty()) {
         COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
-        local_state._num_rows_returned += output_block->rows();
+        local_state.add_num_rows_returned(output_block->rows());
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 469716b7a22182..1db0680e03ae03 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -81,10 +81,10 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
 
 void PartitionedAggSinkLocalState::_init_counters() {
     _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile");
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
-                                                            TUnit::BYTES, "MemoryUsage", 1);
+    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable",
+                                                            TUnit::BYTES, "", 1);
     _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
-            "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
+            "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
@@ -110,8 +110,8 @@ void PartitionedAggSinkLocalState::_init_counters() {
     } while (false)
 
 void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) {
-    UPDATE_PROFILE(_hash_table_memory_usage, "HashTable");
-    UPDATE_PROFILE(_serialize_key_arena_memory_usage, "SerializeKeyArena");
+    UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable");
+    UPDATE_PROFILE(_serialize_key_arena_memory_usage, "MemoryUsageSerializeKeyArena");
     UPDATE_PROFILE(_build_timer, "BuildTime");
     UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime");
     UPDATE_PROFILE(_merge_timer, "MergeTime");
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a7297be493f804..0c94491986cf63 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -80,7 +80,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
         if (inner_sink_state_) {
             auto inner_sink_state =
                     assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
-            return inner_sink_state->_build_side_mem_used;
+            return inner_sink_state->_build_blocks_memory_usage->value();
         }
     }
     return 0;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index d355d99c2e352f..dba4f27af7c385 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -231,7 +231,6 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
                                                    output_block->columns()));
     *eos = _child_eos && _child_block.rows() == 0;
     local_state.reached_limit(output_block, eos);
-    COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index b07942b9ab1c05..4ed4aa90ce8f73 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -31,7 +31,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _sort_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1);
     return Status::OK();
 }
@@ -117,8 +117,9 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     if (in_block->rows() > 0) {
-        COUNTER_UPDATE(local_state._sort_blocks_memory_usage, (int64_t)in_block->bytes());
         RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
+        local_state._sort_blocks_memory_usage->set(
+                (int64_t)local_state._shared_state->sorter->data_size());
         local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
 
         RETURN_IF_CANCELLED(state);
@@ -142,6 +143,8 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
         RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read());
         local_state._dependency->set_ready_to_read();
     }
+    local_state._memory_used_counter->set(local_state._mem_tracker->consumption());
+    local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 4bf1ab04efb628..beb4e34ae483de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -51,7 +51,7 @@ void SpillSortSinkLocalState::_init_counters() {
     _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime");
     _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
     _sort_blocks_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1);
ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); _spill_merge_sort_timer = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1); @@ -70,7 +70,7 @@ void SpillSortSinkLocalState::_init_counters() { void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime"); UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime"); - UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks"); + UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks"); } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index dfbe42c637ea56..7eb8594b8a0e2a 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -87,10 +87,10 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_init_timer); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime"); diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index c14d119e0fee25..78067b9b18106c 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc } std::shared_ptr VDataStreamMgr::create_recvr( - RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging) { + RuntimeState* state, pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; - std::shared_ptr recvr(new VDataStreamRecvr(this, state, row_desc, + std::shared_ptr recvr(new VDataStreamRecvr(this, parent, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 09e347fcfb2a7a..bd5e6f9b91ee57 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -40,6 +40,9 @@ class RuntimeState; class RowDescriptor; class RuntimeProfile; class PTransmitDataParams; +namespace pipeline { +class ExchangeLocalState; +} namespace vectorized { class VDataStreamRecvr; @@ -50,6 +53,7 @@ class VDataStreamMgr { ~VDataStreamMgr(); std::shared_ptr create_recvr(RuntimeState* state, + pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, 
                                                   const TUniqueId& fragment_instance_id,
                                                   PlanNodeId dest_node_id, int num_senders,
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 1ca6bb7f2c5931..d2f79b8529e3ac 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -96,6 +96,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
     _block_queue.pop_front();
+    _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
     sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
@@ -207,6 +208,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
+    _recvr->_parent->memory_used_counter()->update(block_byte_size);
     add_blocks_memory_usage(block_byte_size);
 
     return Status::OK();
 }
@@ -245,6 +247,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
+        _recvr->_parent->memory_used_counter()->update(block_mem_size);
         add_blocks_memory_usage(block_mem_size);
     }
 }
@@ -315,12 +318,13 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state,
-                                   const RowDescriptor& row_desc,
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                                   RuntimeState* state, const RowDescriptor& row_desc,
                                    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                                    int num_senders, bool is_merging, RuntimeProfile* profile)
         : HasTaskExecutionCtx(state),
          _mgr(stream_mgr),
+          _parent(parent),
          _query_thread_context(state->query_id(), state->query_mem_tracker(),
                                state->get_query_ctx()->workload_group()),
          _fragment_instance_id(fragment_instance_id),
@@ -352,9 +356,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
     }
 
     // Initialize the counters
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _peak_memory_usage_counter =
-            _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
     _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
     _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
@@ -417,7 +418,7 @@ std::shared_ptr VDataStreamRecvr::get_local_channel_depend
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
-    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);
@@ -492,8 +493,8 @@ void VDataStreamRecvr::close() {
     _mgr = nullptr;
 
     _merger.reset();
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    if (_parent->peak_memory_usage_counter()) {
+        _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     }
 }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e8dcfdedba5fb9..b2d76590ba2717 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -69,7 +69,8 @@ class VDataStreamRecvr;
 class VDataStreamRecvr : public HasTaskExecutionCtx {
 public:
     class SenderQueue;
-    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
+    VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                     RuntimeState* state, const RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                      int num_senders, bool is_merging, RuntimeProfile* profile);
@@ -120,6 +121,8 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
     // DataStreamMgr instance used to create this recvr. (Not owned)
     VDataStreamMgr* _mgr = nullptr;
+    pipeline::ExchangeLocalState* _parent = nullptr;
+
     QueryThreadContext _query_thread_context;
 
     // Fragment and node id of the destination exchange node this receiver is used by.
@@ -152,8 +155,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
     RuntimeProfile::Counter* _data_arrival_timer = nullptr;
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
 
     // Number of rows received
     RuntimeProfile::Counter* _rows_produced_counter = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index fb2f24ee0e1817..1b98bbc31162e4 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -129,6 +129,12 @@ std::shared_ptr PipChannel::get_local_channel_dependency()
             Channel::_parent->sender_id());
 }
 
+int64_t PipChannel::mem_usage() const {
+    auto* mutable_block = Channel::_serializer.get_block();
+    int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
+    return mem_usage;
+}
+
 Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     COUNTER_UPDATE(Channel::_parent->blocks_sent_counter(), 1);
     std::unique_ptr<PBlock> pblock_ptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 7c86a62519a851..2b839686dc8289 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -276,6 +276,8 @@ class PipChannel final : public Channel {
 
     ~PipChannel() override { delete Channel::_ch_cur_pb_block; }
 
+    int64_t mem_usage() const;
+
     void ch_roll_pb_block() override {
         // We have two choices here.
         // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can