Skip to content

Commit

Permalink
[fix](nereids) runtime filter with probe expr should be pushed thoug…
Browse files Browse the repository at this point in the history
…h set operator apache#33010
  • Loading branch information
englefly committed Apr 17, 2024
1 parent e61848d commit 2fcda99
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
Expand Down Expand Up @@ -495,6 +496,23 @@ public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, Cascade
return relation;
}

@Override
public PhysicalSetOperation visitPhysicalSetOperation(PhysicalSetOperation setOperation, CascadesContext context) {
setOperation.children().forEach(child -> child.accept(this, context));
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
if (!setOperation.getRegularChildrenOutputs().isEmpty()) {
// example: RegularChildrenOutputs is empty
// "select 1 a, 2 b union all select 3, 4 union all select 10 e, 20 f;"
for (int i = 0; i < setOperation.getOutput().size(); i++) {
Pair childSlotPair = ctx.getAliasTransferPair(setOperation.getRegularChildOutput(0).get(0));
if (childSlotPair != null) {
ctx.aliasTransferMapPut(setOperation.getOutput().get(i), childSlotPair);
}
}
}
return setOperation;
}

// runtime filter build side ndv
private long getBuildSideNdv(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
ComparisonPredicate compare) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !rf_setop --
PhysicalResultSink
--hashAgg[GLOBAL]
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((T.l_linenumber = expr_cast(r_regionkey as BIGINT))) otherCondition=() build RFs:RF0 expr_cast(r_regionkey as BIGINT)->[cast(l_linenumber as BIGINT),o_orderkey]
------------PhysicalExcept
--------------PhysicalProject
----------------hashAgg[GLOBAL]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------hashAgg[LOCAL]
----------------------PhysicalProject
------------------------PhysicalOlapScan[lineitem] apply RFs: RF0
--------------PhysicalDistribute[DistributionSpecHash]
----------------PhysicalProject
------------------hashAgg[LOCAL]
--------------------PhysicalProject
----------------------PhysicalOlapScan[orders] apply RFs: RF0
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalOlapScan[region]

-- !rf_setop_expr --
PhysicalResultSink
--hashAgg[GLOBAL]
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((expr_abs(l_linenumber) = expr_cast(r_regionkey as LARGEINT))) otherCondition=() build RFs:RF0 expr_cast(r_regionkey as LARGEINT)->[abs(l_linenumber),abs(o_orderkey)]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalExcept
------------------PhysicalProject
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashAgg[LOCAL]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[lineitem] apply RFs: RF0
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------hashAgg[LOCAL]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[orders] apply RFs: RF0
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalOlapScan[region]

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

suite("test_pushdown_setop") {
String db = context.config.getDbNameByFile(new File(context.file.parent))
sql "use ${db}"
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
sql 'set exec_mem_limit=21G'
sql 'set be_number_for_test=3'
sql 'set parallel_fragment_exec_instance_num=8; '
sql 'set parallel_pipeline_task_num=8; '
sql 'set forbid_unknown_col_stats=true'
sql 'set enable_nereids_timeout = false'
sql 'set enable_runtime_filter_prune=false'
sql 'set runtime_filter_type=8'
qt_rf_setop """
explain shape plan
select count() from ((select l_linenumber from lineitem) except (select o_orderkey from orders)) T join region on T.l_linenumber = r_regionkey;
"""

qt_rf_setop_expr """
explain shape plan select count() from ((select l_linenumber from lineitem) except (select o_orderkey from orders)) T join region on abs(T.l_linenumber) = r_regionkey;
"""
}

0 comments on commit 2fcda99

Please sign in to comment.