From b7dd9a83ca8956bb8d597b9643b247528fd95cf6 Mon Sep 17 00:00:00 2001 From: Mingliang Zhu Date: Fri, 2 Aug 2024 16:52:37 +0800 Subject: [PATCH] [VL] Eliminate pre local sort after offload date type range frame window (#6667) --- .../org/apache/gluten/execution/TestOperator.scala | 12 +++++++++--- .../extension/columnar/EliminateLocalSort.scala | 3 +++ .../columnar/enumerated/EnumeratedApplier.scala | 2 +- .../columnar/heuristic/HeuristicApplier.scala | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 7eb9df7be6ca..5ca5087d9ef4 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -351,14 +351,20 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla } test("window expression") { - Seq("sort", "streaming").foreach { - windowType => + Seq(("sort", 0), ("streaming", 1)).foreach { + case (windowType, localSortSize) => withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType) { runQueryAndCompare( "select max(l_partkey) over" + " (partition by l_suppkey order by l_commitdate" + " RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) from lineitem ") { - checkSparkOperatorMatch[WindowExecTransformer] + df => + checkSparkOperatorMatch[WindowExecTransformer](df) + assert( + getExecutedPlan(df).collect { + case s: SortExecTransformer if !s.global => s + }.size == localSortSize + ) } runQueryAndCompare( diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala index 6a5c195e5da1..03e7e4eb73ce 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EliminateLocalSort.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, UnaryEx * - Offload SortAggregate to native hash aggregate * - Offload WindowGroupLimit to native TopNRowNumber * - The columnar window type is `sort` + * - Offload Window which has date type range frame */ object EliminateLocalSort extends Rule[SparkPlan] { private def canEliminateLocalSort(p: SparkPlan): Boolean = p match { @@ -37,6 +38,8 @@ object EliminateLocalSort extends Rule[SparkPlan] { case _: ShuffledHashJoinExecTransformerBase => true case _: WindowGroupLimitExecTransformer => true case _: WindowExecTransformer => true + case s: SortExec if s.global == false => true + case s: SortExecTransformer if s.global == false => true case _ => false } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index 3df0282f8e64..5cf3961c548b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -102,8 +102,8 @@ class EnumeratedApplier(session: SparkSession) List( (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(), (spark: SparkSession) => RewriteTransformer(spark), - (_: SparkSession) => EliminateLocalSort, (_: SparkSession) => EnsureLocalSortRequirements, + (_: SparkSession) => EliminateLocalSort, (_: SparkSession) => CollapseProjectExecTransformer ) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() ::: diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index 738d67f4b77b..f776a1dcc3cd 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -114,8 +114,8 @@ class HeuristicApplier(session: SparkSession) List( (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(), (spark: SparkSession) => RewriteTransformer(spark), - (_: SparkSession) => EliminateLocalSort, (_: SparkSession) => EnsureLocalSortRequirements, + (_: SparkSession) => EliminateLocalSort, (_: SparkSession) => CollapseProjectExecTransformer ) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::