From 517c12478f21a0388afc716ae61ea1ebbcc6a9c6 Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Fri, 29 Mar 2024 16:56:57 +0800 Subject: [PATCH] [improvement](spill) spill trigger improvement (#32641) --- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/common/daemon.cpp | 13 ++ be/src/common/daemon.h | 1 + .../partitioned_aggregation_sink_operator.cpp | 3 + ...artitioned_aggregation_source_operator.cpp | 2 + .../partitioned_hash_join_probe_operator.cpp | 50 ++++--- .../partitioned_hash_join_sink_operator.cpp | 28 +++- .../partitioned_hash_join_sink_operator.h | 1 + .../exec/spill_sort_sink_operator.cpp | 8 ++ .../exec/spill_sort_source_operator.cpp | 6 + be/src/pipeline/pipeline_x/operator.h | 36 +++++ .../pipeline/pipeline_x/pipeline_x_task.cpp | 73 +++++++---- be/src/pipeline/pipeline_x/pipeline_x_task.h | 2 + be/src/runtime/fragment_mgr.cpp | 1 + be/src/runtime/query_context.cpp | 1 - be/src/runtime/query_context.h | 28 ++++ .../runtime/workload_group/workload_group.cpp | 4 + .../runtime/workload_group/workload_group.h | 38 +++++- .../workload_group/workload_group_manager.cpp | 123 ++++++++++++++++++ .../workload_group/workload_group_manager.h | 2 + be/src/vec/spill/spill_stream.h | 2 + be/src/vec/spill/spill_writer.cpp | 23 ++-- be/src/vec/spill/spill_writer.h | 5 +- 24 files changed, 382 insertions(+), 74 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9364303a3ea716..f3bde7dd36b70b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -565,6 +565,9 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000"); // Sleep time in milliseconds between memtable flush mgr refresh iterations DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5"); +// Sleep time in milliseconds between refresh iterations of workload group memory statistics +DEFINE_mInt64(wg_mem_refresh_interval_ms, "50"); + // percent of (active memtables size / all memtables size) when reach hard limit DEFINE_mInt32(memtable_hard_limit_active_percent, "50"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 01d8a123a45b47..a0845d043de93b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -616,6 +616,9 @@ DECLARE_mInt32(memory_gc_sleep_time_ms); // Sleep time in milliseconds between memtable flush mgr memory refresh iterations DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms); +// Sleep time in milliseconds between refresh iterations of workload group memory statistics +DECLARE_mInt64(wg_mem_refresh_interval_ms); + // percent of (active memtables size / all memtables size) when reach hard limit DECLARE_mInt32(memtable_hard_limit_active_percent); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 4735c41c724f44..582319befb287f 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -369,6 +369,14 @@ void Daemon::je_purge_dirty_pages_thread() const { } while (true); } +void Daemon::wg_mem_used_refresh_thread() { + // Refresh memory usage and limit of workload groups + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) { + doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info(); + } +} + void Daemon::start() { Status st; st = Thread::create( @@ -402,6 +410,11 @@ void Daemon::start() { "Daemon", "query_runtime_statistics_thread", [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; + + st =
Thread::create( + "Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); }, + &_threads.emplace_back()); + CHECK(st.ok()) << st; } void Daemon::stop() { diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 5d54ba5b49d02a..28f630678969d6 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -44,6 +44,7 @@ class Daemon { void calculate_metrics_thread(); void je_purge_dirty_pages_thread() const; void report_runtime_query_statistics_thread(); + void wg_mem_used_refresh_thread(); CountDownLatch _stop_background_threads_latch; std::vector<scoped_refptr<Thread>> _threads; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 3207c109589c1d..074565b4027c84 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -70,6 +70,7 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat if (Base::_closed) { return Status::OK(); } + dec_running_big_mem_op_num(state); { std::unique_lock lk(_spill_lock); if (_is_spilling) { @@ -159,6 +160,7 @@ Status PartitionedAggSinkOperatorX::close(RuntimeState* state) { Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status); @@ -174,6 +176,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: RETURN_IF_ERROR(partition->finish_current_spilling(eos)); } local_state._dependency->set_ready_to_read(); + local_state._finish_dependency->set_ready(); } } if (local_state._runtime_state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 960decdb9515ce..b6620458c06201 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -88,6 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + dec_running_big_mem_op_num(state); { std::unique_lock lk(_merge_spill_lock); if (_is_merging) { @@ -128,6 +129,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index c0f5e3b65b8ce1..8f8598202525dc 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -17,6 +17,7 @@ #include "partitioned_hash_join_probe_operator.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "util/mem_info.h" #include "vec/spill/spill_stream_manager.h" @@ -148,6 +149,10 @@ Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) { return _partitioner->open(state); } Status PartitionedHashJoinProbeLocalState::close(RuntimeState*
state) { + if (_closed) { + return Status::OK(); + } + dec_running_big_mem_op_num(state); RETURN_IF_ERROR(JoinProbeLocalState::close(state)); return Status::OK(); } @@ -156,7 +161,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state uint32_t partition_index) { auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks; auto& mutable_block = partitioned_build_blocks[partition_index]; - if (!mutable_block || mutable_block->rows() < state->batch_size()) { + if (!mutable_block || + mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { --_spilling_task_count; return Status::OK(); } @@ -201,6 +207,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state --_spilling_task_count; if (_spilling_task_count == 0) { + LOG(INFO) << "hash probe " << _parent->id() + << " revoke memory spill_build_block finish"; std::unique_lock lock(_spill_lock); _dependency->set_ready(); } @@ -225,7 +233,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat auto& blocks = _probe_blocks[partition_index]; auto& partitioned_block = _partitioned_blocks[partition_index]; - if (partitioned_block && partitioned_block->rows() >= state->batch_size()) { + if (partitioned_block && partitioned_block->allocated_bytes() >= + vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { blocks.emplace_back(partitioned_block->to_block()); partitioned_block.reset(); } @@ -263,6 +272,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat --_spilling_task_count; if (_spilling_task_count == 0) { + LOG(INFO) << "hash probe " << _parent->id() + << " revoke memory spill_probe_blocks finish"; std::unique_lock lock(_spill_lock); _dependency->set_ready(); } @@ -304,8 +315,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti auto& spilled_stream = _shared_state->spilled_streams[partition_index]; has_data = false; if (!spilled_stream) { - LOG(INFO) << "no data need to recovery for partition: " << partition_index - << ", node id: " << _parent->id() << ", task id: " << state->task_id(); return Status::OK(); } @@ -492,6 +501,7 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block, bool eos) const { auto& local_state = get_local_state(state); + local_state.inc_running_big_mem_op_num(state); const auto rows = input_block->rows(); auto& partitioned_blocks = local_state._partitioned_blocks; if (rows == 0) { @@ -694,8 +704,11 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state auto& probe_blocks = local_state._probe_blocks; for (uint32_t i = spilling_start; i < _partition_count; ++i) { auto& build_block = partitioned_build_blocks[i]; - if (build_block && build_block->rows() >= state->batch_size()) { - mem_size += build_block->allocated_bytes(); + if (build_block) { + auto block_bytes = build_block->allocated_bytes(); + if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + mem_size += build_block->allocated_bytes(); + } } for (auto& block : probe_blocks[i]) { @@ -703,8 +716,11 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state } auto& partitioned_block = local_state._partitioned_blocks[i]; - if (partitioned_block && partitioned_block->rows() >= state->batch_size()) { - mem_size += partitioned_block->allocated_bytes(); + if (partitioned_block) { + auto 
block_bytes = partitioned_block->allocated_bytes(); + if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + mem_size += block_bytes; + } } } return mem_size; @@ -722,6 +738,8 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo return Status::OK(); } + LOG(INFO) << "hash probe " << id() + << " revoke memory, spill task count: " << local_state._spilling_task_count; for (uint32_t i = spilling_start; i < _partition_count; ++i) { RETURN_IF_ERROR(local_state.spill_build_block(state, i)); RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i)); @@ -739,22 +757,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const { auto& local_state = get_local_state(state); - + const auto revocable_size = revocable_mem_size(state); + if (PipelineXTask::should_revoke_memory(state, revocable_size)) { + return true; + } if (local_state._shared_state->need_to_spill) { - const auto revocable_size = revocable_mem_size(state); const auto min_revocable_size = state->min_revocable_mem(); return revocable_size > min_revocable_size; } - - auto sys_mem_available = MemInfo::sys_mem_available(); - auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark(); - - if (sys_mem_available < - sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier) { - const auto revocable_size = revocable_mem_size(state); - const auto min_revocable_size = state->min_revocable_mem(); - return min_revocable_size > 0 && revocable_size > min_revocable_size; - } return false; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index a63f22c1329289..dd119ade14b1d6 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -48,15 +48,27 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(PipelineXSinkLocalState::open(state)); return _partitioner->open(state); } +Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter()); + SCOPED_TIMER(PipelineXSinkLocalState::_close_timer); + if (PipelineXSinkLocalState::_closed) { + return Status::OK(); + } + dec_running_big_mem_op_num(state); + return PipelineXSinkLocalState::close(state, exec_status); +} Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { + LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory" + << ", eos: " << _child_eos; DCHECK_EQ(_spilling_streams_count, 0); _spilling_streams_count = _shared_state->partitioned_build_blocks.size(); for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; - if (!mutable_block || mutable_block->rows() < state->batch_size()) { + if (!mutable_block || + mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { --_spilling_streams_count; continue; } @@ -99,7 +111,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { if (_spilling_streams_count > 0) { _dependency->block(); } else if (_child_eos) { - LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id() + LOG(INFO) << 
"hash join sink " << _parent->id() << " set_ready_to_read" << ", task id: " << state->task_id(); _dependency->set_ready_to_read(); } @@ -129,7 +141,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( std::unique_lock lock(_spill_lock); _dependency->set_ready(); if (_child_eos) { - LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id() + LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read" << ", task id: " << state()->task_id(); _dependency->set_ready_to_read(); } @@ -176,6 +188,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); if (!local_state._spill_status_ok) { DCHECK_NE(local_state._spill_status.code(), 0); @@ -227,7 +240,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B } if (eos) { - LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id() + LOG(INFO) << "hash join sink " << id() << " sink eos, set_ready_to_read" << ", task id: " << state->task_id(); local_state._dependency->set_ready_to_read(); } @@ -243,8 +256,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state) size_t mem_size = 0; for (uint32_t i = 0; i != _partition_count; ++i) { auto& block = partitioned_blocks[i]; - if (block && block->rows() >= state->batch_size()) { - mem_size += block->allocated_bytes(); + if (block) { + auto block_bytes = block->allocated_bytes(); + if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + mem_size += block_bytes; + } } } return mem_size; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 44081bb0caae07..4d25acd1b20521 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -46,6 +46,7 @@ class PartitionedHashJoinSinkLocalState ~PartitionedHashJoinSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; + Status close(RuntimeState* state, Status exec_status) override; Status revoke_memory(RuntimeState* state); protected: diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index c586a8e5e56012..0ddc6daa79c37b 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -80,6 +80,10 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu _spill_cv.wait(lk); } } + auto& parent = Base::_parent->template cast(); + if (parent._enable_spill) { + dec_running_big_mem_op_num(state); + } return Status::OK(); } Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { @@ -160,6 +164,9 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); + if (_enable_spill) { + local_state.inc_running_big_mem_op_num(state); + } SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ 
-176,6 +183,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc RETURN_IF_ERROR(revoke_memory(state)); } else { local_state._dependency->set_ready_to_read(); + local_state._finish_dependency->set_ready(); } } else { RETURN_IF_ERROR( diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index d249b3be56e23c..56c20c853beba9 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -64,6 +64,9 @@ Status SpillSortLocalState::close(RuntimeState* state) { _merge_spill_cv.wait(lk); } } + if (Base::_shared_state->enable_spill) { + dec_running_big_mem_op_num(state); + } RETURN_IF_ERROR(Base::close(state)); for (auto& stream : _current_merging_streams) { (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); @@ -249,6 +252,9 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) { Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + if (local_state.Base::_shared_state->enable_spill) { + local_state.inc_running_big_mem_op_num(state); + } SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 94e3df7c94777a..4ca8022d163a90 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -413,9 +413,27 @@ class PipelineXLocalState : public PipelineXLocalStateBase { return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; } + void inc_running_big_mem_op_num(RuntimeState* state) { + if (!_big_mem_op_num_added) { + state->get_query_ctx()->inc_running_big_mem_op_num(); + _big_mem_op_num_added = true; + } + } + + void dec_running_big_mem_op_num(RuntimeState* state) { + if (_big_mem_op_num_added && !_big_mem_op_num_deced) { + state->get_query_ctx()->dec_running_big_mem_op_num(); + _big_mem_op_num_deced = true; + } + } + protected: Dependency* _dependency = nullptr; SharedStateArg* _shared_state = nullptr; + +private: + bool _big_mem_op_num_added = false; + bool _big_mem_op_num_deced = false; }; template @@ -708,9 +726,27 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase { return _dependency ?
std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; } + void inc_running_big_mem_op_num(RuntimeState* state) { + if (!_big_mem_op_num_added) { + state->get_query_ctx()->inc_running_big_mem_op_num(); + _big_mem_op_num_added = true; + } + } + + void dec_running_big_mem_op_num(RuntimeState* state) { + if (_big_mem_op_num_added && !_big_mem_op_num_deced) { + state->get_query_ctx()->dec_running_big_mem_op_num(); + _big_mem_op_num_deced = true; + } + } + protected: Dependency* _dependency = nullptr; SharedStateType* _shared_state = nullptr; + +private: + bool _big_mem_op_num_added = false; + bool _big_mem_op_num_deced = false; }; template diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 94cd106bbce126..9222c482381abb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -263,32 +263,10 @@ Status PipelineXTask::execute(bool* eos) { _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); - auto sys_mem_available = doris::MemInfo::sys_mem_available(); - auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark(); - auto query_mem = query_context()->query_mem_tracker->consumption(); auto sink_revocable_mem_size = _sink->revocable_mem_size(_state); - if (sink_revocable_mem_size > 0) { - VLOG_ROW << "sys mem available: " - << PrettyPrinter::print(sys_mem_available, TUnit::BYTES) - << ",\nsys_mem_available_warning_water_mark: " - << PrettyPrinter::print(sys_mem_warning_water_mark, TUnit::BYTES) - << ",\nquery mem limit: " - << PrettyPrinter::print(_state->query_mem_limit(), TUnit::BYTES) - << ",\nquery mem: " << PrettyPrinter::print(query_mem, TUnit::BYTES) - << ",\nmin revocable mem: " - << PrettyPrinter::print(_state->min_revocable_mem(), TUnit::BYTES) - << ",\nrevocable mem: " - << PrettyPrinter::print( - static_cast<int64_t>(_sink->revocable_mem_size(_state)), - TUnit::BYTES); - } - if (sys_mem_available < sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier /*&& (double)query_mem >= (double)_state->query_mem_limit() * 0.8*/) { - if (_state->min_revocable_mem() > 0 && - sink_revocable_mem_size >= _state->min_revocable_mem()) { - RETURN_IF_ERROR(_sink->revoke_memory(_state)); - continue; - } + if (should_revoke_memory(_state, sink_revocable_mem_size)) { + RETURN_IF_ERROR(_sink->revoke_memory(_state)); + continue; } // Pull block from operator chain @@ -321,6 +299,50 @@ Status PipelineXTask::execute(bool* eos) { return status; } +bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) { + auto* query_ctx = state->get_query_ctx(); + auto wg = query_ctx->workload_group(); + if (!wg) { + LOG_ONCE(INFO) << "no workload group for query " << print_id(state->query_id()); + return false; + } + bool is_wg_mem_low_water_mark = false; + bool is_wg_mem_high_water_mark = false; + wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark); + if (is_wg_mem_high_water_mark) { + if (revocable_mem_bytes > 0) { + LOG_EVERY_N(INFO, 5) << "revoke memory, high water mark"; + return true; + } + return false; + } else if (is_wg_mem_low_water_mark) { + int64_t query_weighted_limit = 0; + int64_t query_weighted_consumption = 0; + query_ctx->get_weighted_mem_info(query_weighted_limit, query_weighted_consumption); + if (query_weighted_consumption < query_weighted_limit) { + return false; + } + auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num(); +
DCHECK(big_memory_operator_num >= 0); + int64_t mem_limit_of_op; + if (0 == big_memory_operator_num) { + mem_limit_of_op = (double)query_weighted_limit * 0.8; + } else { + mem_limit_of_op = query_weighted_limit / big_memory_operator_num; + } + + const auto min_revocable_mem_bytes = state->min_revocable_mem(); + LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: " + << PrettyPrinter::print_bytes(revocable_mem_bytes) + << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op) + << ", min_revocable_mem_bytes: " + << PrettyPrinter::print_bytes(min_revocable_mem_bytes); + return (revocable_mem_bytes > mem_limit_of_op || + revocable_mem_bytes > min_revocable_mem_bytes); + } else { + return false; + } +} void PipelineXTask::finalize() { PipelineTask::finalize(); std::unique_lock lc(_release_lock); @@ -417,5 +439,4 @@ void PipelineXTask::wake_up() { // call by dependency static_cast(get_task_queue()->push_back(this)); } - } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index a89df75fc9b271..1f3dd9c3b71f5f 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -154,6 +154,8 @@ class PipelineXTask : public PipelineTask { return false; } + static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); + private: friend class RuntimeFilterDependency; Dependency* _write_blocked_dependency() { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0ec721c125571b..7e15477a641d27 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -651,6 +651,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo WorkloadGroupPtr workload_group_ptr = _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 76f262df0afb79..10f5425574199b 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -297,7 +297,6 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { _workload_group = tg; // Should add query first, then the workload group will not be deleted. 
// see task_group_manager::delete_workload_group_by_ids - RETURN_IF_ERROR(_workload_group->add_query(_query_id)); _workload_group->add_mem_tracker_limiter(query_mem_tracker); _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, &_non_pipe_thread_pool, &_remote_scan_task_scheduler); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index b4120b6942e378..c78886997d062d 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -247,6 +247,29 @@ class QueryContext { bool is_nereids() const { return _is_nereids; } + WorkloadGroupPtr workload_group() const { return _workload_group; } + + void inc_running_big_mem_op_num() { + _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed); + } + void dec_running_big_mem_op_num() { + _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed); + } + int32_t get_running_big_mem_op_num() { + return _running_big_mem_op_num.load(std::memory_order_relaxed); + } + + void set_weighted_mem(int64_t weighted_limit, int64_t weighted_consumption) { + std::lock_guard l(_weighted_mem_lock); + _weighted_consumption = weighted_consumption; + _weighted_limit = weighted_limit; + } + void get_weighted_mem_info(int64_t& weighted_limit, int64_t& weighted_consumption) { + std::lock_guard l(_weighted_mem_lock); + weighted_limit = _weighted_limit; + weighted_consumption = _weighted_consumption; + } + DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; std::string user; @@ -280,6 +303,7 @@ class QueryContext { int64_t _bytes_limit = 0; bool _is_pipeline = false; bool _is_nereids = false; + std::atomic<int32_t> _running_big_mem_op_num = 0; // A token used to submit olap scanner to the "_limited_scan_thread_pool", // This thread pool token is created from "_limited_scan_thread_pool" from exec env.
@@ -323,6 +347,10 @@ class QueryContext { std::map> _fragment_id_to_pipeline_ctx; std::mutex _pipeline_map_write_lock; + + std::mutex _weighted_mem_lock; + int64_t _weighted_consumption = 0; + int64_t _weighted_limit = 0; }; } // namespace doris diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 36d2a1be310423..1b0430d64fb4d6 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -118,6 +118,10 @@ int64_t WorkloadGroup::memory_used() { return used_memory; } +void WorkloadGroup::set_weighted_memory_used(int64_t wg_total_mem_used, double ratio) { + _weighted_mem_used.store(wg_total_mem_used * ratio, std::memory_order_relaxed); +} + void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 4df33b349eb2d7..49bcd841a0ffee 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -39,6 +39,7 @@ class RuntimeProfile; class ThreadPool; class ExecEnv; class CgroupCpuCtl; +class QueryContext; namespace vectorized { class SimplifiedScanScheduler; @@ -78,6 +79,25 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { int64_t memory_used(); + int spill_threshold_low_water_mark() const { + return _spill_low_watermark.load(std::memory_order_relaxed); + } + int spill_threshold_high_water_mark() const { + return _spill_high_watermark.load(std::memory_order_relaxed); + } + + void set_weighted_memory_used(int64_t wg_total_mem_used, double ratio); + + void check_mem_used(bool* is_low_watermark, bool* is_high_watermark) const { + auto weighted_mem_used = _weighted_mem_used.load(std::memory_order_relaxed); + *is_low_watermark = + (weighted_mem_used > ((double)_memory_limit * + _spill_low_watermark.load(std::memory_order_relaxed) / 100)); + *is_high_watermark = + (weighted_mem_used > ((double)_memory_limit * + _spill_high_watermark.load(std::memory_order_relaxed) / 100)); + } + std::string debug_string() const; void check_and_update(const WorkloadGroupInfo& tg_info); @@ -93,7 +113,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { return _memory_limit > 0; } - Status add_query(TUniqueId query_id) { + Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> query_ctx) { std::unique_lock wlock(_mutex); if (_is_shutdown) { // If the workload group is set shutdown, then should not run any more, return Status::InternalError( "Failed add query to workload group, the workload group is shutdown.
host: {}", BackendOptions::get_localhost()); } - _query_id_set.insert(query_id); + _query_ctxs.insert({query_id, query_ctx}); return Status::OK(); } void remove_query(TUniqueId query_id) { std::unique_lock wlock(_mutex); - _query_id_set.erase(query_id); + _query_ctxs.erase(query_id); } void shutdown() { @@ -118,7 +138,7 @@ class WorkloadGroup : public std::enable_shared_from_this { int query_num() { std::shared_lock r_lock(_mutex); - return _query_id_set.size(); + return _query_ctxs.size(); } int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile); @@ -132,12 +152,18 @@ class WorkloadGroup : public std::enable_shared_from_this { void try_stop_schedulers(); + std::unordered_map> queries() { + std::shared_lock r_lock(_mutex); + return _query_ctxs; + } + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; std::string _name; int64_t _version; - int64_t _memory_limit; // bytes + int64_t _memory_limit; // bytes + std::atomic_int64_t _weighted_mem_used = 0; // bytes bool _enable_memory_overcommit; std::atomic _cpu_share; std::vector _mem_tracker_limiter_pool; @@ -152,7 +178,7 @@ class WorkloadGroup : public std::enable_shared_from_this { // new query can not submit // waiting running query to be cancelled or finish bool _is_shutdown = false; - std::unordered_set _query_id_set; + std::unordered_map> _query_ctxs; std::shared_mutex _task_sched_lock; std::unique_ptr _cgroup_cpu_ctl = nullptr; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 7ec08387543657..027cfb2b2dd02a 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -19,10 +19,12 @@ #include #include +#include #include "pipeline/task_scheduler.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/workload_group/workload_group.h" +#include "util/mem_info.h" #include "util/threadpool.h" #include "util/time.h" #include "vec/exec/scan/scanner_scheduler.h" @@ -135,6 +137,127 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i << "ms, deleted group size:" << deleted_task_groups.size(); } +struct WorkloadGroupMemInfo { + int64_t total_mem_used = 0; + int64_t weighted_mem_used = 0; + bool is_low_wartermark = false; + bool is_high_wartermark = false; + double mem_used_ratio = 0; +}; +void WorkloadGroupMgr::refresh_wg_memory_info() { + std::shared_lock r_lock(_group_mutex); + // workload group id -> workload group queries + std::unordered_map>> + all_wg_queries; + for (auto& [wg_id, wg] : _workload_groups) { + all_wg_queries.insert({wg_id, wg->queries()}); + } + + int64_t all_queries_mem_used = 0; + + // calculate total memory used of each workload group and total memory used of all queries + std::unordered_map wgs_mem_info; + for (auto& [wg_id, wg_queries] : all_wg_queries) { + int64_t wg_total_mem_used = 0; + for (const auto& [query_id, query_ctx_ptr] : wg_queries) { + if (auto query_ctx = query_ctx_ptr.lock()) { + wg_total_mem_used += query_ctx->query_mem_tracker->consumption(); + } + } + all_queries_mem_used += wg_total_mem_used; + wgs_mem_info[wg_id] = {wg_total_mem_used}; + } + + auto proc_vm_rss = PerfCounters::get_vm_rss(); + if (all_queries_mem_used <= 0) { + return; + } + + auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache(); + auto sys_mem_available = doris::MemInfo::sys_mem_available(); + if (proc_vm_rss < all_queries_mem_used) { + 
all_queries_mem_used = proc_vm_rss; + } + + // process memory used is actually bigger than all_queries_mem_used, + // because memory of page cache, allocator cache, segment cache etc. are included + // in process_mem_used. + // we attribute these cache memories to the workload groups in proportion to their query memory use. + double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; + if (ratio >= 1.25) { + std::string debug_msg = fmt::format( + "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " + "{}, all queries mem: {}", + PrettyPrinter::print(proc_vm_rss, TUnit::BYTES), + PrettyPrinter::print(process_mem_used, TUnit::BYTES), + PrettyPrinter::print(sys_mem_available, TUnit::BYTES), + PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); + LOG_EVERY_N(INFO, 10) << debug_msg; + } + + for (auto& wg : _workload_groups) { + auto wg_mem_limit = wg.second->memory_limit(); + auto& wg_mem_info = wgs_mem_info[wg.first]; + wg_mem_info.weighted_mem_used = wg_mem_info.total_mem_used * ratio; + wg_mem_info.mem_used_ratio = (double)wg_mem_info.weighted_mem_used / wg_mem_limit; + + wg.second->set_weighted_memory_used(wg_mem_info.total_mem_used, ratio); + + auto spill_low_water_mark = wg.second->spill_threshold_low_water_mark(); + auto spill_high_water_mark = wg.second->spill_threshold_high_water_mark(); + wg_mem_info.is_high_watermark = (wg_mem_info.weighted_mem_used > + ((double)wg_mem_limit * spill_high_water_mark / 100)); + wg_mem_info.is_low_watermark = (wg_mem_info.weighted_mem_used > + ((double)wg_mem_limit * spill_low_water_mark / 100)); + + // calculate query weighted memory limit of the workload group + const auto& wg_queries = all_wg_queries[wg.first]; + auto wg_query_count = wg_queries.size(); + int64_t query_weighted_mem_limit = + wg_query_count ?
(wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit; + + std::string debug_msg; + if (wg_mem_info.is_high_watermark || wg_mem_info.is_low_watermark) { + debug_msg = fmt::format( + "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used " + "ratio: {}, query " + "count: {}, query_weighted_mem_limit: {}", + wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES), + PrettyPrinter::print(wg_mem_info.total_mem_used, TUnit::BYTES), + PrettyPrinter::print(wg_mem_info.weighted_mem_used, TUnit::BYTES), + wg_mem_info.mem_used_ratio, wg_query_count, + PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES)); + + debug_msg += "\n Query Memory Summary:"; + } + // check whether queries need to revoke memory for the workload group + for (const auto& query : wg_queries) { + auto query_ctx = query.second.lock(); + if (!query_ctx) { + continue; + } + auto query_consumption = query_ctx->query_mem_tracker->consumption(); + int64_t query_weighted_consumption = query_consumption * ratio; + query_ctx->set_weighted_mem(query_weighted_mem_limit, query_weighted_consumption); + + if (wg_mem_info.is_high_watermark || wg_mem_info.is_low_watermark) { + debug_msg += fmt::format( + "\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, " + "Peak={}", + query_ctx->query_mem_tracker->label(), + query_ctx->query_mem_tracker->parent_label(), + PrettyPrinter::print(query_consumption, TUnit::BYTES), + PrettyPrinter::print(query_weighted_consumption, TUnit::BYTES), + PrettyPrinter::print(query_ctx->query_mem_tracker->peak_consumption(), + TUnit::BYTES)); + } + } + if (wg_mem_info.is_high_watermark || wg_mem_info.is_low_watermark) { + LOG_EVERY_N(INFO, 10) << debug_msg; + } + } +} + void WorkloadGroupMgr::stop() { for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 1f680eb17c848b..8aeb8f988a30df 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -54,6 +54,8 @@ class WorkloadGroupMgr { bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } + void refresh_wg_memory_info(); + private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups; diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 9f328240d759fe..4d53b43971256d 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -34,6 +34,8 @@ class SpillDataDir; class SpillStream { public: + // to avoid too many small file writes + static constexpr int MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024; SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir, std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile); diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index d51dcf8f1ec816..6c25c40e8585a5 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -18,6 +18,7 @@ #include "vec/spill/spill_writer.h" #include "agent/be_exec_version_manager.h" +#include "common/status.h" #include "io/file_factory.h" #include "io/fs/local_file_system.h" #include "io/fs/local_file_writer.h" @@ -35,14 +36,18 @@ Status SpillWriter::open() { return Status::OK(); } +SpillWriter::~SpillWriter() { + if (!closed_) { + (void)Status::Error<ErrorCode::INTERNAL_ERROR>("spill writer not closed correctly"); +
} +} + Status SpillWriter::close() { if (closed_ || !file_writer_) { return Status::OK(); } closed_ = true; - tmp_block_.clear_column_data(); - meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_)); meta_.append((const char*)&written_blocks_, sizeof(written_blocks_)); @@ -71,17 +76,13 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) { if (rows <= batch_size_) { return _write_internal(block, written_bytes); } else { - if (is_first_write_) { - is_first_write_ = false; - tmp_block_ = block.clone_empty(); - } - + auto tmp_block = block.clone_empty(); const auto& src_data = block.get_columns_with_type_and_name(); for (size_t row_idx = 0; row_idx < rows;) { - tmp_block_.clear_column_data(); + tmp_block.clear_column_data(); - auto& dst_data = tmp_block_.get_columns_with_type_and_name(); + auto& dst_data = tmp_block.get_columns_with_type_and_name(); size_t block_rows = std::min(rows - row_idx, batch_size_); RETURN_IF_CATCH_EXCEPTION({ @@ -91,7 +92,7 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) { } }); - RETURN_IF_ERROR(_write_internal(tmp_block_, written_bytes)); + RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes)); row_idx += block_rows; } @@ -106,8 +107,8 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { std::string buff; if (block.rows() > 0) { - PBlock pblock; { + PBlock pblock; SCOPED_TIMER(serialize_timer_); status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, &compressed_bytes, diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 14e8120f775d11..45317e8b8cffd1 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -35,7 +35,7 @@ class SpillWriter { file_path_ = dir + "/" + std::to_string(file_index_); } - ~SpillWriter() { (void)close(); } + ~SpillWriter(); Status open(); @@ -79,9 +79,6 @@ class SpillWriter { size_t total_written_bytes_ = 0; std::string meta_; - bool is_first_write_ = true; - Block tmp_block_; - RuntimeProfile::Counter* write_bytes_counter_; RuntimeProfile::Counter* serialize_timer_; RuntimeProfile::Counter* write_timer_;
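
The sketches below restate the mechanisms this patch introduces, in standalone form; every name that does not appear in the diff above is illustrative, not part of the patch.

The inc/dec_running_big_mem_op_num helpers added to the pipeline local states count "big memory" operators per query exactly once each: the first sink()/get_block() call registers the operator with its QueryContext, close() deregisters it, and two flags make both transitions idempotent. A minimal sketch, with QueryCtx standing in for the real QueryContext:

#include <atomic>
#include <cstdint>

struct QueryCtx {
    std::atomic<int32_t> running_big_mem_op_num{0};
};

// Mirrors the _big_mem_op_num_added/_big_mem_op_num_deced flag pair:
// inc() counts once no matter how many times the operator runs, and
// dec() fires only if inc() fired first, and only once.
class BigMemOpCounter {
public:
    void inc(QueryCtx& ctx) {
        if (!_added) {
            ctx.running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed);
            _added = true;
        }
    }
    void dec(QueryCtx& ctx) {
        if (_added && !_deced) {
            ctx.running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed);
            _deced = true;
        }
    }

private:
    bool _added = false;
    bool _deced = false;
};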
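
WorkloadGroup::check_mem_used is a pair of threshold tests: the group is over a watermark once its weighted memory use exceeds the corresponding percentage of its memory limit. A self-contained sketch of the same arithmetic (the field names and the 50/80 defaults are assumptions; the real percentages come from the workload group configuration):

#include <atomic>
#include <cstdint>

struct GroupWatermarks {
    int64_t memory_limit = 0;                  // bytes
    std::atomic<int64_t> weighted_mem_used{0}; // refreshed by the daemon thread
    std::atomic<int> low_watermark{50};        // percent of memory_limit
    std::atomic<int> high_watermark{80};       // percent of memory_limit

    void check(bool* is_low, bool* is_high) const {
        const auto used = weighted_mem_used.load(std::memory_order_relaxed);
        *is_low = used > (double)memory_limit *
                                 low_watermark.load(std::memory_order_relaxed) / 100;
        *is_high = used > (double)memory_limit *
                                  high_watermark.load(std::memory_order_relaxed) / 100;
    }
};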
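
refresh_wg_memory_info, run every wg_mem_refresh_interval_ms by the new daemon thread, reconciles per-query tracker totals with the process RSS: RSS is normally larger because page cache, allocator caches, and segment caches are not attributed to any query, so each query's consumption is scaled by rss / sum(consumption) before being compared with group limits. A sketch of that weighting, with the bookkeeping reduced to plain vectors:

#include <cstdint>
#include <vector>

// Returns each query's consumption scaled by proc_vm_rss / total consumption.
// The total is clamped to RSS first, so the ratio never drops below 1.0,
// matching the clamp in refresh_wg_memory_info.
std::vector<int64_t> weight_query_memory(int64_t proc_vm_rss,
                                         const std::vector<int64_t>& consumption) {
    int64_t total = 0;
    for (int64_t c : consumption) {
        total += c;
    }
    if (total <= 0) {
        return {}; // nothing tracked yet; the real refresh simply returns
    }
    if (proc_vm_rss < total) {
        total = proc_vm_rss;
    }
    const double ratio = (double)proc_vm_rss / (double)total;
    std::vector<int64_t> weighted;
    weighted.reserve(consumption.size());
    for (int64_t c : consumption) {
        weighted.push_back((int64_t)(c * ratio));
    }
    return weighted;
}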
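
PipelineXTask::should_revoke_memory then reduces to a small decision table: above the high watermark, spill anything revocable; between the watermarks, spill only if the query is at or over its weighted limit and this operator holds more than its per-operator share (the weighted limit divided by the big-memory-operator count, or 80% of the limit when the count is zero) or more than min_revocable_mem. The same logic with all inputs passed explicitly:

#include <cstdint>

bool should_revoke(bool is_high_watermark, bool is_low_watermark,
                   int64_t revocable_bytes, int64_t query_weighted_limit,
                   int64_t query_weighted_consumption, int32_t big_mem_op_num,
                   int64_t min_revocable_bytes) {
    if (is_high_watermark) {
        return revocable_bytes > 0; // spill whatever can be spilled
    }
    if (!is_low_watermark) {
        return false; // plenty of headroom, never spill
    }
    if (query_weighted_consumption < query_weighted_limit) {
        return false; // the query itself is still under its share
    }
    const int64_t op_limit = big_mem_op_num == 0
                                     ? (int64_t)(query_weighted_limit * 0.8)
                                     : query_weighted_limit / big_mem_op_num;
    return revocable_bytes > op_limit || revocable_bytes > min_revocable_bytes;
}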
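
Finally, the SpillWriter change replaces the long-lived tmp_block_ member with a clone local to each write() call, so an idle writer no longer pins block memory between spills, while MIN_SPILL_WRITE_BATCH_MEM (32 KB) keeps tiny blocks from producing many small file writes. A sketch of the batching loop, assuming batch_size > 0; Block is a toy stand-in for vectorized::Block and write_one for _write_internal:

#include <algorithm>
#include <cstddef>
#include <vector>

struct Block { // toy stand-in for vectorized::Block
    std::vector<int> rows_data;
    size_t rows() const { return rows_data.size(); }
    Block clone_empty() const { return {}; }
};

// Write `block` in chunks of at most batch_size rows, reusing one
// function-local temporary instead of a long-lived member block.
template <typename WriteFn>
void write_batched(const Block& block, size_t batch_size, WriteFn&& write_one) {
    if (block.rows() <= batch_size) {
        write_one(block);
        return;
    }
    Block tmp = block.clone_empty();
    for (size_t row = 0; row < block.rows();) {
        tmp.rows_data.clear(); // corresponds to clear_column_data()
        const size_t n = std::min(block.rows() - row, batch_size);
        tmp.rows_data.assign(block.rows_data.begin() + row,
                             block.rows_data.begin() + row + n);
        write_one(tmp); // corresponds to _write_internal(tmp_block, ...)
        row += n;
    }
}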