Use DorisVector to avoid untraced memory usage #41557

Merged: 1 commit, merged on Oct 9, 2024
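This PR replaces untracked `std::vector` members (for example the `visited` flags in the hash-join probe path) with `DorisVector`, whose allocations go through Doris' memory-tracking allocator, and adds estimation hooks (`estimate_memory`, `get_estimated_memory_size_for_merging`) plus `MemoryUsageContainer`/`MemoryUsageArena` counters so aggregation and hash-join memory shows up in the query profile. As a rough sketch of the idea only: the real alias and allocator live in `be/src/vec/common/custom_allocator.h`, and the names `TrackedAllocator` and `g_tracked_bytes` below are invented for illustration.

```cpp
// Illustrative sketch only: a std::vector alias whose allocator reports every
// allocation and deallocation to a memory counter, so containers like
// `DorisVector<uint8_t> visited` are traced rather than invisible to the tracker.
// The real implementation is in be/src/vec/common/custom_allocator.h; the names
// below (TrackedAllocator, g_tracked_bytes) are made up for this example.
#include <atomic>
#include <cstddef>
#include <new>
#include <vector>

inline std::atomic<std::size_t> g_tracked_bytes{0};  // stand-in for a per-query MemTracker

template <typename T>
struct TrackedAllocator {
    using value_type = T;

    TrackedAllocator() = default;
    template <typename U>
    TrackedAllocator(const TrackedAllocator<U>&) {}

    T* allocate(std::size_t n) {
        g_tracked_bytes += n * sizeof(T);  // traced before the memory is handed out
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t n) {
        g_tracked_bytes -= n * sizeof(T);
        ::operator delete(p);
    }
};

template <typename T, typename U>
bool operator==(const TrackedAllocator<T>&, const TrackedAllocator<U>&) { return true; }
template <typename T, typename U>
bool operator!=(const TrackedAllocator<T>&, const TrackedAllocator<U>&) { return false; }

template <typename T>
using DorisVector = std::vector<T, TrackedAllocator<T>>;
```

Swapping `std::vector<uint8_t>` for such an alias needs no call-site changes beyond the signature, which is why the probe-path diff below only touches the parameter type and the include.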
22 changes: 21 additions & 1 deletion be/src/pipeline/common/agg_utils.h
@@ -210,7 +210,7 @@ struct AggregateDataContainer {
}

*reinterpret_cast<KeyType*>(_current_keys) = key;
auto aggregate_data = _current_agg_data;
auto* aggregate_data = _current_agg_data;
++_total_count;
++_index_in_sub_container;
_current_agg_data += _size_of_aggregate_states;
@@ -287,6 +287,26 @@ struct AggregateDataContainer {

[[nodiscard]] uint32_t total_count() const { return _total_count; }

size_t estimate_memory(size_t rows) const {
bool need_to_expand = false;
if (_total_count == 0) {
need_to_expand = true;
} else if ((_index_in_sub_container + rows) > SUB_CONTAINER_CAPACITY) {
need_to_expand = true;
rows -= (SUB_CONTAINER_CAPACITY - _index_in_sub_container);
}

if (!need_to_expand) {
return 0;
}

size_t count = (rows + SUB_CONTAINER_CAPACITY - 1) / SUB_CONTAINER_CAPACITY;
size_t size = _size_of_key * SUB_CONTAINER_CAPACITY;
size += _size_of_aggregate_states * SUB_CONTAINER_CAPACITY;
size *= count;
return size;
}

void init_once() {
if (_inited) {
return;
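The new `AggregateDataContainer::estimate_memory` only charges for sub-containers that would actually have to be allocated: rows that still fit into the current sub-container cost nothing, and the spill-over is rounded up to whole sub-containers of key plus aggregate-state slots. A standalone sketch of the same arithmetic follows; the capacity and sizes are assumed values for illustration, the real constant is defined in agg_utils.h.

```cpp
#include <cstddef>
#include <iostream>

// Assumed for illustration; the real constant is defined in agg_utils.h.
constexpr std::size_t SUB_CONTAINER_CAPACITY = 8192;

// Mirrors the logic above: bytes that appending `rows` keys would newly allocate.
std::size_t estimate_memory(std::size_t total_count, std::size_t index_in_sub_container,
                            std::size_t rows, std::size_t size_of_key,
                            std::size_t size_of_aggregate_states) {
    bool need_to_expand = false;
    if (total_count == 0) {
        need_to_expand = true;                       // nothing allocated yet
    } else if (index_in_sub_container + rows > SUB_CONTAINER_CAPACITY) {
        need_to_expand = true;
        rows -= SUB_CONTAINER_CAPACITY - index_in_sub_container;  // only the spill-over counts
    }
    if (!need_to_expand) {
        return 0;                                    // current sub-container absorbs all rows
    }
    std::size_t count = (rows + SUB_CONTAINER_CAPACITY - 1) / SUB_CONTAINER_CAPACITY;  // ceil
    return count * SUB_CONTAINER_CAPACITY * (size_of_key + size_of_aggregate_states);
}

int main() {
    // 4096 rows already buffered, 10000 incoming, 8-byte keys, 64-byte states:
    // 10000 - (8192 - 4096) = 5904 rows spill over -> 1 new sub-container of 8192 slots.
    std::cout << estimate_memory(4096, 4096, 10000, 8, 64) << " bytes\n";  // 8192 * 72 = 589824
}
```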
63 changes: 31 additions & 32 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -77,8 +77,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);

_container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", TUnit::BYTES);
_arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", TUnit::BYTES);
_memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);

return Status::OK();
}
@@ -230,36 +230,35 @@ size_t AggSinkLocalState::_memory_usage() const {
}

void AggSinkLocalState::_update_memusage_with_serialized_key() {
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_SET(_container_memory_usage,
Base::_shared_state->aggregate_data_container->memory_usage());
COUNTER_SET(_arena_memory_usage,
static_cast<int64_t>(_agg_arena_pool->size()));
COUNTER_UPDATE(_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
}},
_agg_data->method_variant);
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_SET(
_memory_usage_container,
Base::_shared_state->aggregate_data_container->memory_usage());
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_agg_arena_pool->size()));
COUNTER_SET(_hash_table_memory_usage,
int64_t(data.get_buffer_size_in_bytes()));
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
}},
_agg_data->method_variant);
}

Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
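`_update_memusage_with_serialized_key` charges the memory tracker with deltas rather than absolutes: it remembers the arena/container and hash-table sizes from the previous update in `mem_usage_record` and consumes only the growth since then, while the `MemoryUsage*` counters are set to the current absolute values. A reduced sketch of that pattern, with names shortened and types simplified for illustration:

```cpp
#include <cstdint>
#include <iostream>

// Illustration only: a tracker that accumulates consumed bytes and a record of
// what has already been charged, mirroring mem_usage_record in the sink state.
struct MemTracker {
    int64_t consumption = 0;
    void consume(int64_t bytes) { consumption += bytes; }
};

struct MemUsageRecord {
    int64_t used_in_arena = 0;
    int64_t used_in_state = 0;
};

void update_memusage(MemTracker& tracker, MemUsageRecord& record,
                     int64_t arena_and_container_bytes, int64_t hash_table_bytes) {
    // Only the growth since the last update is consumed, so repeated calls
    // never double-count memory that was already charged.
    tracker.consume(arena_and_container_bytes - record.used_in_arena);
    tracker.consume(hash_table_bytes - record.used_in_state);
    record.used_in_arena = arena_and_container_bytes;
    record.used_in_state = hash_table_bytes;
}

int main() {
    MemTracker tracker;
    MemUsageRecord record;
    update_memusage(tracker, record, /*arena*/ 1 << 20, /*hash table*/ 1 << 18);
    update_memusage(tracker, record, /*arena*/ 3 << 20, /*hash table*/ 1 << 18);
    std::cout << tracker.consumption << '\n';  // 3 MiB + 256 KiB, not double-counted
}
```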
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
@@ -112,8 +112,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _container_memory_usage = nullptr;
RuntimeProfile::Counter* _arena_memory_usage = nullptr;
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

bool _should_limit_output = false;
121 changes: 73 additions & 48 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -53,6 +53,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_hash_table_size_counter =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", TUnit::UNIT, 1);

_memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);

auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
@@ -582,6 +585,21 @@ template Status AggSourceOperatorX::merge_with_serialized_key_helper<true>(
template Status AggSourceOperatorX::merge_with_serialized_key_helper<false>(
RuntimeState* state, vectorized::Block* block);

size_t AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* state,
size_t rows) const {
auto& local_state = get_local_state(state);
size_t size = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> size_t {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
return 0;
},
[&](auto& agg_method) { return agg_method.hash_table->estimate_memory(rows); }},
local_state._shared_state->agg_data->method_variant);
size += local_state._shared_state->aggregate_data_container->estimate_memory(rows);
return size;
}

size_t AggLocalState::_get_hash_table_size() {
return std::visit(
vectorized::Overload {[&](std::monostate& arg) -> size_t {
@@ -596,54 +614,61 @@ size_t AggLocalState::_get_hash_table_size() {
void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns,
size_t num_rows) {
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(
key, origin, *_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _shared_state->agg_arena_pool->aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = agg_method.lazy_emplace(state, i, creator,
creator_for_null_key);
}

COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
static_cast<int64_t>(
agg_method.hash_table->get_buffer_size_in_bytes()));
COUNTER_SET(_hash_table_size_counter,
static_cast<int64_t>(agg_method.hash_table->size()));
}},
_shared_state->agg_data->method_variant);
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(
key, origin, *_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _shared_state->agg_arena_pool->aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = agg_method.lazy_emplace(state, i, creator,
creator_for_null_key);
}

COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
static_cast<int64_t>(
agg_method.hash_table->get_buffer_size_in_bytes()));
COUNTER_SET(_hash_table_size_counter,
static_cast<int64_t>(agg_method.hash_table->size()));
COUNTER_SET(
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_shared_state->agg_arena_pool->size()));
}},
_shared_state->agg_data->method_variant);
}

void AggLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places,
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/aggregation_source_operator.h
@@ -86,6 +86,8 @@ class AggLocalState final : public PipelineXLocalState<AggSharedState> {
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;

using vectorized_get_result =
std::function<Status(RuntimeState* state, vectorized::Block* block, bool* eos)>;
@@ -102,7 +104,7 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
using Base = OperatorX<AggLocalState>;
AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);
~AggSourceOperatorX() = default;
~AggSourceOperatorX() override = default;

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

@@ -111,6 +113,8 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
template <bool limit>
Status merge_with_serialized_key_helper(RuntimeState* state, vectorized::Block* block);

size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t rows) const;

private:
friend class AggLocalState;

10 changes: 9 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -122,7 +122,11 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
size_t size_to_reserve = 0;

if (!_build_side_mutable_block.empty()) {
size_to_reserve += _build_side_mutable_block.allocated_bytes();
const auto bytes = _build_side_mutable_block.bytes();
const auto allocated_bytes = _build_side_mutable_block.allocated_bytes();
if (allocated_bytes != 0 && ((bytes * 100) / allocated_bytes) >= 85) {
size_to_reserve += bytes;
}
}

const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -277,6 +281,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
if (UNLIKELY(rows == 0)) {
return Status::OK();
}

LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " << block.columns()
<< ", bytes/allocated_bytes: " << block.bytes() << "/" << block.allocated_bytes();

COUNTER_UPDATE(_build_rows_counter, rows);
block.replace_if_overflow();

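The reserve-size change in `get_reserve_mem_size` stops unconditionally adding the build block's allocated bytes: the block now contributes only when it is at least 85% full, i.e. when appending another batch is likely to trigger a reallocation. A standalone sketch of that guard, with illustrative sizes; the real code reads the values from `_build_side_mutable_block`:

```cpp
#include <cstddef>
#include <iostream>

// Returns how many bytes to reserve for the build-side block before appending
// the next batch. A mostly-empty block would overstate the reservation, so the
// block only contributes once it is >= 85% full of its allocated capacity.
std::size_t reserve_for_build_block(std::size_t bytes, std::size_t allocated_bytes) {
    std::size_t size_to_reserve = 0;
    if (allocated_bytes != 0 && (bytes * 100) / allocated_bytes >= 85) {
        size_to_reserve += bytes;  // likely to grow on the next append
    }
    return size_to_reserve;
}

int main() {
    std::cout << reserve_for_build_block(40 << 20, 64 << 20) << '\n';  // ~62% full -> 0
    std::cout << reserve_for_build_block(60 << 20, 64 << 20) << '\n';  // ~93% full -> 60 MiB
}
```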
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -23,6 +23,7 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"

namespace doris {
namespace vectorized {
@@ -73,7 +74,7 @@ struct ProcessHashTableProbe {
// each matching join column need to be processed by other join conjunct. so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and same_to_prev struct
Status do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited,
Status do_other_join_conjuncts(vectorized::Block* output_block, DorisVector<uint8_t>& visited,
bool has_null_in_build_side);

template <bool with_other_conjuncts>
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -26,6 +26,7 @@
#include "util/simd/bits.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/custom_allocator.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::pipeline {
@@ -493,7 +494,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo

template <int JoinOpType>
Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* output_block,
std::vector<uint8_t>& visited,
DorisVector<uint8_t>& visited,
bool has_null_in_build_side) {
// dispose the other join conjunct exec
auto row_count = output_block->rows();
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/operator.h
@@ -196,8 +196,6 @@ class PipelineXLocalStateBase {
Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
vectorized::Block* block, size_t column_to_keep);

void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }

int64_t& estimate_memory_usage() { return _estimate_memory_usage; }

void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }