From adb58e1fedba52b0bc7fd231d33986a041d7bc46 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 5 Nov 2024 18:26:39 +0800 Subject: [PATCH] update --- .../clickhouse/CHSparkPlanExecApi.scala | 28 +------------------ .../clickhouse/CHTransformerApi.scala | 11 +++++--- .../CHHashAggregateExecTransformer.scala | 1 + .../Parser/SerializedPlanParser.cpp | 1 - 4 files changed, 9 insertions(+), 32 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index b04a2def0f35..190fcb13eaeb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -159,45 +159,19 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan): HashAggregateExecBaseTransformer = { - logError(s"xxx aggregateExpressions:$aggregateExpressions") - logError(s"xxx aggregateAttributes:$aggregateAttributes") - logError(s"xxx resultExpressions:$resultExpressions") - logError(s"xxx agg expr to result: ${aggregateExpressions.map(_.resultAttribute)}") - logError( - s"xxx agg:" + - s"${aggregateExpressions.map(e => e.aggregateFunction.aggBufferAttributes.length)}") - aggregateExpressions.foreach { - e => logError(s"xxx agg fun:$e, ${e.aggregateFunction.aggBufferAttributes}") - } - val modes = aggregateExpressions.map(_.mode) - logError(s"xxx modes: $modes") val replacedResultExpressions = CHHashAggregateExecTransformer.getCHAggregateResultExpressions( groupingExpressions, aggregateExpressions, resultExpressions) - logError(s"xxx adjust agg output: $replacedResultExpressions") - val agg1 = CHHashAggregateExecTransformer( - requiredChildDistributionExpressions, - groupingExpressions.distinct, - aggregateExpressions, - aggregateAttributes, - initialInputBufferOffset, - resultExpressions.distinct, - // replacedResultExpressions, - child - ) - logError(s"xxx agg output: ${agg1.output}") - val agg = CHHashAggregateExecTransformer( + CHHashAggregateExecTransformer( requiredChildDistributionExpressions, groupingExpressions.distinct, aggregateExpressions, aggregateAttributes, initialInputBufferOffset, - // resultExpressions.distinct, replacedResultExpressions.distinct, child ) - agg } /** Generate HashAggregateExecPullOutHelper */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala index e5b71825853b..bf909c52ac20 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala @@ -177,10 +177,13 @@ class CHTransformerApi extends TransformerApi with Logging { // output name will be different from grouping expressions, // so using output attribute instead of grouping expression val groupingExpressions = hash.output.splitAt(hash.groupingExpressions.size)._1 - val aggResultAttributes = CHHashAggregateExecTransformer.getAggregateResultAttributes( - groupingExpressions, - hash.aggregateExpressions - ) + val aggResultAttributes = CHHashAggregateExecTransformer + .getCHAggregateResultExpressions( + groupingExpressions, + hash.aggregateExpressions, + hash.resultExpressions + ) + .map(_.toAttribute) if (aggResultAttributes.size == hash.output.size) { aggResultAttributes } else { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala index fcf6320f8ec4..e92157fc0336 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala @@ -284,6 +284,7 @@ case class CHHashAggregateExecTransformer( "PartialMerge's child not being HashAggregateExecBaseTransformer" + " is unsupported yet") } + val hashAggregateChild = child.asInstanceOf[BaseAggregateExec] val aggTypesExpr = ExpressionConverter .replaceWithExpressionTransformer( aggExpr.resultAttribute, diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 74c1d350014d..d00d184990b6 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -117,7 +117,6 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel "Missmatch result columns size. plan column size {}, subtrait plan name size {}.", cols.getNames().size(), root_rel.root().names_size()); - } for (int i = 0; i < static_cast(cols.getNames().size()); i++) aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i))); actions_dag.project(aliases);