Skip to content

Commit

Permalink
[GLUTEN-3467] Fix 'Names of tuple elements must be unique' error for …
Browse files Browse the repository at this point in the history
…ch backend (#3468)

When there is a `MergeScalarSubqueries` which will create the named_struct with the same name,
looks like `{'bloomFilter', BF1, 'bloomFilter', BF2} or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L}`,
it will cause problem for some backends, e.g. ClickHouse, which cannot tolerate duplicate type names in struct type,
so we need to rename 'nameExpr' in the named_struct to make them unique after executing the `MergeScalarSubqueries`.

Close #3467.
  • Loading branch information
zzcclp authored Oct 23, 2023
1 parent 30d78a1 commit f1a0a3e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2140,5 +2140,14 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite
spark.sql("drop table test_tbl_right_3134")
}
}

test("GLUTEN-3467: Fix 'Names of tuple elements must be unique' error for ch backend") {
val sql =
"""
|select named_struct('a', r_regionkey, 'b', r_name, 'a', r_comment) as mergedValue
|from region
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
}
// scalastyle:on line.size.limit
Original file line number Diff line number Diff line change
Expand Up @@ -306,45 +306,50 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
}
object ProjectExecTransformer {
private def processProjectExecTransformer(
project: ProjectExecTransformer): ProjectExecTransformer = {
// Special treatment for Project containing all bloom filters:
// If more than one bloom filter, Spark will merge them into a named_struct
// and then broadcast. The named_struct looks like {'bloomFilter',BF1,'bloomFilter',BF2},
// with two bloom filters sharing same name. This will cause problem for some backends,
// e.g. ClickHouse, which cannot tolerate duplicate type names in struct type
// So we need to rename 'bloomFilter' to make them unique.
if (project.projectList.size == 1) {
val newProjectListHead = project.projectList.head match {
case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), "mergedValue") =>
if (
!cns.nameExprs.forall(
e =>
e.isInstanceOf[Literal] && "bloomFilter".equals(e.asInstanceOf[Literal].toString()))
) {
null
} else {
val newChildren = children.zipWithIndex.map {
case _ @(_: Literal, index) =>
val newLiteral = Literal("bloomFilter" + index / 2)
newLiteral
case other @ (_, _) => other._1
projectList: Seq[NamedExpression]): Seq[NamedExpression] = {

// When there is a MergeScalarSubqueries which will create the named_struct with the same name,
// looks like {'bloomFilter', BF1, 'bloomFilter', BF2}
// or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L},
// it will cause problem for some backends, e.g. ClickHouse,
// which cannot tolerate duplicate type names in struct type,
// so we need to rename 'nameExpr' in the named_struct to make them unique
// after executing the MergeScalarSubqueries.
var needToReplace = false
val newProjectList = projectList.map {
pro =>
pro match {
case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), "mergedValue") =>
// check whether there are some duplicate names
if (cns.nameExprs.distinct.size == cns.nameExprs.size) {
alias
} else {
val newChildren = children
.grouped(2)
.map {
case Seq(name: Literal, value: NamedExpression) =>
val newLiteral = Literal(name.toString() + "#" + value.exprId.id)
Seq(newLiteral, value)
case Seq(name, value) => Seq(name, value)
}
.flatten
.toSeq
needToReplace = true
Alias.apply(CreateNamedStruct(newChildren), "mergedValue")(alias.exprId)
}
Alias.apply(CreateNamedStruct(newChildren), "mergedValue")(alias.exprId)
}
case _ => null
}
if (newProjectListHead == null) {
project
} else {
ProjectExecTransformer(Seq(newProjectListHead), project.child)
}
case other: NamedExpression => other
}
}

if (!needToReplace) {
projectList
} else {
project
newProjectList
}
}

def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
processProjectExecTransformer(new ProjectExecTransformer(projectList, child))
new ProjectExecTransformer(processProjectExecTransformer(projectList), child)
}

// An alternatives for UnionExec.
Expand Down

0 comments on commit f1a0a3e

Please sign in to comment.