Skip to content

Commit

Permalink
[GLUTEN-7759][CH]Fix pre project push down in aggregate (apache#7779)
Browse files Browse the repository at this point in the history
* fix pre project

* add test

* another fix

* fix ci

* fix ci

* fix review

---------

Co-authored-by: zouyunhe <[email protected]>
  • Loading branch information
KevinyhZou and zouyunhe authored Nov 7, 2024
1 parent f6a9665 commit 27ab77c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// If there is an expression (not an attribute) in an aggregation function's
// parameters. It will introduce a pr-projection to calculate the expression
// parameters. It will introduce a pre-projection to calculate the expression
// at first, and make all the parameters be attributes.
// If it's an aggregation with grouping set, this pre-projection is placed after
// expand operator. This is not efficient, we cannot move this pre-projection
Expand Down Expand Up @@ -83,7 +83,7 @@ case class PushdownAggregatePreProjectionAheadExpand(session: SparkSession)
val originInputAttributes = aheadProjectExprs.filter(e => isAttributeOrLiteral(e))

val preProjectExprs = aheadProjectExprs.filter(e => !isAttributeOrLiteral(e))
if (preProjectExprs.length == 0) {
if (preProjectExprs.isEmpty) {
return hashAggregate
}

Expand All @@ -93,11 +93,31 @@ case class PushdownAggregatePreProjectionAheadExpand(session: SparkSession)
return hashAggregate
}

// Returns true when every Attribute found anywhere below `expr` is contained in `inputs`.
//
// All children are visited recursively. `expr` itself is not compared against `inputs`;
// only its descendants are, so an expression without children yields true.
//
// Note: an earlier version used `return` inside `foreach`, which left the loop on the
// first child and therefore never inspected the remaining siblings (for example the
// second argument of a binary expression). `forall` checks every child and still
// short-circuits on the first missing attribute.
def projectInputExists(expr: Expression, inputs: Seq[Attribute]): Boolean =
  expr.children.forall {
    // Leaf attribute: it must be one of the available inputs.
    case a: Attribute => inputs.contains(a)
    // Any other expression: all attributes underneath it must be available.
    case child => projectInputExists(child, inputs)
  }

// The pre-projection may only be moved ahead when every pre-project expression passes
// the projectInputExists check against rootChild's output; otherwise keep the plan as is.
val couldPushDown = preProjectExprs.forall(projectInputExists(_, rootChild.output))

if (!couldPushDown) {
  return hashAggregate
}

// The new ahead project node will take rootChild's output and preProjectExprs as
// the projection expressions.
val aheadProject = ProjectExecTransformer(rootChild.output ++ preProjectExprs, rootChild)
val aheadProjectOuput = aheadProject.output

// Keep only those outputs of the ahead project whose exprId matches none of the
// original input attributes (originInputAttributes).
val preProjectOutputAttrs = aheadProjectOuput.filter(
e =>
!originInputAttributes.exists(_.exprId.equals(e.asInstanceOf[NamedExpression].exprId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3067,5 +3067,30 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
}

test("GLUTEN-7759: Fix bug of agg pre-project push down") {
  // Regression test for GLUTEN-7759: a `select distinct` over a sub-query that applies
  // coalesce to the grouping column of a `count(distinct ...) ... with cube` aggregation.
  val createTableSql =
    "create table test_tbl_7759(id bigint, name string, day string) using parquet"
  val insertDataSql =
    "insert into test_tbl_7759 values(1, 'a123', '2024-11-01'),(2, 'a124', '2024-11-01')"
  val querySql =
    """
      |select distinct day, name from(
      |select '2024-11-01' as day
      |,coalesce(name,'all') name
      |,cnt
      |from
      |(
      |select count(distinct id) as cnt, name
      |from test_tbl_7759
      |group by name
      |with cube
      |)) limit 10
      |""".stripMargin
  spark.sql(createTableSql)
  try {
    spark.sql(insertDataSql)
    // The third argument is a no-op callback.
    compareResultsAgainstVanillaSpark(querySql, true, { _ => })
  } finally {
    // Drop the table even when the insert or the comparison throws, so a failing run
    // does not leave test_tbl_7759 behind for later runs.
    spark.sql("drop table test_tbl_7759")
  }
}
}
// scalastyle:on line.size.limit

0 comments on commit 27ab77c

Please sign in to comment.