From 106a8c0db30111391610ba9ea8f208ee4611071f Mon Sep 17 00:00:00 2001 From: jakevin Date: Thu, 28 Mar 2024 14:48:47 +0800 Subject: [PATCH] [feature](Nereids): add ColumnPruningPostProcessor. (#32800) (cherry picked from commit 5970f983db5dc95341c16f3459d883ce0f9676b3) --- .../post/ColumnPruningPostProcessor.java | 107 ++++++++++++++++++ .../processor/post/PlanPostProcessors.java | 1 + .../nereids/processor/post/TopNScanOpt.java | 6 +- .../plans/physical/AbstractPhysicalPlan.java | 2 +- .../ColumnPruningPostProcessorTest.java | 62 ++++++++++ .../nereids_ssb_shape_sf100_p0/shape/q4.3.out | 25 ++-- 6 files changed, 188 insertions(+), 15 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java new file mode 100644 index 000000000000000..40d25ddd748f196 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.annotation.DependsRules; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Prune column for Join-Cluster + */ +@DependsRules({ + MergeProjectPostProcessor.class +}) +public class ColumnPruningPostProcessor extends PlanPostProcessor { + @Override + public PhysicalProject visitPhysicalProject(PhysicalProject project, CascadesContext ctx) { + Plan child = project.child(); + Plan newChild = child.accept(this, ctx); + if (newChild instanceof AbstractPhysicalJoin) { + AbstractPhysicalJoin join = (AbstractPhysicalJoin) newChild; + Plan left = join.left(); + Plan right = join.right(); + Set leftOutput = left.getOutputSet(); + Set rightOutput = right.getOutputSet(); + + Set usedSlots = project.getProjects().stream().flatMap(ne -> ne.getInputSlots().stream()) + .collect(Collectors.toSet()); + + Stream.concat(join.getHashJoinConjuncts().stream(), join.getOtherJoinConjuncts().stream()) + .flatMap(expr -> expr.getInputSlots().stream()) + .forEach(usedSlots::add); + join.getMarkJoinSlotReference().ifPresent(usedSlots::add); + + List leftNewProjections = new ArrayList<>(); + List rightNewProjections = new ArrayList<>(); + + for (Slot usedSlot : usedSlots) { + if (leftOutput.contains(usedSlot)) { + leftNewProjections.add(usedSlot); + } else if (rightOutput.contains(usedSlot)) { + rightNewProjections.add(usedSlot); + } + } + + Plan newLeft; + if (left instanceof PhysicalDistribute) { + newLeft = leftNewProjections.size() != leftOutput.size() && !leftNewProjections.isEmpty() + ? left.withChildren(new PhysicalProject<>(leftNewProjections, + left.getLogicalProperties(), left.child(0))) + : left; + } else { + newLeft = leftNewProjections.size() != leftOutput.size() && !leftNewProjections.isEmpty() + ? new PhysicalProject<>(leftNewProjections, left.getLogicalProperties(), + left).copyStatsAndGroupIdFrom((AbstractPhysicalPlan) left) + : left; + } + Plan newRight; + if (right instanceof PhysicalDistribute) { + newRight = rightNewProjections.size() != rightOutput.size() && !rightNewProjections.isEmpty() + ? right.withChildren(new PhysicalProject<>(rightNewProjections, + right.getLogicalProperties(), right.child(0))) + : right; + } else { + newRight = rightNewProjections.size() != rightOutput.size() && !rightNewProjections.isEmpty() + ? new PhysicalProject<>(rightNewProjections, right.getLogicalProperties(), + right).copyStatsAndGroupIdFrom((AbstractPhysicalPlan) right) + : right; + } + + if (newLeft != left || newRight != right) { + return (PhysicalProject) project.withChildren(join.withChildren(newLeft, newRight)); + } else { + return project; + } + } + return project; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index c6e4b245447a1ff..c5b2cf8456c2865 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -59,6 +59,7 @@ public List getProcessors() { // add processor if we need Builder builder = ImmutableList.builder(); builder.add(new PushdownFilterThroughProject()); + builder.add(new ColumnPruningPostProcessor()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); builder.add(new AddOffsetIntoDistribute()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index 458682273d16fd1..7201934b031d9e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -79,7 +79,8 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN) topN.withChildren(child)).copyStatsAndGroupIdFrom(topN); + topN = (PhysicalTopN) ((PhysicalTopN) topN.withChildren( + child)).copyStatsAndGroupIdFrom(topN); } return topN; } else if (topN.getSortPhase() == SortPhase.MERGE_SORT) { @@ -94,7 +95,8 @@ public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN rewrittenTopN = rewriteTopN(topN.getPhysicalTopN()); if (topN.getPhysicalTopN() != rewrittenTopN) { - topN = topN.withPhysicalTopN(rewrittenTopN).copyStatsAndGroupIdFrom(topN); + topN = (PhysicalDeferMaterializeTopN) topN.withPhysicalTopN(rewrittenTopN) + .copyStatsAndGroupIdFrom(topN); } return topN; } else if (topN.getSortPhase() == SortPhase.MERGE_SORT) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 6a0ccd093418b21..fc8a3c27c24569c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -66,7 +66,7 @@ public Plan getExplainPlan(ConnectContext ctx) { return this; } - public T copyStatsAndGroupIdFrom(T from) { + public AbstractPhysicalPlan copyStatsAndGroupIdFrom(T from) { T newPlan = (T) withPhysicalPropertiesAndStats( from.getPhysicalProperties(), from.getStats()); newPlan.setMutableState(MutableState.KEY_GROUP, from.getGroupIdAsString()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java new file mode 100644 index 000000000000000..0e65aa6d582875e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.postprocess; + +import org.apache.doris.nereids.processor.post.ColumnPruningPostProcessor; +import org.apache.doris.nereids.rules.rewrite.InferFilterNotNull; +import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.util.LogicalPlanBuilder; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.nereids.util.PlanConstructor; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ColumnPruningPostProcessorTest implements MemoPatternMatchSupported { + private final LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + private final LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0); + + @Test + void test() { + LogicalPlan plan = new LogicalPlanBuilder(scan1) + .join(scan2, JoinType.INNER_JOIN, ImmutableList.of()) + .project(ImmutableList.of(0, 2)) + .build(); + + PhysicalPlan physicalPlan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan) + .applyTopDown(new InferFilterNotNull()) + .implement() + .getPhysicalPlan(); + + ColumnPruningPostProcessor processor = new ColumnPruningPostProcessor(); + PhysicalPlan newPlan = (PhysicalPlan) physicalPlan.accept(processor, null); + + Assertions.assertTrue(newPlan instanceof PhysicalProject); + Assertions.assertTrue(newPlan.child(0) instanceof PhysicalNestedLoopJoin); + Assertions.assertTrue(newPlan.child(0).child(0) instanceof PhysicalProject); + Assertions.assertTrue(newPlan.child(0).child(1) instanceof PhysicalProject); + } +} diff --git a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out index 04f46355d63a6d6..a25a04a7d31c541 100644 --- a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out +++ b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out @@ -14,19 +14,20 @@ PhysicalResultSink ------------------PhysicalDistribute --------------------PhysicalProject ----------------------hashJoin[INNER_JOIN](lineorder.lo_orderdate = dates.d_datekey) -------------------------hashJoin[INNER_JOIN](lineorder.lo_partkey = part.p_partkey) ---------------------------PhysicalDistribute -----------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) -------------------------------PhysicalProject ---------------------------------PhysicalOlapScan[lineorder] -------------------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------hashJoin[INNER_JOIN](lineorder.lo_partkey = part.p_partkey) +----------------------------PhysicalDistribute +------------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) --------------------------------PhysicalProject -----------------------------------filter((supplier.s_nation = 'UNITED STATES')) -------------------------------------PhysicalOlapScan[supplier] ---------------------------PhysicalDistribute -----------------------------PhysicalProject -------------------------------filter((part.p_category = 'MFGR#14')) ---------------------------------PhysicalOlapScan[part] +----------------------------------PhysicalOlapScan[lineorder] +--------------------------------PhysicalDistribute +----------------------------------PhysicalProject +------------------------------------filter((supplier.s_nation = 'UNITED STATES')) +--------------------------------------PhysicalOlapScan[supplier] +----------------------------PhysicalDistribute +------------------------------PhysicalProject +--------------------------------filter((part.p_category = 'MFGR#14')) +----------------------------------PhysicalOlapScan[part] ------------------------PhysicalDistribute --------------------------PhysicalProject ----------------------------filter(d_year IN (1997, 1998))