diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala index 10e36d6bc321..11869dfc4ea5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala @@ -93,32 +93,29 @@ case class PushdownAggregatePreProjectionAheadExpand(session: SparkSession) return hashAggregate } - def projectInputExists(expr: Expression, inputs: Seq[Attribute]): Boolean = { + def replaceProjectInputs(expr: Expression, inputs: Seq[Attribute]): Expression = { + var newChildren = Seq.empty[Expression] expr.children.foreach { case a: Attribute => - var exist = false - for (input <- inputs if input.name.equals(a.name) && input.exprId.equals(a.exprId)) { - exist = true + for (input <- inputs if input.name.equals(a.name)) { + newChildren :+= input } - return exist case p: Expression => - return projectInputExists(p, inputs) + val newChild = replaceProjectInputs(p, inputs) + newChildren :+= newChild case _ => - return true } - true + expr.withNewChildren(newChildren) } - + var newProjectExprs = Seq.empty[NamedExpression] preProjectExprs.foreach( - p => { - if (!projectInputExists(p, rootChild.output)) { - return hashAggregate - } - }) + p => + newProjectExprs :+= replaceProjectInputs(p, rootChild.output) + .asInstanceOf[NamedExpression]) // The new ahead project node will take rootChild's output and preProjectExprs as the // the projection expressions. - val aheadProject = ProjectExecTransformer(rootChild.output ++ preProjectExprs, rootChild) + val aheadProject = ProjectExecTransformer(rootChild.output ++ newProjectExprs, rootChild) val aheadProjectOuput = aheadProject.output val preProjectOutputAttrs = aheadProjectOuput.filter( e =>