Skip to content

Commit

Permalink
[CALCITE-6011] Add the planner rule that pushes the Filter past a Window
Browse files Browse the repository at this point in the history
  • Loading branch information
LakeShen committed Oct 8, 2023
1 parent ab3f5b0 commit b580536
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 1 deletion.
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/calcite/plan/RelOptRules.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ private RelOptRules() {
CoreRules.SORT_UNION_TRANSPOSE,
CoreRules.EXCHANGE_REMOVE_CONSTANT_KEYS,
CoreRules.SORT_EXCHANGE_REMOVE_CONSTANT_KEYS,
CoreRules.SAMPLE_TO_FILTER);
CoreRules.SAMPLE_TO_FILTER,
CoreRules.FILTER_WINDOW_TRANSPOSE);

static final List<RelOptRule> ABSTRACT_RULES =
ImmutableList.of(CoreRules.AGGREGATE_ANY_PULL_UP_CONSTANTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ private CoreRules() {}
public static final FilterSetOpTransposeRule FILTER_SET_OP_TRANSPOSE =
FilterSetOpTransposeRule.Config.DEFAULT.toRule();

/** Rule that pushes a {@link Filter} past a {@link org.apache.calcite.rel.core.Window}. */
public static final FilterWindowTransposeRule FILTER_WINDOW_TRANSPOSE =
FilterWindowTransposeRule.Config.DEFAULT.toRule();

/** Rule that reduces constants inside a {@link LogicalFilter}.
*
* @see #JOIN_REDUCE_EXPRESSIONS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.List;

/**
* Planner rule that pushes a {@link org.apache.calcite.rel.core.Filter}
* past a {@link org.apache.calcite.rel.core.Window}.
*
* <p> If {@code Filter} condition used columns belongs {@code Window} partition keys,
* then we could push the condition past the {@code Window}.
*
* <p> For example:
* <blockquote><pre>{@code
* LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
* LogicalFilter(condition=[>($1, 0)])
* LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
* LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
* LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
* }</pre></blockquote>
*
* <p> will convert to:
* <blockquote><pre>{@code
* LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
* LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
* LogicalFilter(condition=[>($0, 0)])
* LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
* }</pre></blockquote>
*
* @see CoreRules#FILTER_PROJECT_TRANSPOSE
* @see CoreRules#PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW
* @see CoreRules#FILTER_WINDOW_TRANSPOSE
*/
@Value.Enclosing
public class FilterWindowTransposeRule
extends RelRule<FilterWindowTransposeRule.Config>
implements TransformationRule {

protected FilterWindowTransposeRule(final Config config) {
super(config);
}

@Override public void onMatch(final RelOptRuleCall call) {
final Filter filterRel = call.rel(0);
final Window windowRel = call.rel(1);

// Get the window all groups
List<Window.Group> windowGroups = windowRel.groups;

// The window may have multi groups,now we could only
// deal one group case,so that we could know the partition keys.
if (windowGroups.size() != 1) {
return;
}

final List<RexNode> conditions =
RelOptUtil.conjunctions(filterRel.getCondition());
// The conditions which could be pushed to past window
final List<RexNode> pushedConditions = new ArrayList<>();
final List<RexNode> remainingConditions = new ArrayList<>();
final Window.Group group = windowGroups.get(0);
// Get the window partition keys
final ImmutableBitSet partitionKeys = group.keys;

for (RexNode condition : conditions) {
// Find the condition used columns
ImmutableBitSet rCols = RelOptUtil.InputFinder.bits(condition);
// If the window partition columns contains the condition used columns,
// then we could push the condition to past window.
if (partitionKeys.contains(rCols)) {
pushedConditions.add(condition);
} else {
remainingConditions.add(condition);
}
}

final RelBuilder builder = call.builder();
// Use the pushed conditions to create a new filter above the window's input.
RelNode rel = builder.push(windowRel.getInput()).filter(pushedConditions).build();
if (rel == windowRel.getInput(0)) {
return;
}
rel = windowRel.copy(windowRel.getTraitSet(), ImmutableList.of(rel));
rel = builder.push(rel).filter(remainingConditions).build();
call.transformTo(rel);
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {

Config DEFAULT = ImmutableFilterWindowTransposeRule.Config.of()
.withOperandSupplier(b0 ->
b0.operand(Filter.class).oneInput(b1 ->
b1.operand(Window.class).anyInputs()));

@Override default FilterWindowTransposeRule toRule() {
return new FilterWindowTransposeRule(this);
}
}
}
22 changes: 22 additions & 0 deletions core/src/test/java/org/apache/calcite/test/JdbcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4798,6 +4798,28 @@ private void startOfGroupStep3(String startOfGroup) {
"deptno=20; A=1; B=-1");
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterToWindow() {
CalciteAssert.hr()
.query("select * from (select \"deptno\", sum(1) over (partition by \"deptno\"\n"
+ " order by \"empid\" rows between unbounded preceding and current row) as a\n"
+ "from \"hr\".\"emps\")"
+ " where \"deptno\" = 10")
.explainContains(""
+ "PLAN=EnumerableCalc(expr#0..2=[{inputs}], deptno=[$t1], $1=[$t2])\n"
+ " EnumerableWindow(window#0=[window(partition {1} order by [0] rows between "
+ "UNBOUNDED PRECEDING and CURRENT ROW aggs [SUM($2)])])\n"
+ " EnumerableCalc(expr#0..4=[{inputs}], expr#5=[CAST($t1):INTEGER NOT NULL], "
+ "expr#6=[10], expr#7=[=($t5, $t6)], proj#0..1=[{exprs}], $condition=[$t7])\n"
+ " EnumerableTableScan(table=[[hr, emps]])\n")
.returnsUnordered(
"deptno=10; A=1",
"deptno=10; A=2",
"deptno=10; A=3");
}

/** Tests window aggregate PARTITION BY constant. */
@Test void testWinAggPartitionByConstant() {
CalciteAssert.that()
Expand Down
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,67 @@ private void checkJoinToMultiJoinDoesNotMatchSemiOrAntiJoin(JoinRelType type) {
sql(sql).withRule(CoreRules.FILTER_AGGREGATE_TRANSPOSE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithOnePartitionColumn() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by DEPTNO) from dept)\n"
+ "where DEPTNO > 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithTwoPartitionColumns() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) from dept)\n"
+ "where DEPTNO > 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithNoPredicatePush() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME) from dept) t\n"
+ "where DEPTNO = 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithDoubleWindows() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) as cnt,\n"
+ "sum(1) over (partition by DEPTNO) as all_sum from dept) t\n"
+ "where DEPTNO = 1";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

private RelOptFixture basePushFilterPastAggWithGroupingSets() {
return sql("${sql}")
.withPreRule(CoreRules.PROJECT_MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9843,6 +9843,103 @@ LogicalProject(NAME=[$1])
LogicalFilter(condition=[>($0, 10)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithDoubleWindows">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) as cnt,
sum(1) over (partition by DEPTNO) as all_sum from dept) t
where DEPTNO = 1]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], CNT=[$2], ALL_SUM=[$3])
LogicalFilter(condition=[=($1, 1)])
LogicalProject(NAME=[$1], DEPTNO=[$0], CNT=[$2], ALL_SUM=[$3])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])], window#1=[window(partition {0} aggs [SUM($2)])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], CNT=[$2], ALL_SUM=[$3])
LogicalFilter(condition=[=($0, 1)])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])], window#1=[window(partition {0} aggs [SUM($2)])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithNoPredicatePush">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME) from dept) t
where DEPTNO = 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[=($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalFilter(condition=[=($0, 0)])
LogicalWindow(window#0=[window(partition {1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithOnePartitionColumn">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by DEPTNO) from dept)
where DEPTNO > 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[>($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
LogicalFilter(condition=[>($0, 0)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithTwoPartitionColumns">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) from dept)
where DEPTNO > 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[>($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])])
LogicalFilter(condition=[>($0, 0)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit b580536

Please sign in to comment.