Use DorisVector to avoid untracked memory usage
mrhhsg committed Oct 9, 2024
1 parent ae335e7 commit c79bf06
Showing 33 changed files with 414 additions and 274 deletions.
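
The change set boils down to routing buffers that previously used std::vector through DorisVector, Doris's vector alias backed by a memory-tracked allocator (pulled in via vec/common/custom_allocator.h below), so their heap usage is charged to the query's memory tracker instead of going untracked. The following is a minimal self-contained sketch of that idea under stated assumptions: the allocator and the global byte counter are illustrative stand-ins, not the actual Doris definitions.

#include <atomic>
#include <cstddef>
#include <vector>

// Stand-in for a per-query memory tracker; real tracking in Doris goes through
// its own allocator and thread-context machinery.
static std::atomic<long long> g_tracked_bytes{0};

template <typename T>
struct TrackedAllocator {
    using value_type = T;
    TrackedAllocator() = default;
    template <typename U>
    TrackedAllocator(const TrackedAllocator<U>&) {}

    T* allocate(std::size_t n) {
        g_tracked_bytes += static_cast<long long>(n * sizeof(T));  // record growth
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t n) noexcept {
        g_tracked_bytes -= static_cast<long long>(n * sizeof(T));  // record release
        ::operator delete(p);
    }
};

template <typename T, typename U>
bool operator==(const TrackedAllocator<T>&, const TrackedAllocator<U>&) { return true; }
template <typename T, typename U>
bool operator!=(const TrackedAllocator<T>&, const TrackedAllocator<U>&) { return false; }

// A vector whose heap growth is visible to the tracker, which is the property
// the plain std::vector<uint8_t> previously used for `visited` did not have.
template <typename T>
using DorisVector = std::vector<T, TrackedAllocator<T>>;

With an alias like this, call sites such as the `visited` bitmap in ProcessHashTableProbe only need their declared type changed, as the hunks in process_hash_table_probe.h and process_hash_table_probe_impl.h below do.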
22 changes: 21 additions & 1 deletion be/src/pipeline/common/agg_utils.h
@@ -210,7 +210,7 @@ struct AggregateDataContainer {
}

*reinterpret_cast<KeyType*>(_current_keys) = key;
auto aggregate_data = _current_agg_data;
auto* aggregate_data = _current_agg_data;
++_total_count;
++_index_in_sub_container;
_current_agg_data += _size_of_aggregate_states;
@@ -287,6 +287,26 @@ struct AggregateDataContainer {

[[nodiscard]] uint32_t total_count() const { return _total_count; }

size_t estimate_memory(size_t rows) const {
bool need_to_expand = false;
if (_total_count == 0) {
need_to_expand = true;
} else if ((_index_in_sub_container + rows) > SUB_CONTAINER_CAPACITY) {
need_to_expand = true;
rows -= (SUB_CONTAINER_CAPACITY - _index_in_sub_container);
}

if (!need_to_expand) {
return 0;
}

size_t count = (rows + SUB_CONTAINER_CAPACITY - 1) / SUB_CONTAINER_CAPACITY;
size_t size = _size_of_key * SUB_CONTAINER_CAPACITY;
size += _size_of_aggregate_states * SUB_CONTAINER_CAPACITY;
size *= count;
return size;
}

void init_once() {
if (_inited) {
return;
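
For intuition, the estimate_memory added above reports zero when the incoming rows fit into the current sub-container; on the first use, or when they spill past SUB_CONTAINER_CAPACITY, it rounds the remaining rows up to whole sub-containers and charges each one for a full slab of keys plus a full slab of aggregate states. A standalone sketch of the same arithmetic with made-up sizes (the real capacity and per-entry sizes are members of AggregateDataContainer):

#include <cstddef>
#include <iostream>

// Made-up constants for illustration only; the real values are fields of
// AggregateDataContainer.
constexpr std::size_t kSubContainerCapacity = 8192;
constexpr std::size_t kSizeOfKey = 16;
constexpr std::size_t kSizeOfAggregateStates = 64;

std::size_t estimate_memory(std::size_t total_count, std::size_t index_in_sub_container,
                            std::size_t rows) {
    bool need_to_expand = false;
    if (total_count == 0) {
        need_to_expand = true;
    } else if (index_in_sub_container + rows > kSubContainerCapacity) {
        need_to_expand = true;
        rows -= kSubContainerCapacity - index_in_sub_container;  // rows the current sub-container absorbs
    }
    if (!need_to_expand) {
        return 0;
    }
    // Each new sub-container is charged for a full key slab and a full state slab.
    std::size_t count = (rows + kSubContainerCapacity - 1) / kSubContainerCapacity;
    return count * kSubContainerCapacity * (kSizeOfKey + kSizeOfAggregateStates);
}

int main() {
    // 5000 entries already sit in the current sub-container and 15000 more arrive:
    // 3192 fit, 11808 spill into 2 new sub-containers, so the estimate is
    // 2 * 8192 * (16 + 64) = 1310720 bytes.
    std::cout << estimate_memory(5000, 5000, 15000) << "\n";
}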
63 changes: 31 additions & 32 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -77,8 +77,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);

_container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", TUnit::BYTES);
_arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", TUnit::BYTES);
_memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);

return Status::OK();
}
@@ -230,36 +230,35 @@ size_t AggSinkLocalState::_memory_usage() const {
}

void AggSinkLocalState::_update_memusage_with_serialized_key() {
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_SET(_container_memory_usage,
Base::_shared_state->aggregate_data_container->memory_usage());
COUNTER_SET(_arena_memory_usage,
static_cast<int64_t>(_agg_arena_pool->size()));
COUNTER_UPDATE(_hash_table_memory_usage,
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
}},
_agg_data->method_variant);
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
auto arena_memory_usage =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage() -
Base::_shared_state->mem_usage_record.used_in_arena;
Base::_mem_tracker->consume(arena_memory_usage);
Base::_mem_tracker->consume(
data.get_buffer_size_in_bytes() -
Base::_shared_state->mem_usage_record.used_in_state);
_serialize_key_arena_memory_usage->add(arena_memory_usage);
COUNTER_SET(
_memory_usage_container,
Base::_shared_state->aggregate_data_container->memory_usage());
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_agg_arena_pool->size()));
COUNTER_SET(_hash_table_memory_usage,
int64_t(data.get_buffer_size_in_bytes()));
Base::_shared_state->mem_usage_record.used_in_state =
data.get_buffer_size_in_bytes();
Base::_shared_state->mem_usage_record.used_in_arena =
_agg_arena_pool->size() +
Base::_shared_state->aggregate_data_container->memory_usage();
}},
_agg_data->method_variant);
}

Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
@@ -112,8 +112,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _container_memory_usage = nullptr;
RuntimeProfile::Counter* _arena_memory_usage = nullptr;
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

bool _should_limit_output = false;
121 changes: 73 additions & 48 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -53,6 +53,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_hash_table_size_counter =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", TUnit::UNIT, 1);

_memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);

auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
@@ -582,6 +585,21 @@ template Status AggSourceOperatorX::merge_with_serialized_key_helper<true>(
template Status AggSourceOperatorX::merge_with_serialized_key_helper<false>(
RuntimeState* state, vectorized::Block* block);

size_t AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* state,
size_t rows) const {
auto& local_state = get_local_state(state);
size_t size = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> size_t {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
return 0;
},
[&](auto& agg_method) { return agg_method.hash_table->estimate_memory(rows); }},
local_state._shared_state->agg_data->method_variant);
size += local_state._shared_state->aggregate_data_container->estimate_memory(rows);
return size;
}

size_t AggLocalState::_get_hash_table_size() {
return std::visit(
vectorized::Overload {[&](std::monostate& arg) -> size_t {
@@ -596,54 +614,61 @@ size_t AggLocalState::_get_hash_table_size() {
void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns,
size_t num_rows) {
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(
key, origin, *_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _shared_state->agg_arena_pool->aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = agg_method.lazy_emplace(state, i, creator,
creator_for_null_key);
}

COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
static_cast<int64_t>(
agg_method.hash_table->get_buffer_size_in_bytes()));
COUNTER_SET(_hash_table_size_counter,
static_cast<int64_t>(agg_method.hash_table->size()));
}},
_shared_state->agg_data->method_variant);
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(
key, origin, *_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _shared_state->agg_arena_pool->aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};

SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = agg_method.lazy_emplace(state, i, creator,
creator_for_null_key);
}

COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
static_cast<int64_t>(
agg_method.hash_table->get_buffer_size_in_bytes()));
COUNTER_SET(_hash_table_size_counter,
static_cast<int64_t>(agg_method.hash_table->size()));
COUNTER_SET(
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_shared_state->agg_arena_pool->size()));
}},
_shared_state->agg_data->method_variant);
}

void AggLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places,
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/aggregation_source_operator.h
@@ -86,6 +86,8 @@ class AggLocalState final : public PipelineXLocalState<AggSharedState> {
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;

using vectorized_get_result =
std::function<Status(RuntimeState* state, vectorized::Block* block, bool* eos)>;
@@ -102,7 +104,7 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
using Base = OperatorX<AggLocalState>;
AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);
~AggSourceOperatorX() = default;
~AggSourceOperatorX() override = default;

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

@@ -111,6 +113,8 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
template <bool limit>
Status merge_with_serialized_key_helper(RuntimeState* state, vectorized::Block* block);

size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t rows) const;

private:
friend class AggLocalState;

6 changes: 5 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -122,7 +122,7 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
size_t size_to_reserve = 0;

if (!_build_side_mutable_block.empty()) {
size_to_reserve += _build_side_mutable_block.allocated_bytes();
size_to_reserve += _build_side_mutable_block.bytes();
}

const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -277,6 +277,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
if (UNLIKELY(rows == 0)) {
return Status::OK();
}

LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " << block.columns()
<< ", bytes/allocated_bytes: " << block.bytes() << "/" << block.allocated_bytes();

COUNTER_UPDATE(_build_rows_counter, rows);
block.replace_if_overflow();

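
One behavioral note on the hunk above: get_reserve_mem_size now bases its estimate on bytes() rather than allocated_bytes(). Assuming the ClickHouse-style column semantics Doris follows, where bytes() is the payload actually held and allocated_bytes() includes reserved but unused capacity, the old call could overstate how much still needs to be reserved. A toy illustration of the difference under that assumption (not the real Doris Block API):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Toy column exposing the two size notions assumed above.
struct ToyColumn {
    std::vector<int64_t> data;
    std::size_t bytes() const { return data.size() * sizeof(int64_t); }               // payload held
    std::size_t allocated_bytes() const { return data.capacity() * sizeof(int64_t); } // capacity reserved
};

int main() {
    ToyColumn c;
    c.data.reserve(4096);  // capacity grows up front
    c.data.resize(1000);   // only part of it is filled
    std::cout << "bytes=" << c.bytes()                        // 8000
              << " allocated_bytes=" << c.allocated_bytes()   // typically 32768
              << "\n";
}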
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -23,6 +23,7 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"

namespace doris {
namespace vectorized {
@@ -73,7 +74,7 @@ struct ProcessHashTableProbe {
// each matching join column need to be processed by other join conjunct. so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and same_to_prev struct
Status do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited,
Status do_other_join_conjuncts(vectorized::Block* output_block, DorisVector<uint8_t>& visited,
bool has_null_in_build_side);

template <bool with_other_conjuncts>
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -26,6 +26,7 @@
#include "util/simd/bits.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/custom_allocator.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::pipeline {
@@ -493,7 +494,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo

template <int JoinOpType>
Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* output_block,
std::vector<uint8_t>& visited,
DorisVector<uint8_t>& visited,
bool has_null_in_build_side) {
// dispose the other join conjunct exec
auto row_count = output_block->rows();
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/operator.h
@@ -196,8 +196,6 @@ class PipelineXLocalStateBase {
Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
vectorized::Block* block, size_t column_to_keep);

void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }

int64_t& estimate_memory_usage() { return _estimate_memory_usage; }

void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }