[chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017)
mrhhsg authored and Doris-Extras committed Apr 24, 2024
1 parent cc3decf commit 94cc58b
Showing 7 changed files with 35 additions and 1 deletion.
8 changes: 7 additions & 1 deletion be/src/pipeline/exec/data_queue.cpp
@@ -119,10 +119,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
         }
         _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
         _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
+        if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
+            _sink_dependencies[_flag_queue_idx]->set_ready();
+        }
         auto old_value = _cur_blocks_total_nums.fetch_sub(1);
         if (old_value == 1 && _source_dependency) {
             set_source_block();
-            _sink_dependencies[_flag_queue_idx]->set_ready();
         }
     } else {
         if (_is_finished[_flag_queue_idx]) {
@@ -142,6 +144,10 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
         _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
         _queue_blocks[child_idx].emplace_back(std::move(block));
         _cur_blocks_nums_in_queue[child_idx] += 1;
+
+        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
+            _sink_dependencies[child_idx]->block();
+        }
         _cur_blocks_total_nums++;
         if (_source_dependency) {
             set_source_ready();
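Taken together, the two hunks above form a simple backpressure loop: push_block() stalls the producer's sink dependency once a sub-queue holds more than _max_blocks_in_sub_queue blocks, and get_block_from_queue() marks it ready again once that sub-queue drains. Below is a minimal, self-contained sketch of that gating logic; Dependency, BoundedSubQueue, and the int payload are illustrative stand-ins, not the real Doris pipeline types.

#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>

// Stand-in for the pipeline dependency: a flag the scheduler consults to
// decide whether the sink (producer) task may run.
struct Dependency {
    bool ready = true;
    void block() { ready = false; }    // producer must stop pushing
    void set_ready() { ready = true; } // producer may push again
};

class BoundedSubQueue {
public:
    explicit BoundedSubQueue(int64_t max_blocks) : _max_blocks(max_blocks) {}

    // Mirrors DataQueue::push_block(): enqueue, then stall the sink once the
    // sub-queue exceeds the configured cap.
    void push(std::unique_ptr<int> block) {
        std::lock_guard<std::mutex> l(_lock);
        _queue.emplace_back(std::move(block));
        if (static_cast<int64_t>(_queue.size()) > _max_blocks) {
            _sink_dep.block();
        }
    }

    // Mirrors DataQueue::get_block_from_queue(): dequeue, and wake the sink
    // once the sub-queue is fully drained.
    std::unique_ptr<int> pop() {
        std::lock_guard<std::mutex> l(_lock);
        if (_queue.empty()) {
            return nullptr;
        }
        auto block = std::move(_queue.front());
        _queue.pop_front();
        if (_queue.empty()) {
            _sink_dep.set_ready();
        }
        return block;
    }

    bool sink_ready() const { return _sink_dep.ready; }

private:
    int64_t _max_blocks;
    std::mutex _lock;
    std::deque<std::unique_ptr<int>> _queue;
    Dependency _sink_dep;
};

Note that the cap is a block count, not a byte budget: memory is bounded only indirectly, through the size of the individual blocks, presumably because a block count is a much cheaper check on the hot path.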
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/data_queue.h
@@ -70,6 +70,8 @@ class DataQueue {
     void set_source_ready();
     void set_source_block();
 
+    void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; }
+
 private:
     std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
     std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
@@ -93,6 +95,8 @@ class DataQueue {
     // only used by streaming agg source operator
     bool _data_exhausted = false;
 
+    int64_t _max_blocks_in_sub_queue = 1;
+
     //this only use to record the queue[0] for profile
     int64_t _max_bytes_in_queue = 0;
     int64_t _max_size_of_queue = 0;
1 change: 1 addition & 0 deletions be/src/pipeline/exec/union_sink_operator.cpp
@@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) {
     for (size_t i = 0; i < p._child_expr.size(); i++) {
         RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
     }
+    _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
     return Status::OK();
 }
 
5 changes: 5 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     } else {
         int child_count = union_node->children_count();
         auto data_queue = std::make_shared<DataQueue>(child_count);
+        data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks());
         for (int child_id = 0; child_id < child_count; ++child_id) {
             auto new_child_pipeline = add_pipeline();
             RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
@@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
                                          std::to_string(agg_node->id()) +
                                          ": group by and output is empty");
         }
+
+        const int64_t data_queue_max_blocks = _runtime_state->data_queue_max_blocks();
         if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink =
                     std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                               data_queue);
@@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
         } else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
@@ -430,6 +430,11 @@ class RuntimeState {
         return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version;
     }
 
+    int64_t data_queue_max_blocks() const {
+        return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks
+                                                            : 1;
+    }
+
     bool enable_page_cache() const;
 
     int partitioned_hash_join_rows_threshold() const {
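The new accessor follows the standard Thrift optional-field pattern: every optional field in the generated TQueryOptions comes with an __isset flag, and the BE falls back to a default of 1 when the coordinator did not send the field (for example, an older FE). A minimal sketch of that pattern, using hand-written stand-ins for the thrift-generated types:

#include <cstdint>

// Hand-written stand-ins for the thrift-generated structs (illustrative only).
struct TQueryOptionsIsset {
    bool data_queue_max_blocks = false; // flips to true when the field is received
};

struct TQueryOptions {
    int64_t data_queue_max_blocks = 0;
    TQueryOptionsIsset __isset;
};

// Mirrors RuntimeState::data_queue_max_blocks(): honor the session value when
// the coordinator sent one, otherwise fall back to a cap of a single block.
int64_t data_queue_max_blocks(const TQueryOptions& opts) {
    return opts.__isset.data_queue_max_blocks ? opts.data_queue_max_blocks : 1;
}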
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
     public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
     public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+    public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
@@ -1745,6 +1746,13 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
             needForward = true, fuzzy = true)
     public boolean enableAggSpill = false;
 
+    @VariableMgr.VarAttr(
+            name = DATA_QUEUE_MAX_BLOCKS,
+            description = {"The maximum number of blocks allowed in each sub-queue of the DataQueue",
+                    "Max blocks in DataQueue."},
+            needForward = true, fuzzy = true)
+    public long dataQueueMaxBlocks = 1;
+
     // If the memory consumption of sort node exceed this limit, will trigger spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3171,6 +3179,8 @@ public TQueryOptions toThrift() {
         tResult.setEnableSortSpill(enableSortSpill);
         tResult.setEnableAggSpill(enableAggSpill);
         tResult.setMinRevocableMem(minRevocableMem);
+        tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+
         return tResult;
     }
 
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -285,6 +285,9 @@ struct TQueryOptions {
   104: optional i64 min_revocable_mem = 0
 
   105: optional i64 spill_streaming_agg_mem_limit = 0;
+
+  // max blocks of each sub-queue in DataQueue.
+  106: optional i64 data_queue_max_blocks = 0;
 
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
