From 8b02f90cb436d5d404da1661e951abdc8386acb5 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 24 Apr 2024 11:10:04 +0800 Subject: [PATCH] [chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) --- be/src/pipeline/exec/data_queue.cpp | 8 +++++++- be/src/pipeline/exec/data_queue.h | 4 ++++ be/src/pipeline/exec/union_sink_operator.cpp | 1 + be/src/pipeline/pipeline_fragment_context.cpp | 5 +++++ be/src/runtime/runtime_state.h | 5 +++++ .../main/java/org/apache/doris/qe/SessionVariable.java | 10 ++++++++++ gensrc/thrift/PaloInternalService.thrift | 3 +++ 7 files changed, 35 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 06c16c7dfa6860f..d248edd908177d6 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -119,10 +119,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr* outpu } _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; + if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) { + _sink_dependencies[_flag_queue_idx]->set_ready(); + } auto old_value = _cur_blocks_total_nums.fetch_sub(1); if (old_value == 1 && _source_dependency) { set_source_block(); - _sink_dependencies[_flag_queue_idx]->set_ready(); } } else { if (_is_finished[_flag_queue_idx]) { @@ -142,6 +144,10 @@ void DataQueue::push_block(std::unique_ptr block, int child_i _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); _queue_blocks[child_idx].emplace_back(std::move(block)); _cur_blocks_nums_in_queue[child_idx] += 1; + + if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) { + _sink_dependencies[child_idx]->block(); + } _cur_blocks_total_nums++; if (_source_dependency) { set_source_ready(); diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index 1a50b7485d1367a..f5bd84cc278d0ae 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -70,6 +70,8 @@ class DataQueue { void set_source_ready(); void set_source_block(); + void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; } + private: std::vector> _queue_blocks_lock; std::vector>> _queue_blocks; @@ -93,6 +95,8 @@ class DataQueue { // only used by streaming agg source operator bool _data_exhausted = false; + int64_t _max_blocks_in_sub_queue = 1; + //this only use to record the queue[0] for profile int64_t _max_bytes_in_queue = 0; int64_t _max_size_of_queue = 0; diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index e466237a3750658..40344882a84e530 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) { for (size_t i = 0; i < p._child_expr.size(); i++) { RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); } + _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks()); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e53492c9fa09894..e3f9de5fd030a4e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } else { int child_count = union_node->children_count(); auto data_queue = std::make_shared(child_count); + data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks()); for (int child_id = 0; child_id < child_count; ++child_id) { auto new_child_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline)); @@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur std::to_string(agg_node->id()) + ": group by and output is empty"); } + + const int64_t data_queue_max_blocks = _runtime_state->data_queue_max_blocks(); if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared(1); + data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks); OperatorBuilderPtr pre_agg_sink = std::make_shared(node->id(), agg_node, data_queue); @@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); } else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared(1); + data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks); OperatorBuilderPtr pre_agg_sink = std::make_shared( node->id(), agg_node, data_queue); RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink)); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 644db3e32eb2114..6d5a6451fb4b6ea 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -430,6 +430,11 @@ class RuntimeState { return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version; } + bool data_queue_max_blocks() const { + return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks + : 1; + } + bool enable_page_cache() const; int partitioned_hash_join_rows_threshold() const { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d89ff2ea8df535f..7ad72283d44319a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_JOIN_SPILL = "enable_join_spill"; public static final String ENABLE_SORT_SPILL = "enable_sort_spill"; public static final String ENABLE_AGG_SPILL = "enable_agg_spill"; + public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; @@ -1745,6 +1746,13 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { needForward = true, fuzzy = true) public boolean enableAggSpill = false; + @VariableMgr.VarAttr( + name = DATA_QUEUE_MAX_BLOCKS, + description = {"DataQueue 中每个子队列允许最大的 block 个数", + "Max blocks in DataQueue."}, + needForward = true, fuzzy = true) + public long dataQueueMaxBlocks = 1; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -3171,6 +3179,8 @@ public TQueryOptions toThrift() { tResult.setEnableSortSpill(enableSortSpill); tResult.setEnableAggSpill(enableAggSpill); tResult.setMinRevocableMem(minRevocableMem); + tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 3265cb47b263bfa..2f7886e10c2330b 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -285,6 +285,9 @@ struct TQueryOptions { 104: optional i64 min_revocable_mem = 0 105: optional i64 spill_streaming_agg_mem_limit = 0; + + // max rows of each sub-queue in DataQueue. + 106: optional i64 data_queue_max_blocks = 0; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false