[VL] Support date type in window range frame (#6653)
zml1206 authored Aug 1, 2024
1 parent 49e2d17 commit edae5b8
Showing 3 changed files with 65 additions and 22 deletions.
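
The gist of the change: Gluten pre-computes value-based range frame boundaries (e.g. RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) into an integer addition, which previously only covered integral ORDER BY keys; this commit also accepts a DateType key by first converting it to days since 1970-01-01. A rough usage sketch, assuming a SparkSession named `spark` with the Velox backend enabled and a TPC-H lineitem table registered (the SQL mirrors the test added below):

// Sketch only; `spark` and the lineitem table are assumed to exist.
val df = spark.sql(
  """select max(l_partkey) over
    |  (partition by l_suppkey order by l_commitdate
    |   RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)
    |from lineitem""".stripMargin)
df.show()
// With this change the window is offloaded (WindowExecTransformer) instead of falling
// back, because the DateType order key is pre-computed into an integer range key.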
@@ -354,6 +354,13 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
Seq("sort", "streaming").foreach {
windowType =>
withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType) {
runQueryAndCompare(
"select max(l_partkey) over" +
" (partition by l_suppkey order by l_commitdate" +
" RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) from lineitem ") {
checkSparkOperatorMatch[WindowExecTransformer]
}

runQueryAndCompare(
"select max(l_partkey) over" +
" (partition by l_suppkey order by l_orderkey" +
@@ -76,17 +76,7 @@ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper {
}
case _ => false
}.isDefined) ||
window.windowExpression.exists(_.find {
case we: WindowExpression =>
we.windowSpec.frameSpecification match {
case swf: SpecifiedWindowFrame
if needPreComputeRangeFrame(swf) && supportPreComputeRangeFrame(
we.windowSpec.orderSpec) =>
true
case _ => false
}
case _ => false
}.isDefined)
windowNeedPreComputeRangeFrame(window)
case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
val window = SparkShimLoader.getSparkShims
.getWindowGroupLimitExecShim(plan)
@@ -176,14 +166,16 @@ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper {

case window: WindowExec if needsPreProject(window) =>
val expressionMap = new mutable.HashMap[Expression, NamedExpression]()
// Handle orderSpec.
val newOrderSpec = getNewSortOrder(window.orderSpec, expressionMap)

// Handle partitionSpec.
// Handle foldable orderSpec and foldable partitionSpec. Spark analyzer rule
// ExtractWindowExpressions will extract expressions from non-foldable orderSpec and
// partitionSpec.
var newOrderSpec = getNewSortOrder(window.orderSpec, expressionMap)
val newPartitionSpec =
window.partitionSpec.map(replaceExpressionWithAttribute(_, expressionMap))

// Handle windowExpressions.
newOrderSpec = rewriteOrderSpecs(window, newOrderSpec, expressionMap)

val newWindowExpressions = window.windowExpression.toIndexedSeq.map {
_.transform {
case we: WindowExpression => rewriteWindowExpression(we, newOrderSpec, expressionMap)
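
Note on the rewrite order above: the sort keys go through getNewSortOrder and then rewriteOrderSpecs (which may swap a DateType key for a pre-computed integer column) before the window expressions are rewritten against the new order spec, and all steps share one expressionMap so the pre-project adds each computed column exactly once. A minimal sketch of that shared-map pattern (simplified; not the actual Gluten helpers, and the alias name is illustrative):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}

// Every rewrite site asks the map for "the pre-projected column of this expression",
// creating the Alias only on first use; the pre-project later emits expressionMap.values.
val expressionMap = new mutable.HashMap[Expression, NamedExpression]()

def preProjected(e: Expression): Attribute =
  expressionMap
    .getOrElseUpdate(e.canonicalized, Alias(e, s"_pre_${expressionMap.size}")())
    .toAttribute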
@@ -22,8 +22,10 @@ import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
import org.apache.spark.sql.execution.aggregate._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, ShortType}

import java.sql.Date
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
@@ -161,14 +163,32 @@ trait PullOutProjectHelper {
case _: PreComputeRangeFrameBound => bound
case _ if !bound.foldable => bound
case _ if bound.foldable =>
val orderExpr = if (expressionMap.contains(orderSpec.child)) {
expressionMap(orderSpec.child).asInstanceOf[Alias].child
} else {
orderSpec.child
}
val a = expressionMap
.getOrElseUpdate(
bound.canonicalized,
Alias(Add(orderSpec.child, bound), generatePreAliasName)())
Alias(Add(orderExpr, bound), generatePreAliasName)())
PreComputeRangeFrameBound(a.asInstanceOf[Alias], bound)
}
}

protected def windowNeedPreComputeRangeFrame(w: WindowExec): Boolean =
w.windowExpression.exists(_.find {
case we: WindowExpression =>
we.windowSpec.frameSpecification match {
case swf: SpecifiedWindowFrame
if needPreComputeRangeFrame(swf) && supportPreComputeRangeFrame(
we.windowSpec.orderSpec) =>
true
case _ => false
}
case _ => false
}.isDefined)

protected def needPreComputeRangeFrame(swf: SpecifiedWindowFrame): Boolean = {
BackendsApiManager.getSettings.needPreComputeRangeFrameBoundary &&
swf.frameType == RangeFrame &&
@@ -185,6 +205,36 @@ trait PullOutProjectHelper {
}
}

/**
* Convert a DateType orderSpec to IntegerType when needPreComputeRangeFrame, because Spark's
* frame type does not support DateType. This does not affect the correctness of the sort.
*/
protected def rewriteOrderSpecs(
window: WindowExec,
orderSpecs: Seq[SortOrder],
expressionMap: mutable.HashMap[Expression, NamedExpression]): Seq[SortOrder] = {
if (windowNeedPreComputeRangeFrame(window)) {
// This is guaranteed by Spark, but we still check it here
if (orderSpecs.size != 1) {
throw new GlutenException(
s"A range window frame with value boundaries expects one and only one " +
s"order by expression: ${orderSpecs.mkString(",")}")
}
val orderSpec = orderSpecs.head
orderSpec.child.dataType match {
case DateType =>
val alias = Alias(
DateDiff(orderSpec.child, Literal(Date.valueOf("1970-01-01"))),
generatePreAliasName)()
expressionMap.getOrElseUpdate(alias.toAttribute, alias)
Seq(orderSpec.copy(child = alias.toAttribute))
case _ => orderSpecs
}
} else {
orderSpecs
}
}

protected def rewriteWindowExpression(
we: WindowExpression,
orderSpecs: Seq[SortOrder],
@@ -202,12 +252,6 @@ trait PullOutProjectHelper {

val newWindowSpec = we.windowSpec.frameSpecification match {
case swf: SpecifiedWindowFrame if needPreComputeRangeFrame(swf) =>
// This is guaranteed by Spark, but we still check it here
if (orderSpecs.size != 1) {
throw new GlutenException(
s"A range window frame with value boundaries expects one and only one " +
s"order by expression: ${orderSpecs.mkString(",")}")
}
val orderSpec = orderSpecs.head
val lowerFrameCol = preComputeRangeFrameBoundary(swf.lower, orderSpec, expressionMap)
val upperFrameCol = preComputeRangeFrameBoundary(swf.upper, orderSpec, expressionMap)
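
Putting the helpers together for a DateType order key: rewriteOrderSpecs makes the window sort on datediff(key, '1970-01-01') (the date's days-since-epoch integer, so the ordering is unchanged), and preComputeRangeFrameBoundary then adds the folded frame bound to that same pre-computed expression instead of the original date column. A hand-built illustration of the expressions involved (the column name, alias name, and the folded bound value of -1 for "1 PRECEDING" are illustrative):

import java.sql.Date
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DateType

// ORDER BY l_commitdate (DateType) inside a value-based range frame.
val dateKey = AttributeReference("l_commitdate", DateType)()

// rewriteOrderSpecs: the sort key becomes days since 1970-01-01 (IntegerType).
val intKey = DateDiff(dateKey, Literal(Date.valueOf("1970-01-01")))

// preComputeRangeFrameBoundary: the lower frame column is the pre-computed key plus
// the bound, evaluated once in the pre-project.
val lowerFrameCol = Alias(Add(intKey, Literal(-1)), "_pre_lower")()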