[opt](scanner) Control the degree of parallelism of scanner when only limit involved (#39927)

For queries like `select * from tbl limit 100;`, we should limit the
scan parallelism to 1.

We already have such a strategy in the code, but there is no way to
control its behavior, so this PR adds session variables to control it.

Previous PRs #33888 and #36535 did something similar; this PR
integrates them.
zhiqiang-hhhh authored Sep 4, 2024
1 parent 4c1b929 commit 7b86057
Showing 5 changed files with 247 additions and 8 deletions.
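
Taken together, the rule this patch wires through FE and BE is: a scan with no filter conjuncts whose limit is at or below a configurable row threshold (default 10000) runs with a scan parallelism of 1. A minimal Java sketch of that decision, with illustrative names only (the real checks live in ScanOperatorX::init() and ScanNode.shouldUseOneInstance() in the diffs below):

// Illustrative sketch; all names here are hypothetical.
final class SerialScanHeuristic {
    // Default threshold used when the session variable is not set.
    static final int DEFAULT_SERIAL_READ_ROW_LIMIT = 10000;

    static boolean shouldScanSerially(long limit, boolean hasConjuncts,
                                      boolean enabled, int thresholdRows) {
        // A small LIMIT with no filter conjuncts can be satisfied by a
        // single scanner, so extra parallelism only adds overhead.
        return enabled && !hasConjuncts && limit > 0 && limit <= thresholdRows;
    }
}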
36 changes: 30 additions & 6 deletions be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,8 @@

namespace doris::pipeline {

const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 10000;

#define RETURN_IF_PUSH_DOWN(stmt, status) \
if (pdt == PushDownType::UNACCEPTABLE) { \
status = stmt; \
@@ -1147,12 +1149,6 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
: OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
_runtime_filter_descs(tnode.runtime_filters),
_parallel_tasks(parallel_tasks) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// This means the request could be fulfilled in a single segment iterator request.
if (tnode.limit > 0 && tnode.limit < 1024) {
_should_run_serial = true;
}
}
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
@@ -1185,6 +1181,34 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
if (tnode.__isset.topn_filter_source_node_ids) {
topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
}

// The first branch is kept for compatibility with the old version of the FE
if (!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// This means the request could be fulfilled in a single segment iterator request.
if (tnode.limit > 0 &&
tnode.limit <= ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) {
_should_run_serial = true;
}
}
} else {
DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit);
// Whether enable_adaptive_pipeline_task_serial_read_on_limit is set
// was already checked by the previous branch.
if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) {
int32_t adaptive_pipeline_task_serial_read_on_limit =
ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT;
if (query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) {
adaptive_pipeline_task_serial_read_on_limit =
query_options.adaptive_pipeline_task_serial_read_on_limit;
}

if (tnode.limit > 0 && tnode.limit <= adaptive_pipeline_task_serial_read_on_limit) {
_should_run_serial = true;
}
}
}

return Status::OK();
}
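
The branching above exists because an upgraded BE can still receive plans from an older FE that never sets the new query options; Thrift optional fields carry an __isset flag for exactly this case. A compact restatement of the threshold resolution (a sketch against a hypothetical accessor interface standing in for TQueryOptions):

// Hypothetical mirror of the Thrift-generated TQueryOptions accessors.
interface QueryOptionsView {
    boolean isSetEnableAdaptiveSerialRead();
    boolean getEnableAdaptiveSerialRead();
    boolean isSetAdaptiveSerialReadLimit();
    int getAdaptiveSerialReadLimit();
}

final class SerialReadThreshold {
    static final int DEFAULT_LIMIT = 10000; // matches the BE constant above

    // Returns the row threshold at or below which a limited scan runs
    // serially; 0 means the strategy is disabled and no limit qualifies.
    static int resolve(QueryOptionsView opts) {
        if (!opts.isSetEnableAdaptiveSerialRead()) {
            return DEFAULT_LIMIT; // old FE: legacy hard-coded behavior
        }
        if (!opts.getEnableAdaptiveSerialRead()) {
            return 0; // explicitly disabled by the session variable
        }
        return opts.isSetAdaptiveSerialReadLimit()
                ? opts.getAdaptiveSerialReadLimit()
                : DEFAULT_LIMIT;
    }
}

Note one asymmetry in init() above: the legacy branch additionally requires the scan node to have no conjuncts, while the new branch leaves that check to the FE's shouldUseOneInstance().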

16 changes: 14 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -739,8 +739,20 @@ public int getScanRangeNum() {
}

public boolean shouldUseOneInstance(ConnectContext ctx) {
long limitRowsForSingleInstance = ctx == null ? 10000 : ctx.getSessionVariable().limitRowsForSingleInstance;
return hasLimit() && getLimit() < limitRowsForSingleInstance && conjuncts.isEmpty();
int adaptivePipelineTaskSerialReadOnLimit = 10000;

if (ctx != null) {
if (ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
} else {
return false;
}
} else {
// No connection context, typically for broker load.
}

// For a UniqueKey table, we will use multiple instances.
return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
}

// In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions.
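One FE-side nuance visible above: with no connection context (typically broker load) the default threshold of 10000 applies, while a context that has the feature disabled short-circuits to false. A sketch of that branching with hypothetical types (SessionState stands in for ConnectContext plus SessionVariable):

// Hypothetical stand-in for ConnectContext + SessionVariable.
record SessionState(boolean enabled, int threshold) {}

final class OneInstanceDecision {
    static boolean decide(SessionState session, boolean hasLimit, long limit,
                          boolean hasConjuncts) {
        int threshold = 10000; // default, e.g. broker load without a session
        if (session != null) {
            if (!session.enabled()) {
                return false; // feature disabled for this session
            }
            threshold = session.threshold();
        }
        return hasLimit && limit <= threshold && !hasConjuncts;
    }
}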
25 changes: 25 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -654,6 +654,11 @@ public class SessionVariable implements Serializable, Writable {

public static final String IN_LIST_VALUE_COUNT_THRESHOLD = "in_list_value_count_threshold";

public static final String ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"enable_adaptive_pipeline_task_serial_read_on_limit";
public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"adaptive_pipeline_task_serial_read_on_limit";

/**
* If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
*/
@@ -2115,13 +2120,30 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
})
public boolean enableFallbackOnMissingInvertedIndex = true;


@VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
"in条件value数量大于这个threshold后将不会走fast_execute",
"When the number of values in the IN condition exceeds this threshold,"
+ " fast_execute will not be used."
})
public int inListValueCountThreshold = 10;

@VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
"开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于"
+ "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1",
"When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter "
+ "conditions and the limit parameter is less than the number of rows specified in "
+ "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1."
})
public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;

@VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
"当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 的并行度将会被设置为 1 的行数阈值",
"When enable_adaptive_pipeline_task_serial_read_on_limit is enabled, "
+ "the number of rows at which the parallelism of the scan will be set to 1."
})
public int adaptivePipelineTaskSerialReadOnLimit = 10000;

public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
@@ -3700,6 +3722,9 @@ public TQueryOptions toThrift() {
tResult.setKeepCarriageReturn(keepCarriageReturn);

tResult.setEnableSegmentCache(enableSegmentCache);

tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
tResult.setInListValueCountThreshold(inListValueCountThreshold);
return tResult;
}
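With the variables defined above and forwarded to the BE via toThrift(), they can be toggled per session like any other Doris session variable. A hedged JDBC usage sketch (endpoint, database, and credentials are placeholders; Doris speaks the MySQL protocol):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SerialReadKnobsDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
             Statement stmt = conn.createStatement()) {
            // Raise the threshold so `... LIMIT 50000` still reads serially.
            stmt.execute("SET adaptive_pipeline_task_serial_read_on_limit = 50000");
            // Or opt out of the heuristic entirely for this session.
            stmt.execute("SET enable_adaptive_pipeline_task_serial_read_on_limit = false");
        }
    }
}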
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -335,6 +335,9 @@ struct TQueryOptions {

127: optional i32 in_list_value_count_threshold = 10;

128: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
129: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
@@ -0,0 +1,175 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

suite('adaptive_pipeline_task_serial_read_on_limit') {
sql """
DROP TABLE IF EXISTS adaptive_pipeline_task_serial_read_on_limit;
"""
sql """
CREATE TABLE if not exists `adaptive_pipeline_task_serial_read_on_limit` (
`id` INT,
`name` varchar(32)
) ENGINE=OLAP
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Insert data to table
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

def uuidString = UUID.randomUUID().toString()
sql "set enable_profile=true"
// Set parallel_pipeline_task_num to 1 so that there is only one scan node,
// and we can check MaxScannerThreadNum in the profile.
sql "set parallel_pipeline_task_num=1;"
// no limit, MaxScannerThreadNum = TabletNum
sql """
select "no_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit;
"""
sql "set parallel_pipeline_task_num=0;"
// With Limit, MaxScannerThreadNum = 1
sql """
select "with_limit_1_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10000;
"""
// With Limit, but bigger than adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
sql """
select "with_limit_2_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 10001;
"""
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=false;
"""
sql "set parallel_pipeline_task_num=1;"
// With the strategy disabled, even with a limit, MaxScannerThreadNum = TabletNum
sql """
select "not_enable_limit_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 100;
"""

sql "set parallel_pipeline_task_num=0;"

// Enable the strategy and lower the threshold to 20: with limit 15, MaxScannerThreadNum = 1
sql """
set enable_adaptive_pipeline_task_serial_read_on_limit=true;
"""
sql """
set adaptive_pipeline_task_serial_read_on_limit=20;
"""
sql """
select "modify_to_20_${uuidString}", * from adaptive_pipeline_task_serial_read_on_limit limit 15;
"""

sql "set enable_profile=false"

def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryIdNoLimit1 = "";
String queryIdWithLimit1 = "";
String queryIdWithLimit2 = "";
String queryIDNotEnableLimit = "";
String queryIdModifyTo20 = "";

logger.info("{}", uuidString)

for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains("no_limit_1_${uuidString}")) {
queryIdNoLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
queryIdWithLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("with_limit_2_${uuidString}")) {
queryIdWithLimit2 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("not_enable_limit_${uuidString}")) {
queryIDNotEnableLimit = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
if (profileItem["Sql Statement"].toString().contains("modify_to_20_${uuidString}")) {
queryIdModifyTo20 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
}

logger.info("queryIdNoLimit1_${uuidString}: {}", queryIdNoLimit1)
logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
logger.info("queryIdWithLimit2_${uuidString}: {}", queryIdWithLimit2)
logger.info("queryIDNotEnableLimit_${uuidString}: {}", queryIDNotEnableLimit)
logger.info("queryIdModifyTo20_${uuidString}: {}", queryIdModifyTo20)

assertTrue(queryIdNoLimit1 != "")
assertTrue(queryIdWithLimit1 != "")
assertTrue(queryIdWithLimit2 != "")
assertTrue(queryIDNotEnableLimit != "")
assertTrue(queryIdModifyTo20 != "")

def String profileNoLimit1 = getProfile(queryIdNoLimit1).toString()
def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
def String profileWithLimit2 = getProfile(queryIdWithLimit2).toString()
def String profileNotEnableLimit = getProfile(queryIDNotEnableLimit).toString()
def String profileModifyTo20 = getProfile(queryIdModifyTo20).toString()

assertTrue(profileNoLimit1.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileWithLimit1.contains("- MaxScannerThreadNum: 1"))
assertTrue(profileWithLimit2.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileNotEnableLimit.contains("- MaxScannerThreadNum: 10"))
assertTrue(profileModifyTo20.contains("- MaxScannerThreadNum: 1"))
}
