Skip to content

Commit

Permalink
[opt](iceberg)Add a new appearance to display the pushDown count (#…
Browse files Browse the repository at this point in the history
…37046)

## Proposed changes

1. When a count can be pushed down, the pushed-down count value is now
displayed after the aggregate operator in the `EXPLAIN` output:
```

|      pushdown agg=COUNT (1)     |
```

2. Add a session variable `enable_count_push_down_for_external_table`
that controls whether count push-down is applied to external tables.
The default is `true`:
```
mysql> show variables like 'enable_count_push_down_for_external_table';
+-------------------------------------------+-------+---------------+---------+
| Variable_name                             | Value | Default_Value | Changed |
+-------------------------------------------+-------+---------------+---------+
| enable_count_push_down_for_external_table | false | true          | 1       |
+-------------------------------------------+-------+---------------+---------+
1 row in set (0.02 sec)
```
  • Loading branch information
wuwenchi authored Jul 10, 2024
1 parent 5e228bc commit 49180fe
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -98,6 +99,10 @@ protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
}

/**
 * Returns the row count that is printed in parentheses after
 * {@code pushdown agg=COUNT} by {@code getNodeExplainString}.
 *
 * <p>This base implementation always reports {@code 0}; it is public and
 * non-final so that scan nodes able to supply a real count can override it.
 *
 * @return the pushed-down row count, {@code 0} in this implementation
 */
public long getPushDownCount() {
return 0;
}

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
Expand Down Expand Up @@ -173,7 +178,14 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) {
output.append(String.format("avgRowSize=%s, ", avgRowSize));
}
output.append(String.format("numNodes=%s", numNodes)).append("\n");
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)).append("\n");

// pushdown agg
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp));
if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) {
output.append(" (").append(getPushDownCount()).append(")");
}
output.append("\n");

if (useTopnFilter()) {
String topnFilterSources = String.join(",",
topnFilterSortNodes.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -214,10 +213,11 @@ private List<Split> doGetSplits() throws UserException {
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long rowCount = getCountFromSnapshot();
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount > 0) {
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount >= 0) {
this.rowCount = rowCount;
return new ArrayList<>();
}

CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
Expand Down Expand Up @@ -271,13 +271,6 @@ private List<Split> doGetSplits() throws UserException {
throw new UserException(e.getMessage(), e.getCause());
}

// TODO: Need to delete this as we can handle count pushdown in fe side
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
// we can create a special empty split and skip the plan process
return Collections.singletonList(splits.get(0));
}

readPartitionNum = partitionPathSet.size();

return splits;
Expand Down Expand Up @@ -431,7 +424,7 @@ private long getCountFromSnapshot() {

// empty table
if (snapshot == null) {
return -1;
return 0;
}

Map<String, String> summary = snapshot.summary();
Expand All @@ -448,12 +441,17 @@ protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot > 0) {
if (countFromSnapshot >= 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}

/**
 * Reports the pushed-down row count for this scan node by delegating to
 * {@code getCountFromSnapshot()}.
 *
 * <p>NOTE(review): the helper is re-invoked on every call and its full body is
 * not visible here; confirm that it is cheap and whether it can return a
 * negative "unknown" value, which would be passed through to the caller
 * unchanged.
 *
 * @return the value returned by {@code getCountFromSnapshot()}
 */
@Override
public long getPushDownCount() {
return getCountFromSnapshot();
}

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (pushdownIcebergPredicates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
Expand Down Expand Up @@ -1111,6 +1112,12 @@ public PlanFragment visitPhysicalStorageLayerAggregate(
+ storageLayerAggregate.getAggOp());
}

if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
&& pushAggOp.equals(TPushAggOp.COUNT)
&& !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) {
pushAggOp = TPushAggOp.NONE;
}

context.setRelationPushAggOp(
storageLayerAggregate.getRelation().getRelationId(), pushAggOp);

Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String FORCE_JNI_SCANNER = "force_jni_scanner";

public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table";

public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";

public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver";
Expand Down Expand Up @@ -1849,6 +1851,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;

@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"})
private boolean enableCountPushDownForExternalTable = true;

public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";

public Set<Integer> getIgnoredRuntimeFilterIds() {
Expand Down Expand Up @@ -4121,6 +4127,10 @@ public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}

/**
 * Returns the current value of the
 * {@code enable_count_push_down_for_external_table} session variable.
 *
 * @return {@code true} when count(*) push-down for external tables is enabled
 */
public boolean isEnableCountPushDownForExternalTable() {
return enableCountPushDownForExternalTable;
}

/**
 * Tells whether local shuffle is forced.
 *
 * @return {@code true} only when {@code enableLocalShuffle},
 *         {@code enableNereidsPlanner} and {@code forceToLocalShuffle} are all set
 */
public boolean isForceToLocalShuffle() {
return enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
1000

-- !q02 --
1000

-- !q03 --
1000

-- !q04 --
1000

-- !q05 --
1000

-- !q06 --
1000

-- !q07 --
1000

-- !q08 --
1000

Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite for count(*) push-down on Iceberg external tables.
// It runs the same four count(*) queries twice -- once with
// enable_count_push_down_for_external_table=true and once with it set to
// false -- and checks both the query results (qt_q01..qt_q08) and the
// "pushdown agg=" line of the EXPLAIN output.
suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external_docker_doris") {
// Skip the whole suite unless the Iceberg test environment is switched on in the config.
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
return
}

// Connection details of the Iceberg REST catalog and its MinIO storage, taken from the test config.
String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String catalog_name = "test_iceberg_optimize_count"

try {

// Recreate the catalog from scratch so a leftover from an earlier run cannot interfere.
sql """drop catalog if exists ${catalog_name}"""
sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://${externalEnvIp}:${rest_port}',
"s3.access_key" = "admin",
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
"s3.region" = "us-east-1"
);"""

sql """ switch ${catalog_name} """
// NOTE(review): a database named after the catalog is created here, but the
// queries below run in `format_v2`, which is presumably pre-populated by the
// docker environment -- confirm whether the `create database` is needed.
sql """ create database if not exists ${catalog_name} """
sql """ use format_v2 """

// The four tables under test: copy-on-write and merge-on-read, each in ORC and Parquet.
sqlstr1 = """ select count(*) from sample_cow_orc; """
sqlstr2 = """ select count(*) from sample_cow_parquet; """
sqlstr3 = """ select count(*) from sample_mor_orc; """
sqlstr4 = """ select count(*) from sample_mor_parquet; """

// use push down count
sql """ set enable_count_push_down_for_external_table=true; """

// Results are compared against the q01..q04 entries of the generated .out file.
qt_q01 """${sqlstr1}"""
qt_q02 """${sqlstr2}"""
qt_q03 """${sqlstr3}"""
qt_q04 """${sqlstr4}"""

// With push-down enabled, the plan must show the COUNT operator followed by the
// pushed-down row count in parentheses (1000 is the count expected for each table).
explain {
sql("""${sqlstr1}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr2}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr3}""")
contains """pushdown agg=COUNT (1000)"""
}
explain {
sql("""${sqlstr4}""")
contains """pushdown agg=COUNT (1000)"""
}

// don't use push down count
sql """ set enable_count_push_down_for_external_table=false; """

// Same queries again; results are compared against the q05..q08 entries.
qt_q05 """${sqlstr1}"""
qt_q06 """${sqlstr2}"""
qt_q07 """${sqlstr3}"""
qt_q08 """${sqlstr4}"""

// With push-down disabled, the plan must report that no aggregate was pushed down.
explain {
sql("""${sqlstr1}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr2}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr3}""")
contains """pushdown agg=NONE"""
}
explain {
sql("""${sqlstr4}""")
contains """pushdown agg=NONE"""
}

} finally {
// Always set the session variable back to true and remove the catalog, even on failure.
sql """ set enable_count_push_down_for_external_table=true; """
sql """drop catalog if exists ${catalog_name}"""
}
}

0 comments on commit 49180fe

Please sign in to comment.