Skip to content

Commit

Permalink
[pipelinex](profile) improve memory counter of pipelineX (#30538)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg authored and Doris-Extras committed Jan 31, 2024
1 parent 8deeaef commit ef8d9ad
Show file tree
Hide file tree
Showing 20 changed files with 66 additions and 41 deletions.
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
RETURN_IF_ERROR(
p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i]));
}
_memory_usage_counter = ADD_LABEL_COUNTER(Base::profile(), "MemoryUsage");
_hash_table_memory_usage =
ADD_CHILD_COUNTER(Base::profile(), "HashTable", TUnit::BYTES, "MemoryUsage");
_hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable",
TUnit::BYTES, "MemoryUsage", 1);
_serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage");
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime");
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
RuntimeProfile::Counter* _serialize_data_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
_blocks_memory_usage =
_profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");

size_t agg_size = p._agg_expr_ctxs.size();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSinkDepend
bool need_check_first = false);
bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->cast<AnalyticSourceOperatorX>();
_agg_functions_size = p._agg_functions.size();

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_blocks_memory_usage =
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");

_agg_functions.resize(p._agg_functions.size());
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticSourceDepend
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
_wait_queue_timer =
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependenc
// Used to counter send bytes under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;

RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
Expand Down
20 changes: 12 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
}
}

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");

_build_blocks_memory_usage =
ADD_CHILD_COUNTER(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage");
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage");
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1);
_build_arena_memory_usage =
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1);

// Build phase
auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
Expand Down Expand Up @@ -271,14 +269,20 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
HashJoinBuildSinkLocalState>
hash_table_build_process(rows, raw_ptrs, this,
state->batch_size(), state);
return hash_table_build_process.template run<
auto old_hash_table_size = arg.hash_table->get_byte_size();
auto old_key_size = arg.serialized_keys_size(true);
auto st = hash_table_build_process.template run<
JoinOpType::value, has_null_value,
short_circuit_for_null_in_build_side, with_other_conjuncts>(
arg,
has_null_value || short_circuit_for_null_in_build_side
? &null_map_val->get_data()
: nullptr,
&_shared_state->_has_null_in_build_side);
_mem_tracker->consume(arg.hash_table->get_byte_size() -
old_hash_table_size);
_mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size);
return st;
}},
*_shared_state->hash_table_variants, _shared_state->join_op_variants,
vectorized::make_bool_variant(_build_side_ignore_null),
Expand Down Expand Up @@ -469,6 +473,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());
local_state._mem_tracker->consume(in_block->bytes());
if (local_state._build_side_mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
Expand All @@ -483,8 +489,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
DCHECK(!local_state._build_side_mutable_block.empty());
local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
COUNTER_UPDATE(local_state._build_blocks_memory_usage,
(*local_state._shared_state->build_block).bytes());

const bool use_global_rf =
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ class HashJoinBuildSinkLocalState final

RuntimeProfile::Counter* _allocate_resource_timer = nullptr;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
_construct_mutable_join_block();
_probe_column_disguise_null.reserve(_probe_expr_ctxs.size());
_probe_arena_memory_usage =
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage");
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
// Probe phase
_probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
Expand Down Expand Up @@ -320,6 +320,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
mutable_join_block, &temp_block,
local_state._probe_block.rows(), _is_mark_join,
_have_other_join_conjunct);
local_state._mem_tracker->set_consumption(
arg.serialized_keys_size(false));
} else {
st = Status::InternalError("uninited hash table");
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/pip_scanner_context.h"
#include "vec/exec/scan/scanner_context.h"
Expand Down Expand Up @@ -1286,11 +1287,11 @@ Status ScanLocalState<Derived>::_init_profile() {
_scanner_profile.reset(new RuntimeProfile("VScanner"));
profile()->add_child(_scanner_profile.get(), true, nullptr);

_memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
_queued_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage");
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
_queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
"QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage");
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
// time of transfer thread to wait for block from scan thread
Expand Down
11 changes: 8 additions & 3 deletions be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {

_profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true");

_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_sort_blocks_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down Expand Up @@ -149,16 +150,20 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
COUNTER_SET(local_state._sort_blocks_memory_usage,
(int64_t)local_state._shared_state->sorter->data_size());
RETURN_IF_CANCELLED(state);

// update runtime predicate
if (_use_topn_opt) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
if (!new_top.is_null() && new_top != local_state.old_top) {
auto& sort_description = local_state._shared_state->sorter->get_sort_description();
const auto& sort_description =
local_state._shared_state->sorter->get_sort_description();
auto col = in_block->get_by_position(sort_description[0].column_number);
bool is_reverse = sort_description[0].direction < 0;
auto query_ctx = state->get_query_ctx();
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(
query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse));
local_state.old_top = std::move(new_top);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSinkDependency> {
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;

RuntimeProfile::Counter* _memory_usage_counter = nullptr;
RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;

// topn top value
vectorized::Field old_top {vectorized::Field::Types::Null};
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
_mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
_memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
_memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1);
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down Expand Up @@ -462,9 +462,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
_memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1);
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,16 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) {

#define ADD_COUNTER_IMPL(NAME, T) \
RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit, \
const std::string& parent_counter_name) { \
const std::string& parent_counter_name, \
int64_t level) { \
DCHECK_EQ(_is_averaged_profile, false); \
std::lock_guard<std::mutex> l(_counter_map_lock); \
if (_counter_map.find(name) != _counter_map.end()) { \
return reinterpret_cast<T*>(_counter_map[name]); \
} \
DCHECK(parent_counter_name == ROOT_COUNTER || \
_counter_map.find(parent_counter_name) != _counter_map.end()); \
T* counter = _pool->add(new T(unit)); \
T* counter = _pool->add(new T(unit, level)); \
_counter_map[name] = counter; \
std::set<std::string>* child_counters = \
find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>()); \
Expand Down
6 changes: 4 additions & 2 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class RuntimeProfile {
/// as value()) and the current value.
class HighWaterMarkCounter : public Counter {
public:
HighWaterMarkCounter(TUnit::type unit) : Counter(unit), current_value_(0) {}
HighWaterMarkCounter(TUnit::type unit, int64_t level = 2)
: Counter(unit, 0, level), current_value_(0) {}

virtual void add(int64_t delta) {
current_value_.fetch_add(delta, std::memory_order_relaxed);
Expand Down Expand Up @@ -413,7 +414,8 @@ class RuntimeProfile {
/// Adds a high water mark counter to the runtime profile. Otherwise, same behavior
/// as AddCounter().
HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit,
const std::string& parent_counter_name = "");
const std::string& parent_counter_name = "",
int64_t level = 2);

// Only for create MemTracker(using profile's counter to calc consumption)
std::shared_ptr<HighWaterMarkCounter> AddSharedHighWaterMarkCounter(
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/common/hash_table/hash_map_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ struct MethodBaseInner {
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) = 0;

virtual size_t serialized_keys_size(bool is_build) const { return 0; }

void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const uint8_t* null_map) {
bucket_nums.resize(num_rows);

Expand Down Expand Up @@ -243,6 +245,10 @@ struct MethodSerialized : public MethodBase<TData> {
Base::keys = input_keys.data();
}

size_t serialized_keys_size(bool is_build) const override {
return is_build ? build_arena.size() : Base::arena.size();
}

void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down Expand Up @@ -277,6 +283,10 @@ struct MethodStringNoCache : public MethodBase<TData> {

std::vector<StringRef> stored_keys;

size_t serialized_keys_size(bool is_build) const override {
return stored_keys.size() * sizeof(StringRef);
}

void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down Expand Up @@ -430,6 +440,10 @@ struct MethodKeysFixed : public MethodBase<TData> {
}
}

size_t serialized_keys_size(bool is_build) const override {
return (is_build ? build_stored_keys.size() : stored_keys.size()) *
sizeof(typename Base::Key);
}
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
const uint8_t* null_map = nullptr, bool is_join = false,
bool is_build = false, uint32_t bucket_size = 0) override {
Expand Down
Loading

0 comments on commit ef8d9ad

Please sign in to comment.