From b5805366d593797c75cc65ce476bf5471fc8fc98 Mon Sep 17 00:00:00 2001 From: shenlang Date: Fri, 22 Sep 2023 23:23:08 +0800 Subject: [PATCH] [CALCITE-6011] Add the planner rule that pushes the Filter past a Window --- .../org/apache/calcite/plan/RelOptRules.java | 3 +- .../apache/calcite/rel/rules/CoreRules.java | 4 + .../rel/rules/FilterWindowTransposeRule.java | 131 ++++++++++++++++++ .../org/apache/calcite/test/JdbcTest.java | 22 +++ .../apache/calcite/test/RelOptRulesTest.java | 61 ++++++++ .../apache/calcite/test/RelOptRulesTest.xml | 97 +++++++++++++ 6 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/calcite/rel/rules/FilterWindowTransposeRule.java diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptRules.java b/core/src/main/java/org/apache/calcite/plan/RelOptRules.java index 91b023bca7a8..e3355b38ae6a 100644 --- a/core/src/main/java/org/apache/calcite/plan/RelOptRules.java +++ b/core/src/main/java/org/apache/calcite/plan/RelOptRules.java @@ -88,7 +88,8 @@ private RelOptRules() { CoreRules.SORT_UNION_TRANSPOSE, CoreRules.EXCHANGE_REMOVE_CONSTANT_KEYS, CoreRules.SORT_EXCHANGE_REMOVE_CONSTANT_KEYS, - CoreRules.SAMPLE_TO_FILTER); + CoreRules.SAMPLE_TO_FILTER, + CoreRules.FILTER_WINDOW_TRANSPOSE); static final List ABSTRACT_RULES = ImmutableList.of(CoreRules.AGGREGATE_ANY_PULL_UP_CONSTANTS, diff --git a/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java b/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java index c02b1ab12f75..3e68da8013b9 100644 --- a/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java +++ b/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java @@ -317,6 +317,10 @@ private CoreRules() {} public static final FilterSetOpTransposeRule FILTER_SET_OP_TRANSPOSE = FilterSetOpTransposeRule.Config.DEFAULT.toRule(); + /** Rule that pushes a {@link Filter} past a {@link org.apache.calcite.rel.core.Window}. */ + public static final FilterWindowTransposeRule FILTER_WINDOW_TRANSPOSE = + FilterWindowTransposeRule.Config.DEFAULT.toRule(); + /** Rule that reduces constants inside a {@link LogicalFilter}. * * @see #JOIN_REDUCE_EXPRESSIONS diff --git a/core/src/main/java/org/apache/calcite/rel/rules/FilterWindowTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/FilterWindowTransposeRule.java new file mode 100644 index 000000000000..bc38dafd03d8 --- /dev/null +++ b/core/src/main/java/org/apache/calcite/rel/rules/FilterWindowTransposeRule.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableBitSet; + +import com.google.common.collect.ImmutableList; + +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule that pushes a {@link org.apache.calcite.rel.core.Filter} + * past a {@link org.apache.calcite.rel.core.Window}. + * + *

If {@code Filter} condition used columns belongs {@code Window} partition keys, + * then we could push the condition past the {@code Window}. + * + *

For example: + *

{@code
+ *    LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
+ *     LogicalFilter(condition=[>($1, 0)])
+ *       LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
+ *         LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
+ *           LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+ *  }
+ * + *

will convert to: + *

{@code
+ *    LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
+ *      LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
+ *        LogicalFilter(condition=[>($0, 0)])
+ *          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+ * }
+ * + * @see CoreRules#FILTER_PROJECT_TRANSPOSE + * @see CoreRules#PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW + * @see CoreRules#FILTER_WINDOW_TRANSPOSE + */ +@Value.Enclosing +public class FilterWindowTransposeRule + extends RelRule + implements TransformationRule { + + protected FilterWindowTransposeRule(final Config config) { + super(config); + } + + @Override public void onMatch(final RelOptRuleCall call) { + final Filter filterRel = call.rel(0); + final Window windowRel = call.rel(1); + + // Get the window all groups + List windowGroups = windowRel.groups; + + // The window may have multi groups,now we could only + // deal one group case,so that we could know the partition keys. + if (windowGroups.size() != 1) { + return; + } + + final List conditions = + RelOptUtil.conjunctions(filterRel.getCondition()); + // The conditions which could be pushed to past window + final List pushedConditions = new ArrayList<>(); + final List remainingConditions = new ArrayList<>(); + final Window.Group group = windowGroups.get(0); + // Get the window partition keys + final ImmutableBitSet partitionKeys = group.keys; + + for (RexNode condition : conditions) { + // Find the condition used columns + ImmutableBitSet rCols = RelOptUtil.InputFinder.bits(condition); + // If the window partition columns contains the condition used columns, + // then we could push the condition to past window. + if (partitionKeys.contains(rCols)) { + pushedConditions.add(condition); + } else { + remainingConditions.add(condition); + } + } + + final RelBuilder builder = call.builder(); + // Use the pushed conditions to create a new filter above the window's input. + RelNode rel = builder.push(windowRel.getInput()).filter(pushedConditions).build(); + if (rel == windowRel.getInput(0)) { + return; + } + rel = windowRel.copy(windowRel.getTraitSet(), ImmutableList.of(rel)); + rel = builder.push(rel).filter(remainingConditions).build(); + call.transformTo(rel); + } + + /** Rule configuration. */ + @Value.Immutable + public interface Config extends RelRule.Config { + + Config DEFAULT = ImmutableFilterWindowTransposeRule.Config.of() + .withOperandSupplier(b0 -> + b0.operand(Filter.class).oneInput(b1 -> + b1.operand(Window.class).anyInputs())); + + @Override default FilterWindowTransposeRule toRule() { + return new FilterWindowTransposeRule(this); + } + } +} diff --git a/core/src/test/java/org/apache/calcite/test/JdbcTest.java b/core/src/test/java/org/apache/calcite/test/JdbcTest.java index 8abb94f7dc61..36890f2cbbee 100644 --- a/core/src/test/java/org/apache/calcite/test/JdbcTest.java +++ b/core/src/test/java/org/apache/calcite/test/JdbcTest.java @@ -4798,6 +4798,28 @@ private void startOfGroupStep3(String startOfGroup) { "deptno=20; A=1; B=-1"); } + /** Test case for + * [CALCITE-6011] + * Add the planner rule that pushes the Filter past a Window. */ + @Test void testPushFilterToWindow() { + CalciteAssert.hr() + .query("select * from (select \"deptno\", sum(1) over (partition by \"deptno\"\n" + + " order by \"empid\" rows between unbounded preceding and current row) as a\n" + + "from \"hr\".\"emps\")" + + " where \"deptno\" = 10") + .explainContains("" + + "PLAN=EnumerableCalc(expr#0..2=[{inputs}], deptno=[$t1], $1=[$t2])\n" + + " EnumerableWindow(window#0=[window(partition {1} order by [0] rows between " + + "UNBOUNDED PRECEDING and CURRENT ROW aggs [SUM($2)])])\n" + + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=[CAST($t1):INTEGER NOT NULL], " + + "expr#6=[10], expr#7=[=($t5, $t6)], proj#0..1=[{exprs}], $condition=[$t7])\n" + + " EnumerableTableScan(table=[[hr, emps]])\n") + .returnsUnordered( + "deptno=10; A=1", + "deptno=10; A=2", + "deptno=10; A=3"); + } + /** Tests window aggregate PARTITION BY constant. */ @Test void testWinAggPartitionByConstant() { CalciteAssert.that() diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java index 3df32bafa5e9..1dc7b7f9606c 100644 --- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java +++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java @@ -920,6 +920,67 @@ private void checkJoinToMultiJoinDoesNotMatchSemiOrAntiJoin(JoinRelType type) { sql(sql).withRule(CoreRules.FILTER_AGGREGATE_TRANSPOSE).check(); } + /** Test case for + * [CALCITE-6011] + * Add the planner rule that pushes the Filter past a Window. */ + @Test void testPushFilterPastWindowWithOnePartitionColumn() { + final String sql = "select * from\n" + + "(select NAME, DEPTNO, count(*) over (partition by DEPTNO) from dept)\n" + + "where DEPTNO > 0"; + sql(sql) + .withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW) + .withRule( + CoreRules.FILTER_PROJECT_TRANSPOSE, + CoreRules.FILTER_WINDOW_TRANSPOSE, + CoreRules.PROJECT_REMOVE).check(); + } + + /** Test case for + * [CALCITE-6011] + * Add the planner rule that pushes the Filter past a Window. */ + @Test void testPushFilterPastWindowWithTwoPartitionColumns() { + final String sql = "select * from\n" + + "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) from dept)\n" + + "where DEPTNO > 0"; + sql(sql) + .withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW) + .withRule( + CoreRules.FILTER_PROJECT_TRANSPOSE, + CoreRules.FILTER_WINDOW_TRANSPOSE, + CoreRules.PROJECT_REMOVE).check(); + } + + /** Test case for + * [CALCITE-6011] + * Add the planner rule that pushes the Filter past a Window. */ + @Test void testPushFilterPastWindowWithNoPredicatePush() { + final String sql = "select * from\n" + + "(select NAME, DEPTNO, count(*) over (partition by NAME) from dept) t\n" + + "where DEPTNO = 0"; + sql(sql) + .withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW) + .withRule( + CoreRules.FILTER_PROJECT_TRANSPOSE, + CoreRules.FILTER_WINDOW_TRANSPOSE, + CoreRules.PROJECT_REMOVE).check(); + } + + /** Test case for + * [CALCITE-6011] + * Add the planner rule that pushes the Filter past a Window. */ + @Test void testPushFilterPastWindowWithDoubleWindows() { + final String sql = "select * from\n" + + "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) as cnt,\n" + + "sum(1) over (partition by DEPTNO) as all_sum from dept) t\n" + + "where DEPTNO = 1"; + sql(sql) + .withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW) + .withRule( + CoreRules.FILTER_PROJECT_TRANSPOSE, + CoreRules.FILTER_WINDOW_TRANSPOSE, + CoreRules.PROJECT_REMOVE).check(); + } + private RelOptFixture basePushFilterPastAggWithGroupingSets() { return sql("${sql}") .withPreRule(CoreRules.PROJECT_MERGE, diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml index 2e6d55427bc3..b7415549d3b5 100644 --- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml +++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml @@ -9843,6 +9843,103 @@ LogicalProject(NAME=[$1]) LogicalFilter(condition=[>($0, 10)]) LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + 0]]> + + + ($1, 0)]) + LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2]) + LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])]) + LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) +]]> + + + ($0, 0)]) + LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) +]]> + + + + + 0]]> + + + ($1, 0)]) + LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2]) + LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])]) + LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) +]]> + + + ($0, 0)]) + LogicalTableScan(table=[[CATALOG, SALES, DEPT]]) ]]>