Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipelinex](profile) improve memory counter of pipelineX #30538

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
RETURN_IF_ERROR(
p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i]));
}
_memory_usage_counter = ADD_LABEL_COUNTER(Base::profile(), "MemoryUsage");
_hash_table_memory_usage =
ADD_CHILD_COUNTER(Base::profile(), "HashTable", TUnit::BYTES, "MemoryUsage");
_hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
TUnit::BYTES, "MemoryUsage", 1);
_serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage");
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime");
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
RuntimeProfile::Counter* _serialize_data_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
_blocks_memory_usage =
_profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");

size_t agg_size = p._agg_expr_ctxs.size();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSinkDepend
bool need_check_first = false);
bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->cast<AnalyticSourceOperatorX>();
_agg_functions_size = p._agg_functions.size();

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_blocks_memory_usage =
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");

_agg_functions.resize(p._agg_functions.size());
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticSourceDepend
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
_wait_queue_timer =
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependenc
// Used to count the bytes sent under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;

RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
Expand Down
20 changes: 12 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
}
}

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");

_build_blocks_memory_usage =
ADD_CHILD_COUNTER(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage");
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage");
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1);
_build_arena_memory_usage =
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1);

// Build phase
auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
Expand Down Expand Up @@ -271,14 +269,20 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
HashJoinBuildSinkLocalState>
hash_table_build_process(rows, raw_ptrs, this,
state->batch_size(), state);
return hash_table_build_process.template run<
auto old_hash_table_size = arg.hash_table->get_byte_size();
auto old_key_size = arg.serialized_keys_size(true);
auto st = hash_table_build_process.template run<
JoinOpType::value, has_null_value,
short_circuit_for_null_in_build_side, with_other_conjuncts>(
arg,
has_null_value || short_circuit_for_null_in_build_side
? &null_map_val->get_data()
: nullptr,
&_shared_state->_has_null_in_build_side);
_mem_tracker->consume(arg.hash_table->get_byte_size() -
old_hash_table_size);
_mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size);
return st;
}},
*_shared_state->hash_table_variants, _shared_state->join_op_variants,
vectorized::make_bool_variant(_build_side_ignore_null),
Expand Down Expand Up @@ -469,6 +473,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());
local_state._mem_tracker->consume(in_block->bytes());
if (local_state._build_side_mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
Expand All @@ -483,8 +489,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
DCHECK(!local_state._build_side_mutable_block.empty());
local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
COUNTER_UPDATE(local_state._build_blocks_memory_usage,
(*local_state._shared_state->build_block).bytes());

const bool use_global_rf =
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ class HashJoinBuildSinkLocalState final

RuntimeProfile::Counter* _allocate_resource_timer = nullptr;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
_construct_mutable_join_block();
_probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
_probe_arena_memory_usage =
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
// Probe phase
_probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
Expand Down Expand Up @@ -320,6 +320,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
mutable_join_block, &temp_block,
local_state._probe_block.rows(), _is_mark_join,
_have_other_join_conjunct);
local_state._mem_tracker->set_consumption(
arg.serialized_keys_size(false));
} else {
st = Status::InternalError("uninited hash table");
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/pip_scanner_context.h"
#include "vec/exec/scan/scanner_context.h"
Expand Down Expand Up @@ -1286,11 +1287,11 @@ Status ScanLocalState<Derived>::_init_profile() {
_scanner_profile.reset(new RuntimeProfile("VScanner"));
profile()->add_child(_scanner_profile.get(), true, nullptr);

_memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
_queued_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage");
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
_queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
"QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage");
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
// time of transfer thread to wait for block from scan thread
Expand Down
11 changes: 8 additions & 3 deletions be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {

_profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true");

_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_sort_blocks_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down Expand Up @@ -149,16 +150,20 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
COUNTER_SET(local_state._sort_blocks_memory_usage,
(int64_t)local_state._shared_state->sorter->data_size());
RETURN_IF_CANCELLED(state);

// update runtime predicate
if (_use_topn_opt) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
if (!new_top.is_null() && new_top != local_state.old_top) {
auto& sort_description = local_state._shared_state->sorter->get_sort_description();
const auto& sort_description =
local_state._shared_state->sorter->get_sort_description();
auto col = in_block->get_by_position(sort_description[0].column_number);
bool is_reverse = sort_description[0].direction < 0;
auto query_ctx = state->get_query_ctx();
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(
query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse));
local_state.old_top = std::move(new_top);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSinkDependency> {
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;

// topn top value
vectorized::Field old_top {vectorized::Field::Types::Null};
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
_mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
_memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
_memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1);
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down Expand Up @@ -462,9 +462,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
_memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1);
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,16 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) {

#define ADD_COUNTER_IMPL(NAME, T) \
RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit, \
const std::string& parent_counter_name) { \
const std::string& parent_counter_name, \
int64_t level) { \
DCHECK_EQ(_is_averaged_profile, false); \
std::lock_guard<std::mutex> l(_counter_map_lock); \
if (_counter_map.find(name) != _counter_map.end()) { \
return reinterpret_cast<T*>(_counter_map[name]); \
} \
DCHECK(parent_counter_name == ROOT_COUNTER || \
_counter_map.find(parent_counter_name) != _counter_map.end()); \
T* counter = _pool->add(new T(unit)); \
T* counter = _pool->add(new T(unit, level)); \
_counter_map[name] = counter; \
std::set<std::string>* child_counters = \
find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>()); \
Expand Down
6 changes: 4 additions & 2 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class RuntimeProfile {
/// as value()) and the current value.
class HighWaterMarkCounter : public Counter {
public:
HighWaterMarkCounter(TUnit::type unit) : Counter(unit), current_value_(0) {}
/// Constructs a high-water-mark counter reported in `unit`.
/// `level` is forwarded to the Counter base class and defaults to 2 when the
/// caller does not supply one. The literal 0 passed to Counter is presumably
/// the initial counter value (TODO confirm against Counter's constructor);
/// the separately tracked current value also starts at 0.
HighWaterMarkCounter(TUnit::type unit, int64_t level = 2)
: Counter(unit, 0, level), current_value_(0) {}

virtual void add(int64_t delta) {
current_value_.fetch_add(delta, std::memory_order_relaxed);
Expand Down Expand Up @@ -413,7 +414,8 @@ class RuntimeProfile {
/// Adds a high water mark counter to the runtime profile. Otherwise, same behavior
/// as AddCounter().
HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit,
const std::string& parent_counter_name = "");
const std::string& parent_counter_name = "",
int64_t level = 2);

// Only for creating a MemTracker (which uses the profile's counter to calculate consumption)
std::shared_ptr<HighWaterMarkCounter> AddSharedHighWaterMarkCounter(
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/common/hash_table/hash_map_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ struct MethodBaseInner {
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) = 0;

/// Default implementation: reports a size of 0 and ignores `is_build`.
/// Hash methods that keep their own key storage (MethodSerialized,
/// MethodStringNoCache, MethodKeysFixed) override this to report that storage.
virtual size_t serialized_keys_size(bool is_build) const { return 0; }

void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const uint8_t* null_map) {
bucket_nums.resize(num_rows);

Expand Down Expand Up @@ -243,6 +245,10 @@ struct MethodSerialized : public MethodBase<TData> {
Base::keys = input_keys.data();
}

/// Reports the size of the arena that backs the serialized keys.
/// `is_build` selects the build-side arena; otherwise the arena inherited
/// from the base method is used. The unit is whatever the arena's size()
/// returns (presumably bytes -- TODO confirm against Arena).
size_t serialized_keys_size(bool is_build) const override {
    if (is_build) {
        return build_arena.size();
    }
    return Base::arena.size();
}

void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down Expand Up @@ -277,6 +283,10 @@ struct MethodStringNoCache : public MethodBase<TData> {

std::vector<StringRef> stored_keys;

size_t serialized_keys_size(bool is_build) const override {
return stored_keys.size() * sizeof(StringRef);
}

void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down Expand Up @@ -430,6 +440,10 @@ struct MethodKeysFixed : public MethodBase<TData> {
}
}

/// Reports the bytes taken by the stored fixed-width keys: the number of
/// entries in the selected key container times sizeof(Base::Key).
/// `is_build` selects `build_stored_keys`; otherwise `stored_keys` is used.
size_t serialized_keys_size(bool is_build) const override {
    const size_t key_count = is_build ? build_stored_keys.size() : stored_keys.size();
    return key_count * sizeof(typename Base::Key);
}
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down
Loading
Loading