Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Nov 6, 2024
1 parent c3e6bdb commit adb58e1
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,45 +159,19 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan): HashAggregateExecBaseTransformer = {
logError(s"xxx aggregateExpressions:$aggregateExpressions")
logError(s"xxx aggregateAttributes:$aggregateAttributes")
logError(s"xxx resultExpressions:$resultExpressions")
logError(s"xxx agg expr to result: ${aggregateExpressions.map(_.resultAttribute)}")
logError(
s"xxx agg:" +
s"${aggregateExpressions.map(e => e.aggregateFunction.aggBufferAttributes.length)}")
aggregateExpressions.foreach {
e => logError(s"xxx agg fun:$e, ${e.aggregateFunction.aggBufferAttributes}")
}
val modes = aggregateExpressions.map(_.mode)
logError(s"xxx modes: $modes")
val replacedResultExpressions = CHHashAggregateExecTransformer.getCHAggregateResultExpressions(
groupingExpressions,
aggregateExpressions,
resultExpressions)
logError(s"xxx adjust agg output: $replacedResultExpressions")
val agg1 = CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions.distinct,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions.distinct,
// replacedResultExpressions,
child
)
logError(s"xxx agg output: ${agg1.output}")
val agg = CHHashAggregateExecTransformer(
CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions.distinct,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
// resultExpressions.distinct,
replacedResultExpressions.distinct,
child
)
agg
}

/** Generate HashAggregateExecPullOutHelper */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,13 @@ class CHTransformerApi extends TransformerApi with Logging {
// output name will be different from grouping expressions,
// so using output attribute instead of grouping expression
val groupingExpressions = hash.output.splitAt(hash.groupingExpressions.size)._1
val aggResultAttributes = CHHashAggregateExecTransformer.getAggregateResultAttributes(
groupingExpressions,
hash.aggregateExpressions
)
val aggResultAttributes = CHHashAggregateExecTransformer
.getCHAggregateResultExpressions(
groupingExpressions,
hash.aggregateExpressions,
hash.resultExpressions
)
.map(_.toAttribute)
if (aggResultAttributes.size == hash.output.size) {
aggResultAttributes
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ case class CHHashAggregateExecTransformer(
"PartialMerge's child not being HashAggregateExecBaseTransformer" +
" is unsupported yet")
}
val hashAggregateChild = child.asInstanceOf[BaseAggregateExec]
val aggTypesExpr = ExpressionConverter
.replaceWithExpressionTransformer(
aggExpr.resultAttribute,
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
"Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
cols.getNames().size(),
root_rel.root().names_size());
}
for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
actions_dag.project(aliases);
Expand Down

0 comments on commit adb58e1

Please sign in to comment.