Skip to content

Commit

Permalink
[VL] Eliminate pre local sort after offload date type range frame win…
Browse files Browse the repository at this point in the history
…dow (#6667)
  • Loading branch information
zml1206 authored Aug 2, 2024
1 parent 94b79c7 commit b7dd9a8
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,20 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
}

test("window expression") {
Seq("sort", "streaming").foreach {
windowType =>
Seq(("sort", 0), ("streaming", 1)).foreach {
case (windowType, localSortSize) =>
withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType) {
runQueryAndCompare(
"select max(l_partkey) over" +
" (partition by l_suppkey order by l_commitdate" +
" RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) from lineitem ") {
checkSparkOperatorMatch[WindowExecTransformer]
df =>
checkSparkOperatorMatch[WindowExecTransformer](df)
assert(
getExecutedPlan(df).collect {
case s: SortExecTransformer if !s.global => s
}.size == localSortSize
)
}

runQueryAndCompare(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, UnaryEx
* - Offload SortAggregate to native hash aggregate
* - Offload WindowGroupLimit to native TopNRowNumber
* - The columnar window type is `sort`
* - Offload Window which has date type range frame
*/
object EliminateLocalSort extends Rule[SparkPlan] {
private def canEliminateLocalSort(p: SparkPlan): Boolean = p match {
case _: HashAggregateExecBaseTransformer => true
case _: ShuffledHashJoinExecTransformerBase => true
case _: WindowGroupLimitExecTransformer => true
case _: WindowExecTransformer => true
case s: SortExec if s.global == false => true
case s: SortExecTransformer if s.global == false => true
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class EnumeratedApplier(session: SparkSession)
List(
(_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
(spark: SparkSession) => RewriteTransformer(spark),
(_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => EnsureLocalSortRequirements,
(_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => CollapseProjectExecTransformer
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class HeuristicApplier(session: SparkSession)
List(
(_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
(spark: SparkSession) => RewriteTransformer(spark),
(_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => EnsureLocalSortRequirements,
(_: SparkSession) => EliminateLocalSort,
(_: SparkSession) => CollapseProjectExecTransformer
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
Expand Down

0 comments on commit b7dd9a8

Please sign in to comment.