From f1a0a3e6b489b7e3290e75fab5c427201c5198bb Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Mon, 23 Oct 2023 09:35:53 +0800 Subject: [PATCH] [GLUTEN-3467] Fix 'Names of tuple elements must be unique' error for ch backend (#3468) When `MergeScalarSubqueries` creates a named_struct whose fields share the same name, such as `{'bloomFilter', BF1, 'bloomFilter', BF2}` or `{'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L}`, it causes problems for some backends, e.g. ClickHouse, which cannot tolerate duplicate field names in a struct type. So we need to rename each 'nameExpr' in the named_struct to make them unique after `MergeScalarSubqueries` has been executed. Close #3467. --- .../GlutenClickHouseTPCHParquetSuite.scala | 9 +++ .../BasicPhysicalOperatorTransformer.scala | 71 ++++++++++--------- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala index 5546022699b6..4b041654a394 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetSuite.scala @@ -2140,5 +2140,14 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite spark.sql("drop table test_tbl_right_3134") } } + + test("GLUTEN-3467: Fix 'Names of tuple elements must be unique' error for ch backend") { + val sql = + """ + |select named_struct('a', r_regionkey, 'b', r_name, 'a', r_comment) as mergedValue + |from region + |""".stripMargin + compareResultsAgainstVanillaSpark(sql, true, { _ => }) + } } // scalastyle:on line.size.limit diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala 
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index 8572e4b7f920..44905796c3af 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -306,45 +306,50 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch } object ProjectExecTransformer { private def processProjectExecTransformer( - project: ProjectExecTransformer): ProjectExecTransformer = { - // Special treatment for Project containing all bloom filters: - // If more than one bloom filter, Spark will merge them into a named_struct - // and then broadcast. The named_struct looks like {'bloomFilter',BF1,'bloomFilter',BF2}, - // with two bloom filters sharing same name. This will cause problem for some backends, - // e.g. ClickHouse, which cannot tolerate duplicate type names in struct type - // So we need to rename 'bloomFilter' to make them unique. - if (project.projectList.size == 1) { - val newProjectListHead = project.projectList.head match { - case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), "mergedValue") => - if ( - !cns.nameExprs.forall( - e => - e.isInstanceOf[Literal] && "bloomFilter".equals(e.asInstanceOf[Literal].toString())) - ) { - null - } else { - val newChildren = children.zipWithIndex.map { - case _ @(_: Literal, index) => - val newLiteral = Literal("bloomFilter" + index / 2) - newLiteral - case other @ (_, _) => other._1 + projectList: Seq[NamedExpression]): Seq[NamedExpression] = { + + // When there is a MergeScalarSubqueries which will create the named_struct with the same name, + // looks like {'bloomFilter', BF1, 'bloomFilter', BF2} + // or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L}, + // it will cause problem for some backends, e.g. 
ClickHouse, + // which cannot tolerate duplicate type names in struct type, + // so we need to rename 'nameExpr' in the named_struct to make them unique + // after executing the MergeScalarSubqueries. + var needToReplace = false + val newProjectList = projectList.map { + pro => + pro match { + case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), "mergedValue") => + // check whether there are some duplicate names + if (cns.nameExprs.distinct.size == cns.nameExprs.size) { + alias + } else { + val newChildren = children + .grouped(2) + .map { + case Seq(name: Literal, value: NamedExpression) => + val newLiteral = Literal(name.toString() + "#" + value.exprId.id) + Seq(newLiteral, value) + case Seq(name, value) => Seq(name, value) + } + .flatten + .toSeq + needToReplace = true + Alias.apply(CreateNamedStruct(newChildren), "mergedValue")(alias.exprId) } - Alias.apply(CreateNamedStruct(newChildren), "mergedValue")(alias.exprId) - } - case _ => null - } - if (newProjectListHead == null) { - project - } else { - ProjectExecTransformer(Seq(newProjectListHead), project.child) - } + case other: NamedExpression => other + } + } + + if (!needToReplace) { + projectList } else { - project + newProjectList } } def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = - processProjectExecTransformer(new ProjectExecTransformer(projectList, child)) + new ProjectExecTransformer(processProjectExecTransformer(projectList), child) } // An alternatives for UnionExec.