From b44b7a16f6225e7e3e0e0806c2ed8cb5367d9380 Mon Sep 17 00:00:00 2001
From: Jerry Hu
Date: Tue, 8 Oct 2024 17:23:20 +0800
Subject: [PATCH] Use DorisVector to avoid untracked memory usage

---
 be/src/pipeline/common/agg_utils.h            |  22 +++-
 .../exec/aggregation_sink_operator.cpp        |  63 +++++----
 .../pipeline/exec/aggregation_sink_operator.h |   4 +-
 .../exec/aggregation_source_operator.cpp      | 121 +++++++++++-------
 .../exec/aggregation_source_operator.h        |   6 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp  |   6 +-
 .../exec/join/process_hash_table_probe.h      |   3 +-
 .../exec/join/process_hash_table_probe_impl.h |   3 +-
 be/src/pipeline/exec/operator.h               |   2 -
 .../partitioned_aggregation_sink_operator.cpp |  18 ++-
 .../partitioned_aggregation_sink_operator.h   |   5 +-
 ...artitioned_aggregation_source_operator.cpp | 114 +++++++++--------
 .../partitioned_aggregation_source_operator.h |   8 +-
 .../partitioned_hash_join_probe_operator.cpp  | 110 ++++++++++------
 .../partitioned_hash_join_probe_operator.h    |  10 +-
 .../partitioned_hash_join_sink_operator.cpp   |  21 ++-
 .../partitioned_hash_join_sink_operator.h     |   3 +
 be/src/pipeline/pipeline_task.cpp             |   1 -
 .../workload_group/workload_group_manager.cpp |  13 ++
 be/src/vec/columns/column.h                   |  29 ++++-
 be/src/vec/columns/column_const.h             |   5 +-
 be/src/vec/columns/column_decimal.cpp         |   9 +-
 be/src/vec/columns/column_decimal.h           |   9 +-
 be/src/vec/columns/column_nullable.cpp        |   4 +-
 be/src/vec/columns/column_nullable.h          |   6 +-
 be/src/vec/columns/column_object.h            |   9 +-
 be/src/vec/columns/column_string.cpp          |  11 +-
 be/src/vec/columns/column_string.h            |   9 +-
 be/src/vec/columns/column_vector.cpp          |   9 +-
 be/src/vec/columns/column_vector.h            |   9 +-
 be/src/vec/common/custom_allocator.h          |   2 +-
 .../vec/common/hash_table/hash_map_context.h  |  29 +++--
 .../vec/common/hash_table/join_hash_table.h   |  11 +-
 33 files changed, 412 insertions(+), 272 deletions(-)

diff --git a/be/src/pipeline/common/agg_utils.h b/be/src/pipeline/common/agg_utils.h
index d67ebb9fdf7376..6b18e42b3b36d6 100644
--- a/be/src/pipeline/common/agg_utils.h
+++ b/be/src/pipeline/common/agg_utils.h
@@ -210,7 +210,7 @@ struct AggregateDataContainer {
         }

         *reinterpret_cast<KeyType*>(_current_keys) = key;
-        auto aggregate_data = _current_agg_data;
+        auto* aggregate_data = _current_agg_data;
         ++_total_count;
         ++_index_in_sub_container;
         _current_agg_data += _size_of_aggregate_states;
@@ -287,6 +287,26 @@ struct AggregateDataContainer {

     [[nodiscard]] uint32_t total_count() const { return _total_count; }

+    size_t estimate_memory(size_t rows) const {
+        bool need_to_expand = false;
+        if (_total_count == 0) {
+            need_to_expand = true;
+        } else if ((_index_in_sub_container + rows) > SUB_CONTAINER_CAPACITY) {
+            need_to_expand = true;
+            rows -= (SUB_CONTAINER_CAPACITY - _index_in_sub_container);
+        }
+
+        if (!need_to_expand) {
+            return 0;
+        }
+
+        size_t count = (rows + SUB_CONTAINER_CAPACITY - 1) / SUB_CONTAINER_CAPACITY;
+        size_t size = _size_of_key * SUB_CONTAINER_CAPACITY;
+        size += _size_of_aggregate_states * SUB_CONTAINER_CAPACITY;
+        size *= count;
+        return size;
+    }
+
     void init_once() {
         if (_inited) {
             return;
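For intuition, the following is a standalone recomputation of the arithmetic in estimate_memory() above. The capacity and per-entry sizes are illustrative assumptions for this sketch only, not values taken from the patch, and it assumes the incoming rows overflow the space left in the current sub-container:

    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t kCapacity = 8192;        // assumed SUB_CONTAINER_CAPACITY
        const size_t key_size = 16;           // assumed _size_of_key
        const size_t state_size = 64;         // assumed _size_of_aggregate_states
        size_t index_in_sub_container = 8000; // fill level of the last sub-container
        size_t rows = 10000;                  // rows about to be appended (> remaining space)

        // Rows that spill past the space left in the current sub-container.
        size_t overflow = rows - (kCapacity - index_in_sub_container);
        // Number of new sub-containers needed, rounded up.
        size_t count = (overflow + kCapacity - 1) / kCapacity;
        // Each new sub-container reserves key storage plus aggregate-state storage.
        size_t bytes = (key_size + state_size) * kCapacity * count;
        printf("new sub-containers: %zu, estimated bytes: %zu\n", count, bytes);
        return 0;
    }

With these numbers the estimate is two new sub-containers, i.e. (16 + 64) * 8192 * 2 = 1,310,720 bytes, which is what the reserve-memory machinery later in this patch asks the workload group for before the append happens.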
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 0107c5ec4fb56f..d9fc1ee9417fa1 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -77,8 +77,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);
-    _container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", TUnit::BYTES);
-    _arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", TUnit::BYTES);
+    _memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
+    _memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);

     return Status::OK();
 }
@@ -230,36 +230,35 @@ size_t AggSinkLocalState::_memory_usage() const {
 }

 void AggSinkLocalState::_update_memusage_with_serialized_key() {
-    std::visit(
-            vectorized::Overload {
-                    [&](std::monostate& arg) -> void {
-                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
-                    },
-                    [&](auto& agg_method) -> void {
-                        auto& data = *agg_method.hash_table;
-                        auto arena_memory_usage =
-                                _agg_arena_pool->size() +
-                                Base::_shared_state->aggregate_data_container->memory_usage() -
-                                Base::_shared_state->mem_usage_record.used_in_arena;
-                        Base::_mem_tracker->consume(arena_memory_usage);
-                        Base::_mem_tracker->consume(
-                                data.get_buffer_size_in_bytes() -
-                                Base::_shared_state->mem_usage_record.used_in_state);
-                        _serialize_key_arena_memory_usage->add(arena_memory_usage);
-                        COUNTER_SET(_container_memory_usage,
-                                    Base::_shared_state->aggregate_data_container->memory_usage());
-                        COUNTER_SET(_arena_memory_usage,
-                                    static_cast<int64_t>(_agg_arena_pool->size()));
-                        COUNTER_UPDATE(_hash_table_memory_usage,
-                                       data.get_buffer_size_in_bytes() -
-                                               Base::_shared_state->mem_usage_record.used_in_state);
-                        Base::_shared_state->mem_usage_record.used_in_state =
-                                data.get_buffer_size_in_bytes();
-                        Base::_shared_state->mem_usage_record.used_in_arena =
-                                _agg_arena_pool->size() +
-                                Base::_shared_state->aggregate_data_container->memory_usage();
-                    }},
-            _agg_data->method_variant);
+    std::visit(vectorized::Overload {
+                       [&](std::monostate& arg) -> void {
+                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                       },
+                       [&](auto& agg_method) -> void {
+                           auto& data = *agg_method.hash_table;
+                           auto arena_memory_usage =
+                                   _agg_arena_pool->size() +
+                                   Base::_shared_state->aggregate_data_container->memory_usage() -
+                                   Base::_shared_state->mem_usage_record.used_in_arena;
+                           Base::_mem_tracker->consume(arena_memory_usage);
+                           Base::_mem_tracker->consume(
+                                   data.get_buffer_size_in_bytes() -
+                                   Base::_shared_state->mem_usage_record.used_in_state);
+                           _serialize_key_arena_memory_usage->add(arena_memory_usage);
+                           COUNTER_SET(
+                                   _memory_usage_container,
+                                   Base::_shared_state->aggregate_data_container->memory_usage());
+                           COUNTER_SET(_memory_usage_arena,
+                                       static_cast<int64_t>(_agg_arena_pool->size()));
+                           COUNTER_SET(_hash_table_memory_usage,
+                                       int64_t(data.get_buffer_size_in_bytes()));
+                           Base::_shared_state->mem_usage_record.used_in_state =
+                                   data.get_buffer_size_in_bytes();
+                           Base::_shared_state->mem_usage_record.used_in_arena =
+                                   _agg_arena_pool->size() +
+                                   Base::_shared_state->aggregate_data_container->memory_usage();
+                       }},
+               _agg_data->method_variant);
 }

 Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2fa2d18a7e6338..66aa6bd88b5147 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -112,8 +112,8 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
-    RuntimeProfile::Counter* _container_memory_usage = nullptr;
-    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

     bool _should_limit_output = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 2d760092f576e0..0cf68924a1ddfb 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -53,6 +53,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     _hash_table_size_counter =
             ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", TUnit::UNIT, 1);

+    _memory_usage_container = ADD_COUNTER(profile(), "MemoryUsageContainer", TUnit::BYTES);
+    _memory_usage_arena = ADD_COUNTER(profile(), "MemoryUsageArena", TUnit::BYTES);
+
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
@@ -582,6 +585,21 @@ template Status AggSourceOperatorX::merge_with_serialized_key_helper<true>(
 template Status AggSourceOperatorX::merge_with_serialized_key_helper<false>(
         RuntimeState* state, vectorized::Block* block);

+size_t AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* state,
+                                                                 size_t rows) const {
+    auto& local_state = get_local_state(state);
+    size_t size = std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> size_t {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                        return 0;
+                    },
+                    [&](auto& agg_method) { return agg_method.hash_table->estimate_memory(rows); }},
+            local_state._shared_state->agg_data->method_variant);
+    size += local_state._shared_state->aggregate_data_container->estimate_memory(rows);
+    return size;
+}
+
 size_t AggLocalState::_get_hash_table_size() {
     return std::visit(
             vectorized::Overload {[&](std::monostate& arg) -> size_t {
@@ -596,54 +614,61 @@ size_t AggLocalState::_get_hash_table_size() {
 void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
                                              vectorized::ColumnRawPtrs& key_columns,
                                              size_t num_rows) {
-    std::visit(vectorized::Overload {
-                       [&](std::monostate& arg) -> void {
-                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
-                       },
-                       [&](auto& agg_method) -> void {
-                           SCOPED_TIMER(_hash_table_compute_timer);
-                           using HashMethodType = std::decay_t<decltype(agg_method)>;
-                           using AggState = typename HashMethodType::State;
-                           AggState state(key_columns);
-                           agg_method.init_serialized_keys(key_columns, num_rows);
-
-                           auto creator = [this](const auto& ctor, auto& key, auto& origin) {
-                               HashMethodType::try_presis_key_and_origin(
-                                       key, origin, *_shared_state->agg_arena_pool);
-                               auto mapped =
-                                       Base::_shared_state->aggregate_data_container->append_data(
-                                               origin);
-                               auto st = _create_agg_status(mapped);
-                               if (!st) {
-                                   throw Exception(st.code(), st.to_string());
-                               }
-                               ctor(key, mapped);
-                           };
-
-                           auto creator_for_null_key = [&](auto& mapped) {
-                               mapped = _shared_state->agg_arena_pool->aligned_alloc(
-                                       _shared_state->total_size_of_aggregate_states,
-                                       _shared_state->align_aggregate_states);
-                               auto st = _create_agg_status(mapped);
-                               if (!st) {
-                                   throw Exception(st.code(), st.to_string());
-                               }
-                           };
-
-                           SCOPED_TIMER(_hash_table_emplace_timer);
-                           for (size_t i = 0; i < num_rows; ++i) {
-                               places[i] = agg_method.lazy_emplace(state, i, creator,
-                                                                   creator_for_null_key);
-                           }
-
-                           COUNTER_UPDATE(_hash_table_input_counter, num_rows);
-                           COUNTER_SET(_hash_table_memory_usage,
-                                       static_cast<int64_t>(
-                                               agg_method.hash_table->get_buffer_size_in_bytes()));
-                           COUNTER_SET(_hash_table_size_counter,
-                                       static_cast<int64_t>(agg_method.hash_table->size()));
-                       }},
-               _shared_state->agg_data->method_variant);
+    std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> void {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
+                    },
+                    [&](auto& agg_method) -> void {
+                        SCOPED_TIMER(_hash_table_compute_timer);
+                        using HashMethodType = std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
+                        AggState state(key_columns);
+                        agg_method.init_serialized_keys(key_columns, num_rows);
+
+                        auto creator = [this](const auto& ctor, auto& key, auto& origin) {
+                            HashMethodType::try_presis_key_and_origin(
+                                    key, origin, *_shared_state->agg_arena_pool);
+                            auto mapped =
+                                    Base::_shared_state->aggregate_data_container->append_data(
+                                            origin);
+                            auto st = _create_agg_status(mapped);
+                            if (!st) {
+                                throw Exception(st.code(), st.to_string());
+                            }
+                            ctor(key, mapped);
+                        };
+
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _shared_state->agg_arena_pool->aligned_alloc(
+                                    _shared_state->total_size_of_aggregate_states,
+                                    _shared_state->align_aggregate_states);
+                            auto st = _create_agg_status(mapped);
+                            if (!st) {
+                                throw Exception(st.code(), st.to_string());
+                            }
+                        };
+
+                        SCOPED_TIMER(_hash_table_emplace_timer);
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            places[i] = agg_method.lazy_emplace(state, i, creator,
+                                                                creator_for_null_key);
+                        }
+
+                        COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+                        COUNTER_SET(_hash_table_memory_usage,
+                                    static_cast<int64_t>(
+                                            agg_method.hash_table->get_buffer_size_in_bytes()));
+                        COUNTER_SET(_hash_table_size_counter,
+                                    static_cast<int64_t>(agg_method.hash_table->size()));
+                        COUNTER_SET(
+                                _memory_usage_container,
+                                static_cast<int64_t>(
+                                        _shared_state->aggregate_data_container->memory_usage()));
+                        COUNTER_SET(_memory_usage_arena,
+                                    static_cast<int64_t>(_shared_state->agg_arena_pool->size()));
+                    }},
+            _shared_state->agg_data->method_variant);
 }

 void AggLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places,
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 98ddd6a214260f..09b03a5aab177d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -86,6 +86,8 @@ class AggLocalState final : public PipelineXLocalState<AggSharedState> {
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _merge_timer = nullptr;
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;

     using vectorized_get_result = std::function;

@@ -102,7 +104,7 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
     using Base = OperatorX<AggLocalState>;
     AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                        const DescriptorTbl& descs);
-    ~AggSourceOperatorX() = default;
+    ~AggSourceOperatorX() override = default;

     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

@@ -111,6 +113,8 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
     template <bool limit>
     Status merge_with_serialized_key_helper(RuntimeState* state, vectorized::Block* block);

+    size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t rows) const;
+
 private:
     friend class AggLocalState;
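The dispatch above hinges on std::visit over a variant of hash-method wrappers, with vectorized::Overload combining one lambda for the uninitialized std::monostate case and one generic lambda for every concrete hash table. A minimal self-contained sketch of that pattern, using simplified stand-in types rather than the Doris classes:

    #include <cstdio>
    #include <variant>

    // Same idiom as vectorized::Overload: inherit call operators from all lambdas.
    template <typename... Ts> struct Overload : Ts... { using Ts::operator()...; };
    template <typename... Ts> Overload(Ts...) -> Overload<Ts...>;

    struct SmallTable { size_t estimate_memory(size_t rows) const { return rows * 8; } };
    struct WideTable  { size_t estimate_memory(size_t rows) const { return rows * 24; } };

    int main() {
        std::variant<std::monostate, SmallTable, WideTable> method = WideTable{};
        size_t size = std::visit(
                Overload {// monostate means the table was never initialized
                          [](std::monostate&) -> size_t { return 0; },
                          // generic lambda handles every concrete table type
                          [](auto& table) -> size_t { return table.estimate_memory(4096); }},
                method);
        printf("estimated: %zu\n", size);
        return 0;
    }

The exact non-template monostate overload wins over the generic lambda for the uninitialized alternative, which is how the patch keeps the "uninited hash table" error path separate from the common case.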
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 02e538c4ab3f89..55896cd0549f87 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -122,7 +122,7 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     size_t size_to_reserve = 0;

     if (!_build_side_mutable_block.empty()) {
-        size_to_reserve += _build_side_mutable_block.allocated_bytes();
+        size_to_reserve += _build_side_mutable_block.bytes();
     }

     const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -277,6 +277,10 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     if (UNLIKELY(rows == 0)) {
         return Status::OK();
     }
+
+    LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " << block.columns()
+              << ", bytes/allocated_bytes: " << block.bytes() << "/" << block.allocated_bytes();
+
     COUNTER_UPDATE(_build_rows_counter, rows);
     block.replace_if_overflow();

diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 965d62192b2fed..620438677b8532 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -23,6 +23,7 @@
 #include "vec/columns/column.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/arena.h"
+#include "vec/common/custom_allocator.h"

 namespace doris {
 namespace vectorized {
@@ -73,7 +74,7 @@ struct ProcessHashTableProbe {
     // each matching join column need to be processed by other join conjunct. so the struct of mutable block
     // and output block may be different
     // The output result is determined by the other join conjunct result and same_to_prev struct
-    Status do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited,
+    Status do_other_join_conjuncts(vectorized::Block* output_block, DorisVector<uint8_t>& visited,
                                    bool has_null_in_build_side);

     template
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 653cc8ab4473dd..f1fea52ca89346 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -26,6 +26,7 @@
 #include "util/simd/bits.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/exprs/vexpr_context.h"

 namespace doris::pipeline {
@@ -493,7 +494,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo

 template <int JoinOpType>
 Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* output_block,
-                                                                  std::vector<uint8_t>& visited,
+                                                                  DorisVector<uint8_t>& visited,
                                                                   bool has_null_in_build_side) {
     // dispose the other join conjunct exec
     auto row_count = output_block->rows();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b7fbd1b8ceaa1b..b8e3c5b3079dc0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -196,8 +196,6 @@ class PipelineXLocalStateBase {
     Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
                         vectorized::Block* block, size_t column_to_keep);

-    void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }
-
     int64_t& estimate_memory_usage() { return _estimate_memory_usage; }

     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 0bed1d9e13dcaf..60f691de47f6ea 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -107,10 +107,12 @@ void PartitionedAggSinkLocalState::_init_counters() {
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT);
-    _container_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ContainerMemoryUsage", TUnit::BYTES, 1);
-    _arena_memory_usage =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ArenaMemoryUsage", TUnit::BYTES, 1);
+    _memory_usage_container =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageContainer", TUnit::BYTES, 1);
+    _memory_usage_arena =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageArena", TUnit::BYTES, 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);

     _spill_serialize_hash_table_timer =
@@ -137,8 +139,8 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
     UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
     UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
     UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
-    UPDATE_PROFILE(_container_memory_usage, "ContainerMemoryUsage");
-    UPDATE_PROFILE(_arena_memory_usage, "ArenaMemoryUsage");
+    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
+    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");

     update_max_min_rows_counter();
 }
@@ -255,7 +257,9 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
 size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     auto& local_state = get_local_state(state);
     auto* runtime_state = local_state._runtime_state.get();
-    return _agg_sink_operator->get_reserve_mem_size(runtime_state);
+    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state);
+    COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
+    return size;
 }

 Status PartitionedAggSinkLocalState::revoke_memory(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 22001b752a2e66..3c6b46f908e19f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -303,8 +303,9 @@ class PartitionedAggSinkLocalState
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
-    RuntimeProfile::Counter* _container_memory_usage = nullptr;
-    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
+    RuntimeProfile::Counter* _memory_usage_container = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr;

     RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8b281a88684bd5..c6a6c09f01b82e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -27,6 +27,7 @@
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"

 namespace doris::pipeline {
@@ -74,6 +75,10 @@ void PartitionedAggLocalState::_init_counters() {
             ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", TUnit::UNIT, 1);
     _hash_table_memory_usage =
             ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1);
+
+    _memory_usage_container =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageContainer", TUnit::BYTES, 1);
+    _memory_usage_arena = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageArena", TUnit::BYTES, 1);
 }

 #define UPDATE_PROFILE(counter, name) \
@@ -92,6 +97,8 @@ void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
     UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
     UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize");
     UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage");
+    UPDATE_PROFILE(_memory_usage_container, "MemoryUsageContainer");
+    UPDATE_PROFILE(_memory_usage_arena, "MemoryUsageArena");
 }

 Status PartitionedAggLocalState::close(RuntimeState* state) {
@@ -140,18 +147,37 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);

-    if (local_state._shared_state->is_spilled) {
-        local_state._status = local_state.initiate_merge_spill_partition_agg_data(state);
-        RETURN_IF_ERROR(local_state._status);
-
-        /// When `_is_merging` is true means we are reading spilled data and merging the data into hash table.
-        if (local_state._is_merging) {
+    if (local_state._shared_state->is_spilled &&
+        local_state._need_to_merge_data_for_current_partition) {
+        if (local_state._blocks.empty() && !local_state._current_partition_eos) {
+            bool has_recovering_data = false;
+            RETURN_IF_ERROR(local_state.recover_blocks_from_disk(state, has_recovering_data));
+            *eos = !has_recovering_data;
             return Status::OK();
+        } else if (!local_state._blocks.empty()) {
+            size_t merged_rows = 0;
+            while (!local_state._blocks.empty()) {
+                auto block = std::move(local_state._blocks.front());
+                merged_rows += block.rows();
+                local_state._blocks.erase(local_state._blocks.begin());
+                RETURN_IF_ERROR(_agg_source_operator->merge_with_serialized_key_helper<false>(
+                        local_state._runtime_state.get(), &block));
+            }
+            local_state._estimate_memory_usage +=
+                    _agg_source_operator->get_estimated_memory_size_for_merging(
+                            local_state._runtime_state.get(), merged_rows);
+
+            if (!local_state._current_partition_eos) {
+                return Status::OK();
+            }
         }
+
+        local_state._need_to_merge_data_for_current_partition = false;
     }

     // not spilled in sink or current partition still has data
     auto* runtime_state = local_state._runtime_state.get();
+    local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
     local_state._status = _agg_source_operator->get_block(runtime_state, block, eos);
     RETURN_IF_ERROR(local_state._status);
     if (local_state._runtime_state) {
@@ -162,6 +188,9 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     if (*eos) {
         if (local_state._shared_state->is_spilled &&
             !local_state._shared_state->spill_partitions.empty()) {
+            local_state._current_partition_eos = false;
+            local_state._need_to_merge_data_for_current_partition = true;
+            RETURN_IF_ERROR(local_state._shared_state->in_mem_shared_state->reset_hash_table());
             *eos = false;
         }
     }
@@ -195,53 +224,34 @@ Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
     return source_local_state->open(state);
 }

-Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(RuntimeState* state) {
-    DCHECK(!_is_merging);
-    Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    if (Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator !=
-                Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() ||
-        _shared_state->spill_partitions.empty()) {
+Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, bool& has_data) {
+    const auto query_id = state->query_id();
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    if (_shared_state->spill_partitions.empty()) {
+        _shared_state->close();
+        has_data = false;
         return Status::OK();
     }
-    _is_merging = true;
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
-               << ", task id: " << _state->task_id() << " merge spilled agg data";
-
-    RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
-    _spill_dependency->Dependency::block();
-
-    auto query_id = state->query_id();
-
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
+    has_data = true;

     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         MonotonicStopWatch execution_timer;
         execution_timer.start();
-        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
                     LOG(WARNING) << "query " << print_id(query_id) << " agg node "
-                                 << _parent->node_id()
-                                 << " merge spilled agg data error: " << _status;
+                                 << _parent->node_id() << " recover agg data error: " << _status;
                 }
                 _shared_state->close();
-            } else {
-                VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
-                           << ", task id: " << _state->task_id()
-                           << " merge spilled agg data finish, time used: "
-                           << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
-                           << "s, read size: " << read_size << ", "
-                           << _shared_state->spill_partitions.size() << " partitions left";
             }
-            Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-            _is_merging = false;
             _spill_dependency->Dependency::set_ready();
         }};

         bool has_agg_data = false;
-        auto& parent = Base::_parent->template cast<PartitionedAggSourceOperatorX>();
+        size_t accumulated_blocks_size = 0;
         while (!state->is_cancelled() && !has_agg_data &&
                !_shared_state->spill_partitions.empty()) {
             for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
@@ -267,28 +277,23 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                 if (!block.empty()) {
                     has_agg_data = true;
-                    read_size += block.bytes();
-                    _status = parent._agg_source_operator
-                                      ->merge_with_serialized_key_helper<false>(
-                                              _runtime_state.get(), &block);
-                    RETURN_IF_ERROR(_status);
+                    accumulated_blocks_size += block.allocated_bytes();
+                    _blocks.emplace_back(std::move(block));
+
+                    if (accumulated_blocks_size >=
+                        vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                        break;
+                    }
                 }
             }
-            (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-
-            if (!has_agg_data) {
-                VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
-                           << _parent->node_id() << ", task id: " << _state->task_id()
-                           << " merge spilled agg data finish, time used: "
-                           << execution_timer.elapsed_time() << ", empty partition "
-                           << read_size << ", " << _shared_state->spill_partitions.size()
-                           << " partitions left";
+
+                _current_partition_eos = eos;
+
+                if (_current_partition_eos) {
+                    (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+                    _shared_state->spill_partitions.pop_front();
                 }
             }
-            _shared_state->spill_partitions.pop_front();
-        }
-        if (_shared_state->spill_partitions.empty()) {
-            _shared_state->close();
         }
         return _status;
     };
@@ -313,6 +318,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_source submit_func failed");
     });
+    _spill_dependency->block();
     return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
             std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
                                             exception_catch_func));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 05f9ff6eff0943..53e683ada20453 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -41,7 +41,7 @@ class PartitionedAggLocalState final : public PipelineXSpillLocalState<Partition
     std::unique_ptr<std::promise<Status>> _spill_merge_promise;
     std::future<Status> _spill_merge_future;
     bool _current_partition_eos = true;
-    bool _is_merging = false;
+    bool _need_to_merge_data_for_current_partition = true;

     std::shared_ptr<Dependency> _spill_dependency;
+    std::vector<vectorized::Block> _blocks;

     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _get_results_timer = nullptr;
@@ -75,6 +76,9 @@ class PartitionedAggLocalState final : public PipelineXSpillLocalState<Partition
+    Status recover_blocks_from_disk(RuntimeState* state, bool& has_data);
+
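The rewritten recovery path no longer merges one spilled stream eagerly; it reads blocks into _blocks until the accumulated allocation crosses a write-batch threshold, then lets get_block() merge that bounded batch. A compact sketch of the batching loop, with simplified stand-in types and a hypothetical threshold constant in place of vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM:

    #include <cstddef>
    #include <deque>
    #include <utility>
    #include <vector>

    struct Block {
        size_t allocated = 0;
        size_t allocated_bytes() const { return allocated; }
    };

    // Pull spilled blocks off the front of the partition's queue until the batch
    // reaches max_batch_bytes; the caller merges the batch, then asks again.
    std::vector<Block> recover_batch(std::deque<Block>& spilled, size_t max_batch_bytes) {
        std::vector<Block> batch;
        size_t accumulated = 0;
        while (!spilled.empty() && accumulated < max_batch_bytes) {
            accumulated += spilled.front().allocated_bytes();
            batch.push_back(std::move(spilled.front()));
            spilled.pop_front();
        }
        return batch;
    }

Capping each recovery round this way keeps the transient memory of the merge phase bounded and, combined with get_estimated_memory_size_for_merging(), lets the operator reserve memory for a batch before it is merged.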
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 81dccb5eca10ce..70312d866465ae 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -20,6 +20,10 @@
 #include
 #include
+#include
+#include
+
+#include "common/status.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -63,6 +67,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1);
     _recovery_probe_blocks = ADD_COUNTER(profile(), "SpillRecoveryProbeBlocks", TUnit::UNIT);
     _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::UNIT, 1);

     _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBlocksBytes", TUnit::BYTES, 1);
@@ -294,29 +300,21 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
     return Status::OK();
 }

-Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState* state,
-                                                                           uint32_t partition_index,
-                                                                           bool& has_data) {
+Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state,
+                                                                          uint32_t partition_index,
+                                                                          bool& has_data) {
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
                << ", task id: " << state->task_id() << ", partition: " << partition_index
-               << " recovery_build_blocks_from_disk";
+               << " recover_build_blocks_from_disk";
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
         return Status::OK();
     }

-    RETURN_IF_ERROR(spilled_stream->spill_eof());
     spilled_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
                                       _spill_read_bytes, _spill_read_wait_io_timer);

-    auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index];
-    if (!mutable_block) {
-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-        spilled_stream.reset();
-        return Status::OK();
-    }
-
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
@@ -325,8 +323,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     MonotonicStopWatch submit_timer;
     submit_timer.start();

-    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, &mutable_block,
-                      shared_state_holder, submit_timer, partition_index] {
+    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, shared_state_holder,
+                      submit_timer, partition_index] {
         auto shared_state_sptr = shared_state_holder.lock();
         if (!shared_state_sptr || state->is_cancelled()) {
             LOG(INFO) << "query: " << print_id(query_id)
@@ -368,25 +366,31 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                 break;
             }

-            if (mutable_block->empty()) {
-                *mutable_block = std::move(block);
+            if (!_recovered_build_block) {
+                _recovered_build_block = vectorized::MutableBlock::create_unique(std::move(block));
             } else {
-                DCHECK_EQ(mutable_block->columns(), block.columns());
-                st = mutable_block->merge(std::move(block));
+                DCHECK_EQ(_recovered_build_block->columns(), block.columns());
+                st = _recovered_build_block->merge(std::move(block));
                 if (!st.ok()) {
                     _spill_status_ok = false;
                     _spill_status = std::move(st);
                     break;
                 }
             }
+
+            if (_recovered_build_block->allocated_bytes() >=
+                vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                break;
+            }
         }

-        ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-        shared_state_sptr->spilled_streams[partition_index].reset();
-        const size_t rows = mutable_block ? mutable_block->rows() : 0;
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
-                   << ", task id: " << state->task_id() << ", partition: " << partition_index
-                   << ", recovery build data done, rows: " << rows;
+        if (eos) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
+            shared_state_sptr->spilled_streams[partition_index].reset();
+            VLOG_DEBUG << "query: " << print_id(state->query_id())
+                       << ", node: " << _parent->node_id() << ", task id: " << state->task_id()
+                       << ", partition: " << partition_index;
+        }
     };

     auto exception_catch_func = [read_func, query_id, this]() {
@@ -433,7 +437,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                                                             exception_catch_func);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
                << ", task id: " << state->task_id() << ", partition: " << partition_index
-               << " recovery_build_blocks_from_disk submit func";
+               << " recover_build_blocks_from_disk submit func";
     return spill_io_pool->submit(std::move(spill_runnable));
 }

@@ -459,9 +463,9 @@ std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_lev
     return fmt::to_string(debug_string_buffer);
 }

-Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
-                                                                           uint32_t partition_index,
-                                                                           bool& has_data) {
+Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(RuntimeState* state,
+                                                                          uint32_t partition_index,
+                                                                          bool& has_data) {
     auto& spilled_stream = _probe_spilling_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -485,18 +489,24 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             st = Status::Error<INTERNAL_ERROR>(
                     "fault_inject partitioned_hash_join_probe recover_probe_blocks failed");
         });
-        if (st.ok()) {
+
+        size_t read_size = 0;
+        while (!eos && !_state->is_cancelled() && st.ok()) {
             st = spilled_stream->read_next_block_sync(&block, &eos);
-        }
-        if (!st.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(st);
-        } else {
-            COUNTER_UPDATE(_recovery_probe_rows, block.rows());
-            COUNTER_UPDATE(_recovery_probe_blocks, 1);
-            blocks.emplace_back(std::move(block));
-        }
+            if (!st.ok()) {
+                _spill_status_ok = false;
+                _spill_status = std::move(st);
+            } else {
+                COUNTER_UPDATE(_recovery_probe_rows, block.rows());
+                COUNTER_UPDATE(_recovery_probe_blocks, 1);
+                read_size += block.allocated_bytes();
+                blocks.emplace_back(std::move(block));
+            }
+            if (read_size >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                break;
+            }
+        }
         if (eos) {
             VLOG_DEBUG << "query: " << print_id(query_id)
                        << ", recovery probe data done: " << spilled_stream->get_spill_dir();
@@ -739,15 +749,27 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,

     const auto partition_index = local_state._partition_cursor;
     auto& probe_blocks = local_state._probe_blocks[partition_index];
+
+    if (local_state._recovered_build_block && !local_state._recovered_build_block->empty()) {
+        local_state._estimate_memory_usage +=
+                local_state._recovered_build_block->allocated_bytes();
+        auto& mutable_block = local_state._shared_state->partitioned_build_blocks[partition_index];
+        if (!mutable_block) {
+            mutable_block = std::move(local_state._recovered_build_block);
+        } else {
+            RETURN_IF_ERROR(mutable_block->merge(local_state._recovered_build_block->to_block()));
+            local_state._recovered_build_block.reset();
+        }
+    }
+
     if (local_state._need_to_setup_internal_operators) {
-        *eos = false;
         bool has_data = false;
-        RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk(
+        RETURN_IF_ERROR(local_state.recover_build_blocks_from_disk(
                 state, local_state._partition_cursor, has_data));
         if (has_data) {
             return Status::OK();
         }
+        *eos = false;
         RETURN_IF_ERROR(local_state.finish_spilling(partition_index));
         RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
         local_state._need_to_setup_internal_operators = false;
@@ -763,7 +785,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         *eos = false;
         bool has_data = false;
         RETURN_IF_ERROR(
-                local_state.recovery_probe_blocks_from_disk(state, partition_index, has_data));
+                local_state.recover_probe_blocks_from_disk(state, partition_index, has_data));
         if (!has_data) {
             vectorized::Block block;
             RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
@@ -916,6 +938,12 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         }
     });
 #endif
+
+    Defer defer([&]() {
+        COUNTER_SET(local_state._memory_usage_reserved,
+                    int64_t(local_state.estimate_memory_usage()));
+    });
+
     if (need_more_input_data(state)) {
         RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(),
                                                          &local_state._child_eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 1368d25986f55d..8ee518e45a1dca 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -50,10 +50,10 @@ class PartitionedHashJoinProbeLocalState final
     Status spill_probe_blocks(RuntimeState* state,
                               const std::shared_ptr<SpillContext>& spill_context = nullptr);

-    Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
-                                           bool& has_data);
-    Status recovery_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
-                                           bool& has_data);
+    Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
+                                          bool& has_data);
+    Status recover_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
+                                          bool& has_data);

     Status finish_spilling(uint32_t partition_index);

@@ -81,6 +81,7 @@ class PartitionedHashJoinProbeLocalState final
     std::atomic<bool> _spill_status_ok {true};

     std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
+    std::unique_ptr<vectorized::MutableBlock> _recovered_build_block;
     std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;

     std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
@@ -142,6 +143,7 @@ class PartitionedHashJoinProbeLocalState final
     RuntimeProfile::Counter* _probe_rows_counter = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };

 class PartitionedHashJoinProbeOperatorX final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 136466aa6b2199..a79d03302eb7c3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -55,6 +55,8 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1);
     _spill_build_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", 1);
     _in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow", TUnit::UNIT, 1);
+    _memory_usage_reserved =
+            ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
     return Status::OK();
 }

@@ -126,6 +128,8 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
                 _shared_state->inner_runtime_state.get());
         }
     }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
     return size_to_reserve;
 }

@@ -379,6 +383,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
             COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
         }
     });
+    RETURN_IF_ERROR(_finish_spilling());

     _dependency->set_ready_to_read();
     if (spill_context) {
@@ -388,6 +393,15 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     return Status::OK();
 }

+Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
+    for (auto& stream : _shared_state->spilled_streams) {
+        if (stream) {
+            RETURN_IF_ERROR(stream->spill_eof());
+        }
+    }
+    return Status::OK();
+}
+
 Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
                                                            vectorized::Block* in_block,
                                                            size_t begin, size_t end) {
@@ -470,6 +484,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
             COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
         }
     });
+    _spill_status = _finish_spilling();

     _dependency->set_ready_to_read();
 }

@@ -582,7 +597,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                   << node_id() << " sink eos, set_ready_to_read"
                   << ", task id: " << state->task_id() << ", need spill: " << need_to_spill;

-        if (!need_to_spill) {
+        if (need_to_spill) {
+            return revoke_memory(state, nullptr);
+        } else {
             if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
                 RETURN_IF_ERROR(_setup_internal_operator(state));
             }
@@ -601,8 +618,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                       << ", task id: " << state->task_id() << ", nonspill build usage: "
                       << _inner_sink_operator->get_memory_usage(
                                  local_state._shared_state->inner_runtime_state.get());
-        } else {
-            return revoke_memory(state, nullptr);
         }

         std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index a2d75cf2f9bed6..ae0d48394432b3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -61,6 +61,8 @@ class PartitionedHashJoinSinkLocalState
     Status _revoke_unpartitioned_block(RuntimeState* state,
                                        const std::shared_ptr<SpillContext>& spill_context);

+    Status _finish_spilling();
+
     friend class PartitionedHashJoinSinkOperatorX;

     std::atomic_int _spilling_streams_count {0};
@@ -80,6 +82,7 @@ class PartitionedHashJoinSinkLocalState
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
     RuntimeProfile::Counter* _spill_build_timer = nullptr;
     RuntimeProfile::Counter* _in_mem_rows_counter = nullptr;
+    RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
 };

 class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 37b44c6e6f1481..cbd0fca4e4bee6 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -383,7 +383,6 @@ Status PipelineTask::execute(bool* eos) {
                     std::max(sink_reserve_size, _state->minimum_operator_memory_required_bytes());
             reserve_size = _root->get_reserve_mem_size(_state) + sink_reserve_size;
             _root->reset_reserve_mem_size(_state);
-            DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);

             auto workload_group = _state->get_query_ctx()->workload_group();
             if (workload_group && reserve_size > 0) {
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 2e9f4b0b0ad96f..47ff31ad85f684 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -563,6 +563,19 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
                     query_id, memory_usage, query_ctx->get_mem_limit()));
         }
     } else {
+        if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
+            LOG(INFO) << "query: " << query_id
+                      << ", process limit not exceeded now, resume this query"
+                      << ", process memory info: "
+                      << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
+            query_ctx->set_memory_sufficient(true);
+            return true;
+        }
+
+        LOG(INFO) << "query: " << query_id << ", process limit exceeded, info: "
+                  << GlobalMemoryArbitrator::process_memory_used_details_str()
+                  << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
         query_ctx->cancel(doris::Status::Error(
                 "The query({}) reserved memory failed because process limit exceeded, and "
                 "there is no cache now. And could not find task to spill. Maybe you should set "
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index ec4b8585db3ec1..86c66985c149d1 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -37,6 +37,7 @@
 #include "olap/olap_common.h"
 #include "runtime/define_primitive_type.h"
 #include "vec/common/cow.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/pod_array_fwd.h"
 #include "vec/common/string_ref.h"
 #include "vec/common/typeid_cast.h"
@@ -336,14 +337,23 @@ class IColumn : public COW<IColumn> {
         return 0;
     }

-    virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                               size_t max_row_byte_size) const {
+    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+                       size_t max_row_byte_size) const {
+        serialize_vec(keys.data(), num_rows, max_row_byte_size);
+    }
+
+    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+                                     const uint8_t* null_map) const {
+        serialize_vec_with_null_map(keys.data(), num_rows, null_map);
+    }
+
+    virtual void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Method serialize_vec is not supported for " + get_name());
         __builtin_unreachable();
     }

-    virtual void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    virtual void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                              const uint8_t* null_map) const {
         throw doris::Exception(
                 ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -351,15 +361,24 @@ class IColumn : public COW<IColumn> {
         __builtin_unreachable();
     }

+    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+        deserialize_vec(keys.data(), num_rows);
+    }
+
+    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+                                       const uint8_t* null_map) {
+        deserialize_vec_with_null_map(keys.data(), num_rows, null_map);
+    }
+
     // This function deserializes group-by keys into column in the vectorized way.
-    virtual void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    virtual void deserialize_vec(StringRef* keys, const size_t num_rows) {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Method deserialize_vec is not supported for " + get_name());
         __builtin_unreachable();
     }

     // Used in ColumnNullable::deserialize_vec
-    virtual void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    virtual void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                                const uint8_t* null_map) {
         throw doris::Exception(
                 ErrorCode::NOT_IMPLEMENTED_ERROR,
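The column.h change above moves the virtual interface onto raw StringRef* and layers thin non-virtual std::vector overloads on top, so callers can hand in a std::vector<StringRef> or a tracked DorisVector<StringRef> without touching every override. A minimal sketch of that wrapper pattern with simplified stand-in types (not the Doris classes):

    #include <cstddef>
    #include <cstring>
    #include <vector>

    struct StringRef { const char* data = nullptr; size_t size = 0; };

    struct Column {
        virtual ~Column() = default;
        // Virtual hook works on a raw pointer, independent of the container type.
        virtual void serialize_vec(StringRef* keys, size_t num_rows,
                                   size_t max_row_byte_size) const = 0;
        // Non-virtual wrapper: any contiguous container with data()/size() fits.
        template <typename ContiguousVec>
        void serialize_vec(ContiguousVec& keys, size_t max_row_byte_size) const {
            serialize_vec(keys.data(), keys.size(), max_row_byte_size);
        }
    };

    struct NoopColumn : Column {
        void serialize_vec(StringRef*, size_t, size_t) const override {}
    };

    int main() {
        NoopColumn col;
        std::vector<StringRef> keys(4);
        static_cast<Column&>(col).serialize_vec(keys, 16); // dispatches via keys.data()
        return 0;
    }

Keeping the virtual signature container-agnostic is what allows the rest of this patch to swap std::vector for DorisVector in hot paths without widening the IColumn interface.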
b/be/src/vec/columns/column_decimal.h @@ -165,15 +165,14 @@ class ColumnDecimal final : public COWHelper> { size_t get_max_row_byte_size() const override; - void serialize_vec(std::vector& keys, size_t num_rows, - size_t max_row_byte_size) const override; + void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override; - void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, + void serialize_vec_with_null_map(StringRef* keys, size_t num_rows, const uint8_t* null_map) const override; - void deserialize_vec(std::vector& keys, const size_t num_rows) override; + void deserialize_vec(StringRef* keys, const size_t num_rows) override; - void deserialize_vec_with_null_map(std::vector& keys, const size_t num_rows, + void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows, const uint8_t* null_map) override; void update_hash_with_value(size_t n, SipHash& hash) const override; diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index dbee5a2025aa70..38ca32ce291255 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -246,13 +246,13 @@ size_t ColumnNullable::get_max_row_byte_size() const { return flag_size + get_nested_column().get_max_row_byte_size(); } -void ColumnNullable::serialize_vec(std::vector& keys, size_t num_rows, +void ColumnNullable::serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const { const auto& arr = get_null_map_data(); get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data()); } -void ColumnNullable::deserialize_vec(std::vector& keys, const size_t num_rows) { +void ColumnNullable::deserialize_vec(StringRef* keys, const size_t num_rows) { auto& arr = get_null_map_data(); const size_t old_size = arr.size(); arr.resize(old_size + num_rows); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 7772b6e80ade9b..c39036c2e3a00d 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -180,10 +180,10 @@ class ColumnNullable final : public COWHelper, public N StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; size_t get_max_row_byte_size() const override; - void serialize_vec(std::vector& keys, size_t num_rows, - size_t max_row_byte_size) const override; - void deserialize_vec(std::vector& keys, size_t num_rows) override; + void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override; + + void deserialize_vec(StringRef* keys, size_t num_rows) override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 96a27e44e92a2c..6e3c754fc32ea8 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -518,24 +518,23 @@ class ColumnObject final : public COWHelper { "get_max_row_byte_size" + std::string(get_family_name())); } - void serialize_vec(std::vector& keys, size_t num_rows, - size_t max_row_byte_size) const override { + void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "serialize_vec" + std::string(get_family_name())); } - void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, + void 
serialize_vec_with_null_map(StringRef* keys, size_t num_rows, const uint8_t* null_map) const override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "serialize_vec_with_null_map" + std::string(get_family_name())); } - void deserialize_vec(std::vector& keys, const size_t num_rows) override { + void deserialize_vec(StringRef* keys, const size_t num_rows) override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "deserialize_vec" + std::string(get_family_name())); } - void deserialize_vec_with_null_map(std::vector& keys, const size_t num_rows, + void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows, const uint8_t* null_map) override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "deserialize_vec_with_null_map" + std::string(get_family_name())); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index d8fd42e36c76a3..a6cf116b6db5bf 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -383,8 +383,7 @@ size_t ColumnStr::get_max_row_byte_size() const { } template -void ColumnStr::serialize_vec(std::vector& keys, size_t num_rows, - size_t max_row_byte_size) const { +void ColumnStr::serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const { for (size_t i = 0; i < num_rows; ++i) { uint32_t offset(offset_at(i)); uint32_t string_size(size_at(i)); @@ -397,7 +396,7 @@ void ColumnStr::serialize_vec(std::vector& keys, size_t num_rows, } template -void ColumnStr::serialize_vec_with_null_map(std::vector& keys, size_t num_rows, +void ColumnStr::serialize_vec_with_null_map(StringRef* keys, size_t num_rows, const UInt8* null_map) const { DCHECK(null_map != nullptr); @@ -438,7 +437,7 @@ void ColumnStr::serialize_vec_with_null_map(std::vector& keys, siz } template -void ColumnStr::deserialize_vec(std::vector& keys, const size_t num_rows) { +void ColumnStr::deserialize_vec(StringRef* keys, const size_t num_rows) { for (size_t i = 0; i != num_rows; ++i) { auto original_ptr = keys[i].data; keys[i].data = deserialize_and_insert_from_arena(original_ptr); @@ -447,8 +446,8 @@ void ColumnStr::deserialize_vec(std::vector& keys, const size_t nu } template -void ColumnStr::deserialize_vec_with_null_map(std::vector& keys, - const size_t num_rows, const uint8_t* null_map) { +void ColumnStr::deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows, + const uint8_t* null_map) { for (size_t i = 0; i != num_rows; ++i) { if (null_map[i] == 0) { auto original_ptr = keys[i].data; diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index b441b81613b184..9f3771bbe96d5a 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -387,17 +387,16 @@ class ColumnStr final : public COWHelper> { const char* deserialize_and_insert_from_arena(const char* pos) override; - void deserialize_vec(std::vector& keys, const size_t num_rows) override; + void deserialize_vec(StringRef* keys, const size_t num_rows) override; size_t get_max_row_byte_size() const override; - void serialize_vec(std::vector& keys, size_t num_rows, - size_t max_row_byte_size) const override; + void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override; - void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, + void serialize_vec_with_null_map(StringRef* keys, size_t num_rows, const uint8_t* null_map) const override; - void deserialize_vec_with_null_map(std::vector& keys, const size_t num_rows, + 
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index b441b81613b184..9f3771bbe96d5a 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -387,17 +387,16 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override;
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override;
 
     size_t get_max_row_byte_size() const override;
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override;
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
     void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index 300152b57843d3..4382fa62851573 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -63,7 +63,7 @@ size_t ColumnVector<T>::get_max_row_byte_size() const {
 }
 
 template <typename T>
-void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnVector<T>::serialize_vec(StringRef* keys, size_t num_rows,
                                     size_t max_row_byte_size) const {
     for (size_t i = 0; i < num_rows; ++i) {
         memcpy_fixed<T>(const_cast<char*>(keys[i].data + keys[i].size), (char*)&data[i]);
@@ -72,7 +72,7 @@ void ColumnVector<T>::serialize_vec(std::vector<StringRef>& keys, size_t num_row
 }
 
 template <typename T>
-void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+void ColumnVector<T>::serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                                   const UInt8* null_map) const {
     DCHECK(null_map != nullptr);
 
@@ -102,7 +102,7 @@ void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
 }
 
 template <typename T>
-void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+void ColumnVector<T>::deserialize_vec(StringRef* keys, const size_t num_rows) {
     for (size_t i = 0; i != num_rows; ++i) {
         keys[i].data = deserialize_and_insert_from_arena(keys[i].data);
         keys[i].size -= sizeof(T);
@@ -110,8 +110,7 @@ void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t
 }
 
 template <typename T>
-void ColumnVector<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
-                                                    const size_t num_rows,
+void ColumnVector<T>::deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                                     const uint8_t* null_map) {
     for (size_t i = 0; i < num_rows; ++i) {
         if (null_map[i] == 0) {
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index 015595797b932f..a6e329b95221ee 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -247,17 +247,16 @@ class ColumnVector final : public COWHelper<IColumn, ColumnVector<T>> {
 
     const char* deserialize_and_insert_from_arena(const char* pos) override;
 
-    void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override;
+    void deserialize_vec(StringRef* keys, const size_t num_rows) override;
 
-    void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+    void deserialize_vec_with_null_map(StringRef* keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
     size_t get_max_row_byte_size() const override;
 
-    void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
-                       size_t max_row_byte_size) const override;
+    void serialize_vec(StringRef* keys, size_t num_rows, size_t max_row_byte_size) const override;
 
-    void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
+    void serialize_vec_with_null_map(StringRef* keys, size_t num_rows,
                                      const uint8_t* null_map) const override;
 
     void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h
index eee800a059d329..6361a60689c0b3 100644
--- a/be/src/vec/common/custom_allocator.h
+++ b/be/src/vec/common/custom_allocator.h
@@ -20,7 +20,7 @@
 #include "vec/common/allocator.h"
 #include "vec/common/allocator_fwd.h"
 
-template >
+template >
 class CustomStdAllocator;
 
 template
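This custom_allocator.h change is the crux of the patch: std::vector's default allocator bypasses Doris' memory trackers, so large key and index buffers were invisible to workload-group accounting, while DorisVector routes every allocation through a tracked allocator. Below is a stripped-down sketch of the mechanism, with a plain counter standing in for the real MemTracker and illustrative names (MemCounter, TrackingAllocator, TrackedVector) rather than the actual CustomStdAllocator internals.

#include <cstddef>
#include <new>
#include <vector>

struct MemCounter { // stand-in for a Doris MemTracker
    std::size_t used = 0;
};

template <typename T>
struct TrackingAllocator {
    using value_type = T;
    MemCounter* counter;

    explicit TrackingAllocator(MemCounter* c) : counter(c) {}
    template <typename U>
    TrackingAllocator(const TrackingAllocator<U>& other) : counter(other.counter) {}

    T* allocate(std::size_t n) {
        counter->used += n * sizeof(T); // traced at allocation time
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t n) {
        counter->used -= n * sizeof(T); // released symmetrically on free
        ::operator delete(p);
    }
    bool operator==(const TrackingAllocator& o) const { return counter == o.counter; }
    bool operator!=(const TrackingAllocator& o) const { return !(*this == o); }
};

// Analogous to DorisVector<T>: a std::vector whose heap traffic the tracker sees.
template <typename T>
using TrackedVector = std::vector<T, TrackingAllocator<T>>;

// Usage: TrackedVector<unsigned> v{TrackingAllocator<unsigned>(&counter)};
// v.resize(1 << 20); // counter.used grows by ~4 MiB instead of going untracked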
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 0df0c8997f0f61..594f9b54a69a98 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -56,10 +56,10 @@ struct MethodBaseInner {
     bool inited_iterator = false;
     Key* keys = nullptr;
     Arena arena;
-    std::vector<size_t> hash_values;
+    DorisVector<size_t> hash_values;
 
     // use in join case
-    std::vector<uint32_t> bucket_nums;
+    DorisVector<uint32_t> bucket_nums;
 
     MethodBaseInner() { hash_table.reset(new HashMap()); }
     virtual ~MethodBaseInner() = default;
@@ -198,10 +198,10 @@ struct MethodSerialized : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, typename Base::Mapped>;
     using Base::try_presis_key;
     // need keep until the hash probe end.
-    std::vector<StringRef> build_stored_keys;
+    DorisVector<StringRef> build_stored_keys;
     Arena build_arena;
     // refresh each time probe
-    std::vector<StringRef> stored_keys;
+    DorisVector<StringRef> stored_keys;
 
     StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
                                                 const ColumnRawPtrs& key_columns, Arena& pool) {
@@ -216,7 +216,7 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows,
-                                   std::vector<StringRef>& input_keys, Arena& input_arena) {
+                                   DorisVector<StringRef>& input_keys, Arena& input_arena) {
         input_arena.clear();
         input_keys.resize(num_rows);
@@ -243,14 +243,15 @@ struct MethodSerialized : public MethodBase<TData> {
         }
 
         for (const auto& column : key_columns) {
-            column->serialize_vec(input_keys, num_rows, max_one_row_byte_size);
+            column->serialize_vec(input_keys.data(), num_rows, max_one_row_byte_size);
         }
 
         Base::keys = input_keys.data();
     }
 
     size_t serialized_keys_size(bool is_build) const override {
-        return is_build ? build_arena.size() : Base::arena.size();
+        return is_build ? (build_stored_keys.size() * sizeof(StringRef))
+                        : (stored_keys.size() * sizeof(StringRef));
     }
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
@@ -286,9 +287,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
             ColumnsHashing::HashMethodString<typename Base::Value, typename Base::Mapped>;
 
     // need keep until the hash probe end.
-    std::vector<StringRef> _build_stored_keys;
+    DorisVector<StringRef> _build_stored_keys;
     // refresh each time probe
-    std::vector<StringRef> _stored_keys;
+    DorisVector<StringRef> _stored_keys;
 
     size_t serialized_keys_size(bool is_build) const override {
         return is_build ? (_build_stored_keys.size() * sizeof(StringRef))
@@ -296,13 +297,13 @@
     }
 
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows,
-                                   std::vector<StringRef>& stored_keys) {
+                                   DorisVector<StringRef>& stored_keys) {
         const IColumn& column = *key_columns[0];
         const auto& nested_column =
                 column.is_nullable()
                         ? assert_cast<const ColumnNullable&>(column).get_nested_column()
                         : column;
 
-        auto serialized_str = [](const auto& column_string, std::vector<StringRef>& stored_keys) {
+        auto serialized_str = [](const auto& column_string, DorisVector<StringRef>& stored_keys) {
             const auto& offsets = column_string.get_offsets();
             const auto* chars = column_string.get_chars().data();
             stored_keys.resize(column_string.size());
@@ -388,16 +389,16 @@ struct MethodKeysFixed : public MethodBase<TData> {
             has_nullable_keys>;
 
     // need keep until the hash probe end. use only in join
-    std::vector<Key> build_stored_keys;
+    DorisVector<Key> build_stored_keys;
     // refresh each time probe hash table
-    std::vector<Key> stored_keys;
+    DorisVector<Key> stored_keys;
     Sizes key_sizes;
 
     MethodKeysFixed(Sizes key_sizes_) : key_sizes(std::move(key_sizes_)) {}
 
     template <typename T>
     void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
-                     const ColumnRawPtrs& nullmap_columns, std::vector<T>& result) {
+                     const ColumnRawPtrs& nullmap_columns, DorisVector<T>& result) {
         size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
         // set size to 0 at first, then use resize to call default constructor on index included from [0, row_numbers) to reset all memory
         result.clear();
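One note on the MethodKeysFixed hunk just above: pack_fixeds lays out one fixed-width key per row as a small null bitmap followed by the packed column values, and the clear()+resize() pair guarantees zeroed keys before any bits or bytes are written, so padding compares and hashes consistently. A deliberately narrowed sketch for two nullable int64 key columns follows; pack_two_int64_keys and its fixed layout are illustrative only, the real code is generic over key widths and bitmap sizes.

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Key must be a trivially-copyable blob wide enough for one bitmap byte plus
// two int64 values; resize() value-initializes, mirroring the comment above.
template <typename Key>
void pack_two_int64_keys(std::size_t rows, const int64_t* c0, const int64_t* c1,
                         const uint8_t* null0, const uint8_t* null1,
                         std::vector<Key>& result) {
    static_assert(sizeof(Key) >= 1 + 2 * sizeof(int64_t), "Key too narrow");
    result.clear();
    result.resize(rows); // zero-fills every key before the writes below
    for (std::size_t i = 0; i < rows; ++i) {
        auto* dst = reinterpret_cast<uint8_t*>(&result[i]);
        // bit k of the leading byte marks "column k is null for this row"
        dst[0] = static_cast<uint8_t>((null0 ? (null0[i] != 0) : 0) |
                                      ((null1 ? (null1[i] != 0) : 0) << 1));
        std::memcpy(dst + 1, &c0[i], sizeof(int64_t));
        std::memcpy(dst + 1 + sizeof(int64_t), &c1[i], sizeof(int64_t));
    }
}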
diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h
index 317987541cdbe1..d95512e275c452 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -22,6 +22,7 @@
 #include
 
 #include "vec/columns/column_filter_helper.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -69,7 +70,7 @@ class JoinHashTable {
 
     size_t size() const { return next.size(); }
 
-    std::vector<uint8_t>& get_visited() { return visited; }
+    DorisVector<uint8_t>& get_visited() { return visited; }
 
     template
     void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums,
@@ -255,7 +256,7 @@ class JoinHashTable {
 
     bool has_null_key() { return _has_null_key; }
 
-    void pre_build_idxs(std::vector<uint32_t>& buckets, const uint8_t* null_map) const {
+    void pre_build_idxs(DorisVector<uint32_t>& buckets, const uint8_t* null_map) const {
         if (null_map) {
             for (unsigned int& bucket : buckets) {
                 bucket = bucket == bucket_size ? bucket_size : first[bucket];
@@ -454,13 +455,13 @@ class JoinHashTable {
     }
 
     const Key* __restrict build_keys;
-    std::vector<uint8_t> visited;
+    DorisVector<uint8_t> visited;
 
     uint32_t bucket_size = 1;
     int max_batch_size = 4064;
 
-    std::vector<uint32_t> first = {0};
-    std::vector<uint32_t> next = {0};
+    DorisVector<uint32_t> first = {0};
+    DorisVector<uint32_t> next = {0};
 
     // use in iter hash map
     mutable uint32_t iter_idx = 1;
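For context on pre_build_idxs (middle hunk of this file): it rewrites each probe row's bucket number in place into the head of that bucket's build chain (first), so the probe loop can then walk next[] without recomputing hashes, while rows already carrying the bucket_size sentinel (nulls) keep it. A sketch under that reading; the non-null branch is inferred from the one shown in the hunk, and plain std::vector stands in for brevity where the patch now uses DorisVector.

#include <cstdint>
#include <vector>

inline void pre_build_idxs_sketch(std::vector<uint32_t>& buckets, const uint8_t* null_map,
                                  const std::vector<uint32_t>& first, uint32_t bucket_size) {
    if (null_map) {
        // Null rows carry the sentinel bucket_size and must keep it.
        for (uint32_t& bucket : buckets) {
            bucket = (bucket == bucket_size) ? bucket_size : first[bucket];
        }
    } else {
        // Assumed fast path: every bucket id maps straight to its chain head.
        for (uint32_t& bucket : buckets) {
            bucket = first[bucket];
        }
    }
}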