diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 55d990365a2a8e..928c835e7c6b1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -51,6 +51,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; @@ -495,6 +496,23 @@ public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, Cascade return relation; } + @Override + public PhysicalSetOperation visitPhysicalSetOperation(PhysicalSetOperation setOperation, CascadesContext context) { + setOperation.children().forEach(child -> child.accept(this, context)); + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + if (!setOperation.getRegularChildrenOutputs().isEmpty()) { + // example: RegularChildrenOutputs is empty + // "select 1 a, 2 b union all select 3, 4 union all select 10 e, 20 f;" + for (int i = 0; i < setOperation.getOutput().size(); i++) { + Pair childSlotPair = ctx.getAliasTransferPair(setOperation.getRegularChildOutput(0).get(0)); + if (childSlotPair != null) { + ctx.aliasTransferMapPut(setOperation.getOutput().get(i), childSlotPair); + } + } + } + return setOperation; + } + // runtime filter build side ndv private long getBuildSideNdv(AbstractPhysicalJoin join, ComparisonPredicate compare) { diff --git a/regression-test/data/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.out b/regression-test/data/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.out new file mode 100644 index 00000000000000..5d0e6c3143d8d1 --- /dev/null +++ b/regression-test/data/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.out @@ -0,0 +1,49 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !rf_setop -- +PhysicalResultSink +--hashAgg[GLOBAL] +----PhysicalDistribute[DistributionSpecGather] +------hashAgg[LOCAL] +--------PhysicalProject +----------hashJoin[INNER_JOIN] hashCondition=((T.l_linenumber = expr_cast(r_regionkey as BIGINT))) otherCondition=() build RFs:RF0 expr_cast(r_regionkey as BIGINT)->[cast(l_linenumber as BIGINT),o_orderkey] +------------PhysicalExcept +--------------PhysicalProject +----------------hashAgg[GLOBAL] +------------------PhysicalDistribute[DistributionSpecHash] +--------------------hashAgg[LOCAL] +----------------------PhysicalProject +------------------------PhysicalOlapScan[lineitem] apply RFs: RF0 +--------------PhysicalDistribute[DistributionSpecHash] +----------------PhysicalProject +------------------hashAgg[LOCAL] +--------------------PhysicalProject +----------------------PhysicalOlapScan[orders] apply RFs: RF0 +------------PhysicalDistribute[DistributionSpecHash] +--------------PhysicalProject +----------------PhysicalOlapScan[region] + +-- !rf_setop_expr -- +PhysicalResultSink +--hashAgg[GLOBAL] +----PhysicalDistribute[DistributionSpecGather] +------hashAgg[LOCAL] +--------PhysicalProject +----------hashJoin[INNER_JOIN] hashCondition=((expr_abs(l_linenumber) = expr_cast(r_regionkey as LARGEINT))) otherCondition=() build RFs:RF0 expr_cast(r_regionkey as LARGEINT)->[abs(l_linenumber),abs(o_orderkey)] +------------PhysicalDistribute[DistributionSpecHash] +--------------PhysicalProject +----------------PhysicalExcept +------------------PhysicalProject +--------------------hashAgg[GLOBAL] +----------------------PhysicalDistribute[DistributionSpecHash] +------------------------hashAgg[LOCAL] +--------------------------PhysicalProject +----------------------------PhysicalOlapScan[lineitem] apply RFs: RF0 +------------------PhysicalDistribute[DistributionSpecHash] +--------------------PhysicalProject +----------------------hashAgg[LOCAL] +------------------------PhysicalProject +--------------------------PhysicalOlapScan[orders] apply RFs: RF0 +------------PhysicalDistribute[DistributionSpecHash] +--------------PhysicalProject +----------------PhysicalOlapScan[region] + diff --git a/regression-test/suites/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.groovy b/regression-test/suites/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.groovy new file mode 100644 index 00000000000000..04ad8e20bddb24 --- /dev/null +++ b/regression-test/suites/nereids_tpch_shape_sf1000_p0/runtime_filter/test_pushdown_setop.groovy @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +suite("test_pushdown_setop") { + String db = context.config.getDbNameByFile(new File(context.file.parent)) + sql "use ${db}" + sql 'set enable_nereids_planner=true' + sql 'set enable_fallback_to_original_planner=false' + sql 'set exec_mem_limit=21G' + sql 'set be_number_for_test=3' + sql 'set parallel_fragment_exec_instance_num=8; ' + sql 'set parallel_pipeline_task_num=8; ' + sql 'set forbid_unknown_col_stats=true' + sql 'set enable_nereids_timeout = false' + sql 'set enable_runtime_filter_prune=false' + sql 'set runtime_filter_type=8' + qt_rf_setop """ + explain shape plan + select count() from ((select l_linenumber from lineitem) except (select o_orderkey from orders)) T join region on T.l_linenumber = r_regionkey; + """ + + qt_rf_setop_expr """ + explain shape plan select count() from ((select l_linenumber from lineitem) except (select o_orderkey from orders)) T join region on abs(T.l_linenumber) = r_regionkey; + """ +} +