diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 5f8d5c8c494142..315e76a2426d00 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -199,7 +199,7 @@ void LocalExchangeSharedState::sub_running_source_operators( LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { source_deps.resize(num_instances, nullptr); - mem_trackers.resize(num_instances, nullptr); + mem_counters.resize(num_instances, nullptr); } vectorized::MutableColumns AggSharedState::_get_keys_hash_table() { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 863458d3bdec3f..63fa8ac8c79695 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -324,11 +324,6 @@ struct AggSharedState : public BasicSharedState { vectorized::Sizes offsets_of_aggregate_states; std::vector make_nullable_keys; - struct MemoryRecord { - int64_t used_in_arena {}; - int64_t used_in_state {}; - }; - MemoryRecord mem_usage_record; bool agg_data_created_without_key = false; bool enable_spill = false; bool reach_limit = false; @@ -829,7 +824,7 @@ struct LocalExchangeSharedState : public BasicSharedState { LocalExchangeSharedState(int num_instances); ~LocalExchangeSharedState() override; std::unique_ptr exchanger {}; - std::vector mem_trackers; + std::vector mem_counters; std::atomic mem_usage = 0; // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. std::mutex le_lock; @@ -865,13 +860,15 @@ struct LocalExchangeSharedState : public BasicSharedState { } void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) { - mem_trackers[channel_id]->consume(delta); + mem_counters[channel_id]->update(delta); if (update_total_mem_usage) { add_total_mem_usage(delta, channel_id); } } - void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); } + void sub_mem_usage(int channel_id, size_t delta) { + mem_counters[channel_id]->update(-(int64_t)delta); + } virtual void add_total_mem_usage(size_t delta, int channel_id) { if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 0bf3f8c4e0931d..444891b9c5eeed 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _agg_data = Base::_shared_state->agg_data.get(); _agg_arena_pool = Base::_shared_state->agg_arena_pool.get(); _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); - _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); + _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); @@ -227,24 +227,17 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() { }, [&](auto& agg_method) -> void { auto& data = *agg_method.hash_table; - auto arena_memory_usage = + int64_t arena_memory_usage = _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage() - - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - Base::_mem_tracker->consume( - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - COUNTER_UPDATE( - _hash_table_memory_usage, - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - Base::_shared_state->mem_usage_record.used_in_state = - data.get_buffer_size_in_bytes(); - Base::_shared_state->mem_usage_record.used_in_arena = - _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage(); + _shared_state->aggregate_data_container->memory_usage(); + int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); + + COUNTER_SET(_memory_used_counter, + arena_memory_usage + hash_table_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); + + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); + COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); }}, _agg_data->method_variant); } @@ -423,11 +416,10 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { } void AggSinkLocalState::_update_memusage_without_key() { - auto arena_memory_usage = - _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size(); + int64_t arena_memory_usage = _agg_arena_pool->size(); + COUNTER_SET(_memory_used_counter, arena_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) { @@ -876,8 +868,6 @@ Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { std::vector tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state + - Base::_shared_state->mem_usage_record.used_in_arena); return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 97440de3f09e4c..cbc638781e8ad3 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -109,7 +109,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { RuntimeProfile::Counter* _max_row_size_counter = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; - RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; + RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr; bool _should_limit_output = false; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 3bdda31308ff86..bc2d83a2782ad5 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -461,8 +461,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { vectorized::Block::filter_block_internal(block, _shared_state->need_computes); if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } else { reached_limit(block, eos); @@ -470,8 +468,6 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { } else { if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } } diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index e9276e4fa048c8..1da5c1f7c35445 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -30,8 +30,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - _blocks_memory_usage = - _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); + _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1); _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); return Status::OK(); } @@ -312,8 +311,10 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block local_state._shared_state->ordey_by_column_idxs[i] = result_col_id; } - local_state.mem_tracker()->consume(input_block->allocated_bytes()); - local_state._blocks_memory_usage->add(input_block->allocated_bytes()); + int64_t block_mem_usage = input_block->allocated_bytes(); + COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage); + COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value()); + COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage); //TODO: if need improvement, the is a tips to maintain a free queue, //so the memory could reuse, no need to new/delete again; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index cf2892eb7e6ceb..741243e7465e22 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -58,7 +58,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState _agg_expr_ctxs; }; diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 73dc5d03d64ffe..134a0ad82d7a05 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -161,7 +161,7 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _blocks_memory_usage = - profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); + profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1); _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); return Status::OK(); } @@ -443,7 +443,6 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) { Status AnalyticLocalState::output_current_block(vectorized::Block* block) { block->swap(std::move(_shared_state->input_blocks[_output_block_index])); _blocks_memory_usage->add(-block->allocated_bytes()); - mem_tracker()->consume(-block->allocated_bytes()); if (_shared_state->origin_cols.size() < block->columns()) { block->erase_not_in(_shared_state->origin_cols); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 5aa27b51c45095..71688ff530d02e 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -114,8 +114,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::Cancelled("Expected {} {} to be returned by expression {}", to_string_lambda(_assertion), _desired_num_rows, _subquery_string); } - COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); return Status::OK(); diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index 5f8c5befc6a2b9..4f953dc225dd4b 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -156,7 +156,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b local_state._current_query_cache_rows += output_block->rows(); auto mem_consume = output_block->allocated_bytes(); local_state._current_query_cache_bytes += mem_consume; - local_state._mem_tracker->consume(mem_consume); if (_cache_param.entry_max_bytes < local_state._current_query_cache_bytes || _cache_param.entry_max_rows < local_state._current_query_cache_rows) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 70b73225f060e8..ed4a39f10e2307 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -460,7 +460,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc block->columns())); } local_state.add_num_rows_returned(block->rows()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); // If the limit is not reached, it is important to ensure that _aggregated_block is empty // because it may still contain data. // However, if the limit is reached, there is no need to output data even if some exists. diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index e0a3725ad65e6d..ff06bc37e5d084 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -165,6 +165,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); + COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong()); + COUNTER_SET(_parent->peak_memory_usage_counter(), + _parent->memory_used_counter()->value()); } _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; @@ -303,6 +306,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } } if (request.block) { + COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong()); static_cast(brpc_request->release_block()); } q.pop(); @@ -435,8 +439,30 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_receiver_eof[id] = true; _turn_off_channel(id, true); - std::queue> empty; - swap(empty, _instance_to_broadcast_package_queue[id]); + std::queue>& broadcast_q = + _instance_to_broadcast_package_queue[id]; + for (; !broadcast_q.empty(); broadcast_q.pop()) { + if (broadcast_q.front().block_holder->get_block()) { + COUNTER_UPDATE(_parent->memory_used_counter(), + -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + } + } + { + std::queue> empty; + swap(empty, broadcast_q); + } + + std::queue>& q = _instance_to_package_queue[id]; + for (; !q.empty(); q.pop()) { + if (q.front().block) { + COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); + } + } + + { + std::queue> empty; + swap(empty, q); + } } bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 518620ba6b4d3e..bcb4b46b97c6af 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -95,7 +95,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]); } } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); // Make sure brpc stub is ready before execution. for (int i = 0; i < channels.size(); ++i) { @@ -112,7 +111,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); int local_size = 0; for (int i = 0; i < channels.size(); ++i) { @@ -338,7 +336,6 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { Status ExchangeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); _state = state; - _mem_tracker = std::make_unique("ExchangeSinkOperatorX:"); _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { if (_output_tuple_id == -1) { @@ -368,7 +365,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); SCOPED_TIMER(local_state.exec_time_counter()); - local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { @@ -379,6 +375,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (all_receiver_eof) { return Status::EndOfFile("all data stream channels EOF"); } + Defer defer([&]() { + COUNTER_SET(local_state._peak_memory_usage_counter, + local_state._memory_used_counter->value()); + }); if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { // 1. serialize depends on it is not local exchange @@ -397,7 +397,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else { auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); bool serialized = false; RETURN_IF_ERROR(local_state._serializer.next_serialized_block( block, block_holder->get_block(), local_state.channels.size(), &serialized, @@ -420,7 +419,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (channel->is_local()) { status = channel->send_local_block(&cur_block); } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); @@ -442,7 +440,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto status = current_channel->send_local_block(block); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(local_state._serializer.serialize_block( block, current_channel->ch_cur_pb_block())); auto status = @@ -458,8 +455,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto rows = block->rows(); { SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR( - local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); + } + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + old_channel_mem_usage += channel->mem_usage(); } if (_part_type == TPartitionType::HASH_PARTITIONED) { RETURN_IF_ERROR(channel_add_rows( @@ -470,7 +470,19 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block state, local_state.channel_shared_ptrs, local_state._partition_count, local_state._partitioner->get_channel_ids().get(), rows, block, eos)); } + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state.memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + COUNTER_SET(local_state.peak_memory_usage_counter(), + local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + old_channel_mem_usage += channel->mem_usage(); + } // check out of limit RETURN_IF_ERROR(local_state._send_new_partition_batch()); std::shared_ptr convert_block = std::make_shared(); @@ -506,17 +518,36 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block // when send data we still use block RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, channel2rows, block, eos)); + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state.memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + COUNTER_SET(local_state.peak_memory_usage_counter(), + local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + int64_t old_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + old_channel_mem_usage += channel->mem_usage(); + } { SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR( - local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); } std::vector> assignments = local_state.scale_writer_partitioning_exchanger->accept(block); RETURN_IF_ERROR(channel_add_rows_with_idx( state, local_state.channels, local_state.channels.size(), assignments, block, eos)); + int64_t new_channel_mem_usage = 0; + for (const auto& channel : local_state.channels) { + new_channel_mem_usage += channel->mem_usage(); + } + COUNTER_UPDATE(local_state.memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); + COUNTER_SET(local_state.peak_memory_usage_counter(), + local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { // Control the number of channels according to the flow, thereby controlling the number of table sink writers. // 1. select channel @@ -528,7 +559,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block auto status = current_channel->send_local_block(block); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(local_state._serializer.serialize_block( block, current_channel->ch_cur_pb_block())); auto status = diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index adf8a3424706d2..67075915b0469a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -252,7 +252,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _dests; - std::unique_ptr _mem_tracker; // Identifier of the destination plan node. const PlanNodeId _dest_node_id; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index cf2055ec47b071..ca5194e7077f1f 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -62,8 +62,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( - state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), - profile(), p.is_merging()); + state, this, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), + p.num_senders(), profile(), p.is_merging()); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); @@ -171,8 +171,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block block->set_num_rows(limit); local_state.set_num_rows_returned(_limit); } - COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); } return status; } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 6db49bb7ab1089..a8d5145d90da60 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -46,8 +46,6 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { _vpartition = std::make_unique(p._schema, p._partition); RETURN_IF_ERROR(_vpartition->init()); _state = state; - // profile must add to state's object pool - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); _block_convertor = std::make_unique(p._output_tuple_desc); _block_convertor->init_autoinc_info(p._schema->db_id(), p._schema->table_id(), @@ -276,7 +274,6 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get()); if (!local_state._load_block_queue) { RETURN_IF_ERROR(local_state._initialize_load_queue()); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 93d22850dfcbb1..79aae546fbfbed 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -72,11 +72,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo } _build_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1); _hash_table_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _build_arena_memory_usage = - profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildKeyArena", TUnit::BYTES, 1); // Build phase auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); @@ -292,41 +292,41 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, // Get the key column that needs to be built Status st = _extract_join_column(block, null_map_val, raw_ptrs, _build_col_ids); - st = std::visit( - vectorized::Overload { - [&](std::monostate& arg, auto join_op, auto has_null_value, - auto short_circuit_for_null_in_build_side, - auto with_other_conjuncts) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - return Status::OK(); - }, - [&](auto&& arg, auto&& join_op, auto has_null_value, - auto short_circuit_for_null_in_build_side, - auto with_other_conjuncts) -> Status { - using HashTableCtxType = std::decay_t; - using JoinOpType = std::decay_t; - ProcessHashTableBuild hash_table_build_process( - rows, raw_ptrs, this, state->batch_size(), state); - auto old_hash_table_size = arg.hash_table->get_byte_size(); - auto old_key_size = arg.serialized_keys_size(true); - auto st = hash_table_build_process.template run< - JoinOpType::value, has_null_value, - short_circuit_for_null_in_build_side, with_other_conjuncts>( - arg, - has_null_value || short_circuit_for_null_in_build_side - ? &null_map_val->get_data() - : nullptr, - &_shared_state->_has_null_in_build_side); - _mem_tracker->consume(arg.hash_table->get_byte_size() - - old_hash_table_size); - _mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size); - return st; - }}, - *_shared_state->hash_table_variants, _shared_state->join_op_variants, - vectorized::make_bool_variant(_build_side_ignore_null), - vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side), - vectorized::make_bool_variant((p._have_other_join_conjunct))); + st = std::visit(vectorized::Overload { + [&](std::monostate& arg, auto join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side, + auto with_other_conjuncts) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + return Status::OK(); + }, + [&](auto&& arg, auto&& join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side, + auto with_other_conjuncts) -> Status { + using HashTableCtxType = std::decay_t; + using JoinOpType = std::decay_t; + ProcessHashTableBuild hash_table_build_process( + rows, raw_ptrs, this, state->batch_size(), state); + auto st = hash_table_build_process.template run< + JoinOpType::value, has_null_value, + short_circuit_for_null_in_build_side, with_other_conjuncts>( + arg, + has_null_value || short_circuit_for_null_in_build_side + ? &null_map_val->get_data() + : nullptr, + &_shared_state->_has_null_in_build_side); + COUNTER_SET(_memory_used_counter, + _build_blocks_memory_usage->value() + + (int64_t)(arg.hash_table->get_byte_size() + + arg.serialized_keys_size(true))); + COUNTER_SET(_peak_memory_usage_counter, + _memory_used_counter->value()); + return st; + }}, + *_shared_state->hash_table_variants, _shared_state->join_op_variants, + vectorized::make_bool_variant(_build_side_ignore_null), + vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side), + vectorized::make_bool_variant((p._have_other_join_conjunct))); return st; } @@ -542,7 +542,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. - local_state._build_side_mem_used += in_block->allocated_bytes(); if (local_state._build_side_mutable_block.empty()) { auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename( @@ -569,12 +568,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* std::to_string(std::numeric_limits::max())); } - local_state._mem_tracker->consume(in_block->bytes()); - COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes()); - SCOPED_TIMER(local_state._build_side_merge_block_timer); RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow( std::move(*in_block))); + int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes(); + COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage); + COUNTER_SET(local_state._peak_memory_usage_counter, blocks_mem_usage); + COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage); } } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index a544cdcf4563a4..b5cf2fe342bd2f 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -74,8 +74,6 @@ class HashJoinBuildSinkLocalState final std::vector _key_columns_holder; bool _should_build_hash_table = true; - int64_t _build_side_mem_used = 0; - int64_t _build_side_last_mem_used = 0; size_t _build_side_rows = 0; @@ -103,7 +101,7 @@ class HashJoinBuildSinkLocalState final RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; - RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr; + RuntimeProfile::Counter* _build_arena_memory_usage = nullptr; }; class HashJoinBuildSinkOperatorX final diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index f91e1eaa2a1b17..fc1b1cccb1c849 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) _construct_mutable_join_block(); _probe_column_disguise_null.reserve(_probe_expr_ctxs.size()); _probe_arena_memory_usage = - profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1); // Probe phase _probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime"); @@ -304,8 +304,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc mutable_join_block, &temp_block, local_state._probe_block.rows(), _is_mark_join, _have_other_join_conjunct); - local_state._mem_tracker->set_consumption( - arg.serialized_keys_size(false)); } else { st = Status::InternalError("uninited hash table"); } @@ -501,6 +499,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu if (&local_state._probe_block != input_block) { input_block->swap(local_state._probe_block); + COUNTER_SET(local_state._memory_used_counter, + (int64_t)local_state._probe_block.allocated_bytes()); + COUNTER_SET(local_state._peak_memory_usage_counter, + local_state._memory_used_counter->value()); } } return Status::OK(); diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 965d62192b2fed..bf4a4d5763c02e 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -132,7 +132,6 @@ struct ProcessHashTableProbe { bool _need_calculate_build_index_has_zero = true; bool* _has_null_in_build_side; - RuntimeProfile::Counter* _rows_returned_counter = nullptr; RuntimeProfile::Counter* _search_hashtable_timer = nullptr; RuntimeProfile::Counter* _init_probe_side_timer = nullptr; RuntimeProfile::Counter* _build_side_output_timer = nullptr; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 6bb5a2006ab9b0..8bee9fc14aeac4 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -51,7 +51,6 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeLocalState _left_output_slot_flags(parent->left_output_slot_flags()), _right_output_slot_flags(parent->right_output_slot_flags()), _has_null_in_build_side(parent->has_null_in_build_side()), - _rows_returned_counter(parent->_rows_returned_counter), _search_hashtable_timer(parent->_search_hashtable_timer), _init_probe_side_timer(parent->_init_probe_side_timer), _build_side_output_timer(parent->_build_side_output_timer), @@ -177,8 +176,10 @@ typename HashTableType::State ProcessHashTableProbe::_init_probe_sid false, hash_table_ctx.hash_table->get_bucket_size()); hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums, need_judge_null ? null_map : nullptr); - COUNTER_SET(_parent->_probe_arena_memory_usage, - (int64_t)hash_table_ctx.serialized_keys_size(false)); + int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false); + COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage); + COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage); + COUNTER_SET(_parent->_peak_memory_usage_counter, _parent->_memory_used_counter->value()); } return typename HashTableType::State(_parent->_probe_columns); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 1028bca7ce2ca4..49dc384a4e7d87 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -99,7 +99,6 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, local_state._output_expr_contexts, *output_block, block, true)); vectorized::materialize_block_inplace(*block); } - COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 4a93bac67fe477..5a13fdcbd8482f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -315,17 +315,28 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized:: } }); + Status status; auto* local_state = state->get_local_state(operator_id()); + Defer defer([&]() { + if (status.ok()) { + if (auto rows = block->rows()) { + COUNTER_UPDATE(local_state->_rows_returned_counter, rows); + COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); + } + } + }); if (_output_row_descriptor) { local_state->clear_origin_block(); - auto status = get_block(state, &local_state->_origin_block, eos); + status = get_block(state, &local_state->_origin_block, eos); if (UNLIKELY(!status.ok())) { return status; } - return do_projections(state, &local_state->_origin_block, block); + status = do_projections(state, &local_state->_origin_block, block); + return status; } - local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption()); - return get_block(state, block, eos); + status = get_block(state, block, eos); + local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value()); + return status; } void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) { @@ -346,8 +357,6 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) if (auto rows = block->rows()) { _num_rows_returned += rows; - COUNTER_UPDATE(_blocks_returned_counter, 1); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); } } @@ -468,10 +477,9 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); - _mem_tracker = std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); - _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); - _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( - "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); + _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1); + _peak_memory_usage_counter = + _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); return Status::OK(); } @@ -504,11 +512,8 @@ Status PipelineXLocalState::close(RuntimeState* state) { if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - if (_rows_returned_counter != nullptr) { - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - } if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + _peak_memory_usage_counter->set(_memory_used_counter->value()); } _closed = true; // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). @@ -548,10 +553,9 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSink _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); - _mem_tracker = std::make_unique(_parent->get_name()); - _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1); + _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1); _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); + _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); return Status::OK(); } @@ -564,7 +568,7 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + _peak_memory_usage_counter->set(_memory_used_counter->value()); } _closed = true; return Status::OK(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index eca7d608437b06..f65df75e095090 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -155,10 +155,9 @@ class PipelineXLocalStateBase { void reached_limit(vectorized::Block* block, bool* eos); RuntimeProfile* profile() { return _runtime_profile.get(); } - MemTracker* mem_tracker() { return _mem_tracker.get(); } - RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; } - RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } + RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } + RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; } OperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } @@ -180,16 +179,14 @@ class PipelineXLocalStateBase { protected: friend class OperatorXBase; + template + friend class ScanOperatorX; ObjectPool* _pool = nullptr; int64_t _num_rows_returned {0}; std::unique_ptr _runtime_profile; - // Record this node memory size. it is expected that artificial guarantees are accurate, - // which will providea reference for operator memory. - std::unique_ptr _mem_tracker; - std::shared_ptr _query_statistics = nullptr; RuntimeProfile::Counter* _rows_returned_counter = nullptr; @@ -334,13 +331,14 @@ class PipelineXSinkLocalStateBase { DataSinkOperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } RuntimeProfile* profile() { return _profile; } - MemTracker* mem_tracker() { return _mem_tracker.get(); } [[nodiscard]] RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); } RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } + RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } + RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; } virtual std::vector dependencies() const { return {nullptr}; } // override in exchange sink , AsyncWriterSink @@ -352,7 +350,6 @@ class PipelineXSinkLocalStateBase { DataSinkOperatorXBase* _parent = nullptr; RuntimeState* _state = nullptr; RuntimeProfile* _profile = nullptr; - std::unique_ptr _mem_tracker; // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index fbabdbdc8f85fe..eba9e8472d3158 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -113,9 +113,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _partition_exprs_num = p._partition_exprs_num; _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT); _serialize_key_arena_memory_usage = - _profile->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + _profile->AddHighWaterMarkCounter("MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _hash_table_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageHashTable", TUnit::BYTES, 1); _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index f2cd8dea0b943c..6d355477ab871c 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -58,7 +58,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: } } if (!output_block->empty()) { - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); local_state._num_rows_returned += output_block->rows(); } return Status::OK(); @@ -80,7 +79,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); } if (!output_block->empty()) { - COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); local_state._num_rows_returned += output_block->rows(); } return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 469716b7a22182..ab0a43f4a635cf 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -81,10 +81,10 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat void PartitionedAggSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique("internal_profile"); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); @@ -110,8 +110,8 @@ void PartitionedAggSinkLocalState::_init_counters() { } while (false) void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) { - UPDATE_PROFILE(_hash_table_memory_usage, "HashTable"); - UPDATE_PROFILE(_serialize_key_arena_memory_usage, "SerializeKeyArena"); + UPDATE_PROFILE(_hash_table_memory_usage, "MemoryUsageHashTable"); + UPDATE_PROFILE(_serialize_key_arena_memory_usage, "MemoryUsageSerializeKeyArena"); UPDATE_PROFILE(_build_timer, "BuildTime"); UPDATE_PROFILE(_serialize_key_timer, "SerializeKeyTime"); UPDATE_PROFILE(_merge_timer, "MergeTime"); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 018d63a6deebb1..0e56acc1c574b2 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -557,8 +557,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } { SCOPED_TIMER(local_state._partition_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, input_block, - local_state._mem_tracker.get())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, input_block)); } std::vector> partition_indexes(_partition_count); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a7297be493f804..ce7198aa4f5f00 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -80,7 +80,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state if (inner_sink_state_) { auto inner_sink_state = assert_cast(inner_sink_state_); - return inner_sink_state->_build_side_mem_used; + return inner_sink_state->_build_blocks_memory_usage->value(); } } return 0; @@ -161,7 +161,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta { SCOPED_TIMER(_partition_timer); - (void)_partitioner->do_partitioning(state, &sub_block, _mem_tracker.get()); + (void)_partitioner->do_partitioning(state, &sub_block); } const auto* channel_ids = _partitioner->get_channel_ids().get(); @@ -334,7 +334,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, { /// TODO: DO NOT execute build exprs twice(when partition and building hash table) SCOPED_TIMER(_partition_timer); - RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block, _mem_tracker.get())); + RETURN_IF_ERROR(_partitioner->do_partitioning(state, in_block)); } auto& p = _parent->cast(); diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index d355d99c2e352f..dba4f27af7c385 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -231,7 +231,6 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp output_block->columns())); *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); - COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 28dbd01280f3c8..2b0d82ee788e48 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -359,7 +359,15 @@ class ScanOperatorX : public OperatorX { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) override { - return get_block(state, block, eos); + Status status = get_block(state, block, eos); + if (status.ok()) { + if (auto rows = block->rows()) { + auto* local_state = state->get_local_state(operator_id()); + COUNTER_UPDATE(local_state->_rows_returned_counter, rows); + COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); + } + } + return status; } [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index b07942b9ab1c05..ee8689a8084e5c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -31,7 +31,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _sort_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); return Status::OK(); } @@ -117,9 +117,12 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { - COUNTER_UPDATE(local_state._sort_blocks_memory_usage, (int64_t)in_block->bytes()); RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); - local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size()); + int64_t data_size = local_state._shared_state->sorter->data_size(); + COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); + COUNTER_SET(local_state._memory_used_counter, data_size); + COUNTER_SET(local_state._peak_memory_usage_counter, data_size); + RETURN_IF_CANCELLED(state); if (state->get_query_ctx()->has_runtime_predicate(_node_id)) { diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 02a99e183c852e..9357c21db47fe7 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -55,6 +55,7 @@ Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* bl SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos)); local_state.reached_limit(block, eos); + LOG(INFO) << "SortSourceOperatorX::get_block, block: " << block->dump_data(); return Status::OK(); } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 4bf1ab04efb628..267bcc83aad92c 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -51,7 +51,7 @@ void SpillSortSinkLocalState::_init_counters() { _partial_sort_timer = ADD_TIMER(_profile, "PartialSortTime"); _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime"); _sort_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); _spill_merge_sort_timer = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1); @@ -70,7 +70,7 @@ void SpillSortSinkLocalState::_init_counters() { void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE(_partial_sort_timer, "PartialSortTime"); UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime"); - UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks"); + UPDATE_PROFILE(_sort_blocks_memory_usage, "MemoryUsageSortBlocks"); } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { @@ -156,8 +156,12 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink", { return Status::InternalError("fault_inject spill_sort_sink sink failed"); }); RETURN_IF_ERROR(_sort_sink_operator->sink(local_state._runtime_state.get(), in_block, false)); - local_state._mem_tracker->set_consumption( - local_state._shared_state->in_mem_shared_state->sorter->data_size()); + + int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size(); + COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); + COUNTER_SET(local_state._memory_used_counter, data_size); + COUNTER_SET(local_state._peak_memory_usage_counter, data_size); + if (eos) { if (local_state._shared_state->is_spilled) { if (revocable_mem_size(state) > 0) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index dfbe42c637ea56..b7da212f01949a 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -87,10 +87,10 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_init_timer); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( - "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + "MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime"); @@ -357,10 +357,10 @@ Status StreamingAggLocalState::_merge_without_key(vectorized::Block* block) { } void StreamingAggLocalState::_update_memusage_without_key() { - auto arena_memory_usage = _agg_arena_pool->size() - _mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - _mem_usage_record.used_in_arena = _agg_arena_pool->size(); + int64_t arena_memory_usage = _agg_arena_pool->size(); + COUNTER_SET(_memory_used_counter, arena_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* block) { @@ -372,28 +372,25 @@ Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* b } void StreamingAggLocalState::_update_memusage_with_serialized_key() { - std::visit( - vectorized::Overload { - [&](std::monostate& arg) -> void { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); - }, - [&](auto& agg_method) -> void { - auto& data = *agg_method.hash_table; - auto arena_memory_usage = _agg_arena_pool->size() + - _aggregate_data_container->memory_usage() - - _mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() - - _mem_usage_record.used_in_state); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - COUNTER_UPDATE( - _hash_table_memory_usage, - data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state); - _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes(); - _mem_usage_record.used_in_arena = - _agg_arena_pool->size() + _aggregate_data_container->memory_usage(); - }}, - _agg_data->method_variant); + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + }, + [&](auto& agg_method) -> void { + auto& data = *agg_method.hash_table; + int64_t arena_memory_usage = _agg_arena_pool->size() + + _aggregate_data_container->memory_usage(); + int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); + + COUNTER_SET(_memory_used_counter, + arena_memory_usage + hash_table_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, + arena_memory_usage + hash_table_memory_usage); + + COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); + COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); + }}, + _agg_data->method_variant); } template @@ -515,7 +512,6 @@ Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block, // pre stream agg need use _num_row_return to decide whether to do pre stream agg _cur_num_rows_returned += output_block->rows(); _make_nullable_output_key(output_block); - // COUNTER_SET(_rows_returned_counter, _num_rows_returned); _executor->update_memusage(this); return Status::OK(); } @@ -1255,7 +1251,6 @@ Status StreamingAggLocalState::close(RuntimeState* state) { std::vector tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - Base::_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); /// _hash_table_size_counter may be null if prepare failed. if (_hash_table_size_counter) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index c37fa5cbd881ca..9a84b694635a46 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -173,12 +173,6 @@ class StreamingAggLocalState final : public PipelineXLocalState }; std::unique_ptr _executor = nullptr; - struct MemoryRecord { - MemoryRecord() : used_in_arena(0), used_in_state(0) {} - int64_t used_in_arena; - int64_t used_in_state; - }; - MemoryRecord _mem_usage_record; std::unique_ptr _child_block = nullptr; bool _child_eos = false; std::unique_ptr _pre_aggregated_block = nullptr; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 2d20b8f365cd7d..c4832b9958c00d 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -26,7 +26,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _channel_id = info.task_idx; - _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); + _shared_state->mem_counters[_channel_id] = _memory_used_counter; _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); _get_block_failed_counter = @@ -105,8 +105,8 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c _exchanger->data_queue_debug_string(_channel_id)); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); - for (auto* mem_tracker : _shared_state->mem_trackers) { - fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption()); + for (auto* mem_counter : _shared_state->mem_counters) { + fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_counter->value()); i++; } return fmt::to_string(debug_string_buffer); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index f4630f328bb70d..34b7fb503b519d 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -118,8 +118,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, } { SCOPED_TIMER(local_state._compute_hash_value_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, - local_state.mem_tracker())); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block)); } { SCOPED_TIMER(local_state._distribute_timer); diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp deleted file mode 100644 index 796e6c166e04fe..00000000000000 --- a/be/src/runtime/memory/mem_tracker.cpp +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from - -#include "runtime/memory/mem_tracker.h" - -#include - -namespace doris { - -constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000; -std::atomic mem_tracker_group_counter(0); -bvar::Adder g_memtracker_cnt("memtracker_cnt"); - -std::vector MemTracker::mem_tracker_pool(MEM_TRACKERS_GROUP_NUM); - -MemTracker::MemTracker(const std::string& label) { - _label = label; - _group_num = mem_tracker_group_counter.fetch_add(1) % MEM_TRACKERS_GROUP_NUM; - { - std::lock_guard l(mem_tracker_pool[_group_num].group_lock); - _trackers_group_it = mem_tracker_pool[_group_num].trackers.insert( - mem_tracker_pool[_group_num].trackers.end(), this); - } - g_memtracker_cnt << 1; -} - -MemTracker::~MemTracker() { - if (_group_num != -1) { - std::lock_guard l(mem_tracker_pool[_group_num].group_lock); - if (_trackers_group_it != mem_tracker_pool[_group_num].trackers.end()) { - mem_tracker_pool[_group_num].trackers.erase(_trackers_group_it); - _trackers_group_it = mem_tracker_pool[_group_num].trackers.end(); - } - g_memtracker_cnt << -1; - } -} - -} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 82b05fe544afc8..36dfa8e44f1d6d 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -33,8 +33,8 @@ namespace doris { class MemTracker final { public: MemTracker() = default; - MemTracker(const std::string& label); - ~MemTracker(); + MemTracker(std::string label) : _label(std::move(label)) {}; + ~MemTracker() = default; void consume(int64_t bytes) { _mem_counter.add(bytes); } void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } @@ -53,21 +53,6 @@ class MemTracker final { private: MemCounter _mem_counter; std::string _label {"None"}; - - /* - * Save all MemTrackers, used by dump memory info. - */ - struct TrackersGroup { - std::list trackers; - std::mutex group_lock; - }; - // Each group corresponds to several MemCountes and has a lock. - // Multiple groups are used to reduce the impact of locks. - static std::vector mem_tracker_pool; - // Group number in mem_tracker_pool, generated by the timestamp. - int64_t _group_num {-1}; - // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. - std::list::iterator _trackers_group_it; }; } // namespace doris diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index 0d6165b75556f6..89656a74508052 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -25,8 +25,8 @@ namespace doris::vectorized { template -Status Partitioner::do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const { +Status Partitioner::do_partitioning(RuntimeState* state, + Block* block) const { int rows = block->rows(); if (rows > 0) { @@ -38,10 +38,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta _hash_vals.resize(rows); std::fill(_hash_vals.begin(), _hash_vals.end(), 0); auto* __restrict hashes = _hash_vals.data(); - { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker); - RETURN_IF_ERROR(_get_partition_column_result(block, result)); - } + { RETURN_IF_ERROR(_get_partition_column_result(block, result)); } for (int j = 0; j < result_size; ++j) { _do_hash(unpack_if_const(block->get_by_position(result[j]).column).first, hashes, j); } @@ -50,10 +47,7 @@ Status Partitioner::do_partitioning(RuntimeState* sta hashes[i] = ChannelIds()(hashes[i], _partition_count); } - { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker); - Block::erase_useless_column(block, column_to_keep); - } + { Block::erase_useless_column(block, column_to_keep); } } return Status::OK(); } diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 3152edb5cb57c7..5607a83327b119 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -48,8 +48,7 @@ class PartitionerBase { virtual Status open(RuntimeState* state) = 0; - virtual Status do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const = 0; + virtual Status do_partitioning(RuntimeState* state, Block* block) const = 0; virtual ChannelField get_channel_ids() const = 0; @@ -75,8 +74,7 @@ class Partitioner : public PartitionerBase { Status open(RuntimeState* state) override { return VExpr::open(_partition_expr_ctxs, state); } - Status do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const override; + Status do_partitioning(RuntimeState* state, Block* block) const override; ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(HashValueType)}; diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index c14d119e0fee25..78067b9b18106c 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc } std::shared_ptr VDataStreamMgr::create_recvr( - RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging) { + RuntimeState* state, pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; - std::shared_ptr recvr(new VDataStreamRecvr(this, state, row_desc, + std::shared_ptr recvr(new VDataStreamRecvr(this, parent, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 09e347fcfb2a7a..bd5e6f9b91ee57 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -40,6 +40,9 @@ class RuntimeState; class RowDescriptor; class RuntimeProfile; class PTransmitDataParams; +namespace pipeline { +class ExchangeLocalState; +} namespace vectorized { class VDataStreamRecvr; @@ -50,6 +53,7 @@ class VDataStreamMgr { ~VDataStreamMgr(); std::shared_ptr create_recvr(RuntimeState* state, + pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1ca6bb7f2c5931..f1dfbbf304763f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -96,6 +96,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); _block_queue.pop_front(); + _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -207,6 +208,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } + _recvr->_parent->memory_used_counter()->update(block_byte_size); + _recvr->_parent->peak_memory_usage_counter()->set( + _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_byte_size); return Status::OK(); } @@ -245,6 +249,9 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { _record_debug_info(); try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); + _recvr->_parent->memory_used_counter()->update(block_mem_size); + _recvr->_parent->peak_memory_usage_counter()->set( + _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_mem_size); } } @@ -315,12 +322,13 @@ void VDataStreamRecvr::SenderQueue::close() { _block_queue.clear(); } -VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, - const RowDescriptor& row_desc, +VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent, + RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, RuntimeProfile* profile) : HasTaskExecutionCtx(state), _mgr(stream_mgr), + _parent(parent), _query_thread_context(state->query_id(), state->query_mem_tracker(), state->get_query_ctx()->workload_group()), _fragment_instance_id(fragment_instance_id), @@ -352,9 +360,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta } // Initialize the counters - _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); - _peak_memory_usage_counter = - _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES); _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES); @@ -417,7 +422,6 @@ std::shared_ptr VDataStreamRecvr::get_local_channel_depend } Status VDataStreamRecvr::get_next(Block* block, bool* eos) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); if (!_is_merging) { block->clear(); return _sender_queues[0]->get_batch(block, eos); @@ -492,9 +496,6 @@ void VDataStreamRecvr::close() { _mgr = nullptr; _merger.reset(); - if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); - } } void VDataStreamRecvr::set_sink_dep_always_ready() const { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e8dcfdedba5fb9..b2d76590ba2717 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -69,7 +69,8 @@ class VDataStreamRecvr; class VDataStreamRecvr : public HasTaskExecutionCtx { public: class SenderQueue; - VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc, + VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent, + RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, RuntimeProfile* profile); @@ -120,6 +121,8 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { // DataStreamMgr instance used to create this recvr. (Not owned) VDataStreamMgr* _mgr = nullptr; + pipeline::ExchangeLocalState* _parent = nullptr; + QueryThreadContext _query_thread_context; // Fragment and node id of the destination exchange node this receiver is used by. @@ -152,8 +155,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { RuntimeProfile::Counter* _data_arrival_timer = nullptr; RuntimeProfile::Counter* _decompress_timer = nullptr; RuntimeProfile::Counter* _decompress_bytes = nullptr; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; - RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; // Number of rows received RuntimeProfile::Counter* _rows_produced_counter = nullptr; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fb2f24ee0e1817..3ee973e3d6d820 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -39,7 +39,6 @@ #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "runtime/descriptors.h" -#include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "runtime/types.h" @@ -129,6 +128,12 @@ std::shared_ptr PipChannel::get_local_channel_dependency() Channel::_parent->sender_id()); } +int64_t PipChannel::mem_usage() const { + auto* mutable_block = Channel::_serializer.get_block(); + int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0; + return mem_usage; +} + Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) { COUNTER_UPDATE(Channel::_parent->blocks_sent_counter(), 1); std::unique_ptr pblock_ptr; @@ -165,7 +170,6 @@ Status PipChannel::send_current_block(bool eos, Status exec_status) { if (Channel::is_local()) { return Channel::send_local_block(exec_status, eos); } - SCOPED_CONSUME_MEM_TRACKER(Channel::_parent->mem_tracker()); RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status)); return Status::OK(); } @@ -177,7 +181,6 @@ Status Channel::send_current_block(bool eos, Status exec_status) { if (is_local()) { return send_local_block(exec_status, eos); } - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (eos) { RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1)); } @@ -323,7 +326,6 @@ Status Channel::close_internal(Status exec_status) { if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) { status = send_current_block(true, exec_status); } else { - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (is_local()) { if (_recvr_is_valid()) { _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); @@ -369,12 +371,10 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest bool* serialized, bool eos, const std::vector* rows) { if (_mutable_block == nullptr) { - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); _mutable_block = MutableBlock::create_unique(block->clone_empty()); } { - SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (rows) { if (!rows->empty()) { SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 7c86a62519a851..2b839686dc8289 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -276,6 +276,8 @@ class PipChannel final : public Channel { ~PipChannel() override { delete Channel::_ch_cur_pb_block; } + int64_t mem_usage() const; + void ch_roll_pb_block() override { // We have two choices here. // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can