Skip to content

Commit

Permalink
unity agg output
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Nov 5, 2024
1 parent 48d312a commit 3266361
Showing 1 changed file with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,36 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan): HashAggregateExecBaseTransformer =
CHHashAggregateExecTransformer(
child: SparkPlan): HashAggregateExecBaseTransformer = {
logError(s"xxx aggregateExpressions:$aggregateExpressions")
logError(s"xxx aggregateAttributes:$aggregateAttributes")
logError(s"xxx resultExpressions:$resultExpressions")
logError(s"xxx agg expr to result: ${aggregateExpressions.map(_.resultAttribute)}")
logError(
s"xxx agg:" +
s"${aggregateExpressions.map(e => e.aggregateFunction.aggBufferAttributes.length)}")
aggregateExpressions.foreach {
e => logError(s"xxx agg fun:$e, ${e.aggregateFunction.aggBufferAttributes}")
}
val replacedResultExpressions =
groupingExpressions ++ aggregateExpressions.map(_.resultAttribute)
val agg = CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions.distinct,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions.distinct,
// resultExpressions.distinct,
replacedResultExpressions,
child
)
// val xoutputs = CHHashAggregateExecTransformer.getCHAggregateResultAttributes(
// aggregateExpressions,
// resultExpressions.slice(groupingExpressions.length, resultExpressions.length))
// logError(s"xxx adjust agg output: $xoutputs")
logError(s"xxx agg output: ${agg.output}")
agg
}

/** Generate HashAggregateExecPullOutHelper */
override def genHashAggregateExecPullOutHelper(
Expand Down

0 comments on commit 3266361

Please sign in to comment.