From 72acdf5a056bce50071f8a0c96fb52d7b085f5af Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 12 Sep 2024 16:43:42 +0800 Subject: [PATCH] update --- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++++-- .../main/java/org/apache/doris/qe/SessionVariable.java | 6 ++++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 066d42760d5b616..ac527ed8e698885 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -703,7 +703,9 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( } return Status::OK(); }; - if (target_size > 1) { + if (target_size > 1 && + (_runtime_state->query_options().__isset.parallel_prepare_threshold && + target_size > _runtime_state->query_options().parallel_prepare_threshold)) { Status prepare_status[target_size]; std::mutex m; std::condition_variable cv; @@ -730,7 +732,9 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( } } } else { - RETURN_IF_ERROR(pre_and_submit(0, this)); + for (size_t i = 0; i < target_size; i++) { + RETURN_IF_ERROR(pre_and_submit(i, this)); + } } _pipeline_parent_map.clear(); _dag.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 88702928f8d75fe..002d39ed733d8c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -278,6 +278,8 @@ public class SessionVariable implements Serializable, Writable { public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold"; + public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold"; + public static final String ENABLE_PROJECTION = "enable_projection"; public static final String CHECK_OVERFLOW_FOR_DECIMAL = "check_overflow_for_decimal"; @@ -1038,6 +1040,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD) public double autoBroadcastJoinThreshold = 0.8; + @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD) + public int parallelPrepareThreshold = 32; + @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER) private boolean enableJoinReorderBasedCost = false; @@ -3403,6 +3408,7 @@ public TQueryOptions toThrift() { tResult.setNumScannerThreads(numScannerThreads); tResult.setScannerScaleUpRatio(scannerScaleUpRatio); tResult.setMaxColumnReaderNum(maxColumnReaderNum); + tResult.setParallelPrepareThreshold(parallelPrepareThreshold); // TODO chenhao, reservation will be calculated by cost tResult.setMinReservation(0); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b26e271b911584f..0fb87723d63fda6 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -326,6 +326,8 @@ struct TQueryOptions { 128: optional bool enable_verbose_profile = false; 129: optional i32 rpc_verbose_profile_max_instance_count = 0; + // only in 2.1 + 999: optional i32 parallel_prepare_threshold = 0; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false }