[fix](counters) fix MemoryUsage and PeakMemoryUsage counters
jacktengg committed Oct 9, 2024
1 parent 4562b9a commit 69f6048
Showing 30 changed files with 162 additions and 89 deletions.
23 changes: 13 additions & 10 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -57,10 +57,10 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
_hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
_hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
"MemoryUsageSerializeKeyArena", TUnit::BYTES, "", 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
@@ -231,15 +231,16 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
auto hash_table_memory_usage =
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
Base::_shared_state->mem_usage_record.used_in_state;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(hash_table_memory_usage);
Base::_memory_used_counter->set(Base::_mem_tracker->consumption());
Base::_peak_memory_usage_counter->set(
Base::_mem_tracker->peak_consumption());
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_UPDATE(
_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
COUNTER_UPDATE(_hash_table_memory_usage, hash_table_memory_usage);
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
@@ -426,6 +427,8 @@ void AggSinkLocalState::_update_memusage_without_key() {
auto arena_memory_usage =
_agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_memory_used_counter->set(Base::_mem_tracker->consumption());
Base::_peak_memory_usage_counter->set(Base::_mem_tracker->peak_consumption());
_serialize_key_arena_memory_usage->add(arena_memory_usage);
Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
}
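Reading aid for the two hunks above: the memory-usage update now computes the arena and hash-table deltas up front, charges both to the memory tracker, and then mirrors the tracker into the MemoryUsage and PeakMemoryUsage counters before recording the new baselines. A condensed sketch of the resulting flow (member names follow the diff; the enclosing AggSinkLocalState scaffolding is assumed):

    // Delta-based accounting, as in _update_memusage_with_serialized_key() above.
    auto arena_memory_usage =
            _agg_arena_pool->size() +
            Base::_shared_state->aggregate_data_container->memory_usage() -
            Base::_shared_state->mem_usage_record.used_in_arena;
    auto hash_table_memory_usage =
            data.get_buffer_size_in_bytes() -
            Base::_shared_state->mem_usage_record.used_in_state;
    Base::_mem_tracker->consume(arena_memory_usage);
    Base::_mem_tracker->consume(hash_table_memory_usage);
    Base::_memory_used_counter->set(Base::_mem_tracker->consumption());
    Base::_peak_memory_usage_counter->set(Base::_mem_tracker->peak_consumption());
    _serialize_key_arena_memory_usage->add(arena_memory_usage);
    COUNTER_UPDATE(_hash_table_memory_usage, hash_table_memory_usage);
    Base::_shared_state->mem_usage_record.used_in_state = data.get_buffer_size_in_bytes();
    // ...followed by recording the new arena baseline (continuation cut off in the hunk above).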
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -460,18 +460,16 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) {
vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
if (auto rows = block->rows()) {
_num_rows_returned += rows;
add_num_rows_returned(rows);
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
} else {
reached_limit(block, eos);
}
} else {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
add_num_rows_returned(rows);
COUNTER_UPDATE(_blocks_returned_counter, 1);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
}
}
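The switch from manually incrementing _num_rows_returned and calling COUNTER_SET to a single add_num_rows_returned(rows) call presumably relies on a base-class helper that keeps the field and the RowsReturned counter in sync; that helper is not part of this diff, so the shape below is only a hypothetical sketch.

    // Hypothetical helper in the local-state base class (assumed, not shown in this commit).
    void add_num_rows_returned(int64_t rows) {
        _num_rows_returned += rows;
        COUNTER_SET(_rows_returned_counter, _num_rows_returned);  // counter stays in sync with the field
    }

This would also explain why the explicit COUNTER_SET(rows_returned_counter, ...) lines disappear in assert_num_rows_operator.cpp and exchange_source_operator.cpp further down.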
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -31,7 +31,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_blocks_memory_usage =
_profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_profile->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
return Status::OK();
}
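The counter renames in this commit all follow one pattern: instead of being registered as children of a "MemoryUsage" parent entry, memory counters become flat, level-1 counters whose names carry the MemoryUsage prefix. Summarized from the hunks in this commit (macro and method signatures as they appear in the diff):

    // Old form: child counter grouped under a "MemoryUsage" parent in the profile.
    ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1);
    // New form: flat level-1 counter; the grouping moves into the counter name.
    ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
    // High-water-mark counters get the same treatment, with an empty parent name.
    profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);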
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_source_operator.cpp
@@ -161,7 +161,7 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_blocks_memory_usage =
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
return Status::OK();
}
1 change: 0 additions & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -114,7 +114,6 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::Cancelled("Expected {} {} to be returned by expression {}",
to_string_lambda(_assertion), _desired_num_rows, _subquery_string);
}
COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -136,7 +136,7 @@ bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
// were aggregated into it. Exclude passed through rows from this calculation since
// they were not in hash tables.
const int64_t input_rows = _input_num_rows;
const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
const int64_t aggregated_input_rows = input_rows - num_rows_returned();
// TODO chenhao
// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
double current_reduction =
@@ -433,8 +433,8 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc
in_block, local_state._aggregated_block.get()));
// set limit and reach limit
if (_limit != -1 &&
(local_state._num_rows_returned + local_state._aggregated_block->rows()) > _limit) {
auto limit_rows = _limit - local_state._num_rows_returned;
(local_state.num_rows_returned() + local_state._aggregated_block->rows()) > _limit) {
auto limit_rows = _limit - local_state.num_rows_returned();
local_state._aggregated_block->set_num_rows(limit_rows);
local_state._reach_limit = true;
}
31 changes: 29 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -166,6 +166,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
}
_parent->memory_used_counter()->update(request.block->ByteSizeLong());
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -204,6 +205,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
request.block_holder->get_block()->be_exec_version()));
}
_parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong());
_instance_to_broadcast_package_queue[ins_id].emplace(request);
}
if (send_now) {
@@ -303,6 +305,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block) {
_parent->memory_used_counter()->update(-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
@@ -386,6 +389,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block_holder->get_block()) {
_parent->memory_used_counter()->update(
-request.block_holder->get_block()->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
broadcast_q.pop();
@@ -435,8 +440,30 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
_turn_off_channel(id, true);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
_parent->memory_used_counter()->update(
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
_parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
}
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
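All of the exchange_sink_buffer.cpp hunks above apply one bookkeeping rule: a block's serialized size is added to the parent's memory-used counter when the block is queued, and subtracted when the block is released to the RPC layer or when a queue is drained after the receiver reports EOF. Condensed sketch (member names as in the hunks; locking and error handling omitted):

    // On add_block(): charge the counter before the block is queued.
    _parent->memory_used_counter()->update(request.block->ByteSizeLong());
    _instance_to_package_queue[ins_id].emplace(std::move(request));

    // In _send_rpc(): release the charge once the block is handed to brpc.
    if (request.block) {
        _parent->memory_used_counter()->update(-request.block->ByteSizeLong());
    }

    // In _set_receiver_eof(): release whatever is still queued before clearing the queue.
    auto& q = _instance_to_package_queue[id];
    for (; !q.empty(); q.pop()) {
        if (q.front().block) {
            _parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
        }
    }

The broadcast queue is drained the same way, reading the size through block_holder->get_block().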
27 changes: 27 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -461,6 +461,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(
local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
}
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
old_channel_mem_usage += channel->mem_usage();
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(
state, local_state.channels, local_state._partition_count,
@@ -470,7 +474,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
state, local_state.channel_shared_ptrs, local_state._partition_count,
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
}
int64_t new_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
new_channel_mem_usage += channel->mem_usage();
}
local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
old_channel_mem_usage += channel->mem_usage();
}
// check out of limit
RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
@@ -506,7 +519,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
// when send data we still use block
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
channel2rows, block, eos));
int64_t new_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
new_channel_mem_usage += channel->mem_usage();
}
local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
old_channel_mem_usage += channel->mem_usage();
}
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
RETURN_IF_ERROR(
@@ -517,6 +539,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(channel_add_rows_with_idx(
state, local_state.channels, local_state.channels.size(), assignments, block, eos));

int64_t new_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
new_channel_mem_usage += channel->mem_usage();
}
local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
// 1. select channel
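Every partitioned branch of ExchangeSinkOperatorX::sink now brackets its channel_add_rows call with a before/after sum of per-channel memory and charges only the difference to the operator's memory-used counter. The repeated pattern, condensed (names as in the hunks; error handling elided):

    int64_t old_channel_mem_usage = 0;
    for (const auto& channel : local_state.channels) {
        old_channel_mem_usage += channel->mem_usage();
    }
    // ...channel_add_rows / channel_add_rows_with_idx copies rows into per-channel buffers here...
    int64_t new_channel_mem_usage = 0;
    for (const auto& channel : local_state.channels) {
        new_channel_mem_usage += channel->mem_usage();
    }
    local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);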
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/exchange_source_operator.cpp
@@ -62,8 +62,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging());
state, this, p.input_row_desc(), state->fragment_instance_id(), p.node_id(),
p.num_senders(), profile(), p.is_merging());
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@@ -169,9 +169,8 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
*eos = true;
auto limit = _limit - local_state.num_rows_returned();
block->set_num_rows(limit);
local_state.set_num_rows_returned(_limit);
local_state.add_num_rows_returned(limit);
}
COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
}
return status;
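The exchange-source fix changes how the limit branch accounts returned rows: instead of forcing the total to exactly _limit via set_num_rows_returned (which bypassed the counter path), only the rows actually kept in the truncated block are added, and the helper keeps the RowsReturned counter current, so the separate COUNTER_SET is dropped. A sketch of the resulting limit branch (the surrounding guard condition is assumed, as it is not shown in the hunk):

    // Inside the "limit reached" branch of get_block() (guard condition not shown above).
    *eos = true;
    auto limit = _limit - local_state.num_rows_returned();  // rows still allowed through
    block->set_num_rows(limit);
    local_state.add_num_rows_returned(limit);  // also keeps the RowsReturned counter current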
19 changes: 12 additions & 7 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -72,11 +72,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
}

_build_blocks_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1);
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1);
_hash_table_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1);
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_build_arena_memory_usage =
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1);
profile()->AddHighWaterMarkCounter("MemoryUsageBuildKeyArena", TUnit::BYTES, "", 1);

// Build phase
auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
@@ -321,6 +321,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
_mem_tracker->consume(arg.hash_table->get_byte_size() -
old_hash_table_size);
_mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size);
_memory_used_counter->set(_mem_tracker->consumption());
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
return st;
}},
*_shared_state->hash_table_variants, _shared_state->join_op_variants,
@@ -542,7 +544,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
local_state._build_side_mem_used += in_block->allocated_bytes();

if (local_state._build_side_mutable_block.empty()) {
auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -569,12 +570,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
std::to_string(std::numeric_limits<uint32_t>::max()));
}

local_state._mem_tracker->consume(in_block->bytes());
COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());

auto old_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
std::move(*in_block)));
auto new_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
local_state._memory_used_counter->set(local_state._mem_tracker->consumption());
local_state._peak_memory_usage_counter->set(
local_state._mem_tracker->peak_consumption());
COUNTER_SET(local_state._build_blocks_memory_usage, (int64_t)new_block_mem_usage);
}
}

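On the hash-join build side, the per-call consume(in_block->bytes()) and the _build_side_mem_used bookkeeping are replaced by measuring the mutable build block before and after the merge, charging the tracker with the growth, and mirroring the tracker into the MemoryUsage and PeakMemoryUsage counters. Condensed from the hunk above (names as in the diff):

    auto old_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
    RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
            std::move(*in_block)));
    auto new_block_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
    local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
    local_state._memory_used_counter->set(local_state._mem_tracker->consumption());
    local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
    COUNTER_SET(local_state._build_blocks_memory_usage, (int64_t)new_block_mem_usage);

This is also why the _build_side_mem_used and _build_side_last_mem_used members are dropped from hashjoin_build_sink.h below.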
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.h
@@ -74,8 +74,6 @@ class HashJoinBuildSinkLocalState final
std::vector<vectorized::ColumnPtr> _key_columns_holder;

bool _should_build_hash_table = true;
int64_t _build_side_mem_used = 0;
int64_t _build_side_last_mem_used = 0;

size_t _build_side_rows = 0;

5 changes: 4 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -54,7 +54,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
_construct_mutable_join_block();
_probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
_probe_arena_memory_usage =
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1);
// Probe phase
_probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
@@ -500,7 +500,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids));

if (&local_state._probe_block != input_block) {
auto old_block_mem_usage = local_state._probe_block.allocated_bytes();
input_block->swap(local_state._probe_block);
auto new_block_mem_usage = local_state._probe_block.allocated_bytes();
local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
}
}
return Status::OK();
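The probe side applies the same delta idea to the block swap in HashJoinProbeOperatorX::push: measure _probe_block before and after swapping in the input block and charge the tracker with the difference, so repeated pushes track the live probe-block size instead of accumulating. Condensed sketch (names as in the hunk):

    if (&local_state._probe_block != input_block) {
        auto old_block_mem_usage = local_state._probe_block.allocated_bytes();
        input_block->swap(local_state._probe_block);
        auto new_block_mem_usage = local_state._probe_block.allocated_bytes();
        local_state._mem_tracker->consume(new_block_mem_usage - old_block_mem_usage);
    }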
1 change: 0 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -132,7 +132,6 @@ struct ProcessHashTableProbe {
bool _need_calculate_build_index_has_zero = true;
bool* _has_null_in_build_side;

RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;