Skip to content

Commit

Permalink
another fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zouyunhe committed Nov 1, 2024
1 parent 4a606ef commit 00652a4
Showing 1 changed file with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,29 @@ case class PushdownAggregatePreProjectionAheadExpand(session: SparkSession)
return hashAggregate
}

def projectInputExists(expr: Expression, inputs: Seq[Attribute]): Boolean = {
def replaceProjectInputs(expr: Expression, inputs: Seq[Attribute]): Expression = {
var newChildren = Seq.empty[Expression]
expr.children.foreach {
case a: Attribute =>
var exist = false
for (input <- inputs if input.name.equals(a.name) && input.exprId.equals(a.exprId)) {
exist = true
for (input <- inputs if input.name.equals(a.name)) {
newChildren :+= input
}
return exist
case p: Expression =>
return projectInputExists(p, inputs)
val newChild = replaceProjectInputs(p, inputs)
newChildren :+= newChild
case _ =>
return true
}
true
expr.withNewChildren(newChildren)
}

var newProjectExprs = Seq.empty[NamedExpression]
preProjectExprs.foreach(
p => {
if (!projectInputExists(p, rootChild.output)) {
return hashAggregate
}
})
p =>
newProjectExprs :+= replaceProjectInputs(p, rootChild.output)
.asInstanceOf[NamedExpression])

// The new ahead project node will take rootChild's output and preProjectExprs as the
// the projection expressions.
val aheadProject = ProjectExecTransformer(rootChild.output ++ preProjectExprs, rootChild)
val aheadProject = ProjectExecTransformer(rootChild.output ++ newProjectExprs, rootChild)
val aheadProjectOuput = aheadProject.output
val preProjectOutputAttrs = aheadProjectOuput.filter(
e =>
Expand Down

0 comments on commit 00652a4

Please sign in to comment.