[CORE] Reuse broadcast exchange for different build keys with same table (apache#5563)

Vanilla Spark builds the HashedRelation at the driver side, so the broadcast value is sensitive to the build keys. Gluten instead broadcasts a serialized byte array and builds the HashedRelation at the executor side, so the build keys are meaningless for the broadcast value.

This PR erases the HashedRelationBroadcastMode build keys during canonicalization. This change allows a broadcast exchange to be reused across joins that use different build keys on the same table.
ulysses-you authored Apr 29, 2024
1 parent aeae0a6 commit 7dad958
Showing 4 changed files with 59 additions and 3 deletions.
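Before the diffs, a minimal standalone sketch of the idea (not part of this commit; it only assumes Spark's HashedRelationBroadcastMode and expression API on the classpath): two broadcast modes that differ only in their build keys stay distinct under vanilla canonicalization, but become indistinguishable once the keys are erased, which is what lets the exchange-reuse rule match them.

```scala
// Sketch only: these classes are Spark's, not Gluten's.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
import org.apache.spark.sql.types.LongType

// Two attributes standing in for t2.c1 and t2.c2.
val c1 = AttributeReference("c1", LongType)()
val c2 = AttributeReference("c2", LongType)()

// The same table broadcast for two joins that hash on different keys.
val modeOnC1 = HashedRelationBroadcastMode(Seq(c1))
val modeOnC2 = HashedRelationBroadcastMode(Seq(c2))

// Vanilla canonicalization keeps the keys, so the two modes stay distinct
// and the two broadcast exchanges cannot be deduplicated.
assert(modeOnC1.canonicalized != modeOnC2.canonicalized)

// Erasing the keys, as doCanonicalizeForBroadcastMode does below, makes the
// modes equal. This is sound for Gluten because the broadcast value is a
// serialized byte array built at the executor side, independent of the keys.
assert(modeOnC1.copy(key = Seq.empty) == modeOnC2.copy(key = Seq.empty))
```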
VeloxSparkPlanExecApi.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.utils.ExecUtil
 import org.apache.spark.sql.expression.{UDFExpression, UDFResolver, UserDefinedAggregateFunction}
@@ -577,6 +577,19 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     ColumnarBuildSideRelation(child.output, serialized.map(_.getSerialized))
   }
 
+  override def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
+    mode match {
+      case hash: HashedRelationBroadcastMode =>
+        // Note: this differs from vanilla Spark. Vanilla Spark builds the HashedRelation
+        // at the driver side, so it is sensitive to the build keys. We broadcast a byte
+        // array and build the HashedRelation at the executor side, so the build keys are
+        // meaningless for the broadcast value. Erasing them lets us reuse the broadcast
+        // exchange for different build keys on the same table.
+        hash.copy(key = Seq.empty)
+      case _ => mode.canonicalized
+    }
+  }
+
   /**
    * * Expressions.
    */
VeloxHashJoinSuite.scala
@@ -19,7 +19,9 @@ package org.apache.gluten.execution
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.InputIteratorTransformer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, InputIteratorTransformer}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
 
 class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -107,4 +109,39 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
       }
     }
   }
+
+  test("Reuse broadcast exchange for different build keys with same table") {
+    withTable("t1", "t2") {
+      spark.sql("""
+                  |CREATE TABLE t1 USING PARQUET
+                  |AS SELECT id as c1, id as c2 FROM range(10)
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |CREATE TABLE t2 USING PARQUET
+                  |AS SELECT id as c1, id as c2 FROM range(3)
+                  |""".stripMargin)
+
+      val df = spark.sql("""
+                           |SELECT * FROM t1
+                           |JOIN t2 as tmp1 ON t1.c1 = tmp1.c1 and tmp1.c1 = tmp1.c2
+                           |JOIN t2 as tmp2 on t1.c2 = tmp2.c2 and tmp2.c1 = tmp2.c2
+                           |""".stripMargin)
+
+      assert(collect(df.queryExecution.executedPlan) {
+        case b: BroadcastExchangeExec => b
+      }.size == 2)
+
+      checkAnswer(
+        df,
+        Row(2, 2, 2, 2, 2, 2) :: Row(1, 1, 1, 1, 1, 1) :: Row(0, 0, 0, 0, 0, 0) :: Nil)
+
+      assert(collect(df.queryExecution.executedPlan) {
+        case b: ColumnarBroadcastExchangeExec => b
+      }.size == 1)
+      assert(collect(df.queryExecution.executedPlan) {
+        case r @ ReusedExchangeExec(_, _: ColumnarBroadcastExchangeExec) => r
+      }.size == 1)
+    }
+  }
 }
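As a hypothetical manual check (not part of the patch), the reuse is also visible in a formatted plan: with the Velox backend enabled, t2 should be broadcast once, with the second join reading from a ReusedExchange even though its keys differ.

```scala
// Hypothetical spark-shell snippet against the t1/t2 tables created above.
spark.sql("""
  SELECT * FROM t1
  JOIN t2 AS tmp1 ON t1.c1 = tmp1.c1
  JOIN t2 AS tmp2 ON t1.c2 = tmp2.c2
""").explain("formatted")
```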
SparkPlanExecApi.scala
@@ -350,6 +350,10 @@ trait SparkPlanExecApi {
       numOutputRows: SQLMetric,
       dataSize: SQLMetric): BuildSideRelation
 
+  def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
+    mode.canonicalized
+  }
+
   /** Create ColumnarWriteFilesExec */
   def createColumnarWriteFilesExec(
       child: SparkPlan,
ColumnarBroadcastExchangeExec.scala
@@ -128,7 +128,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
   override def doCanonicalize(): SparkPlan = {
-    ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+    val canonicalized =
+      BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
+    ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
   }
 
   override protected def doValidateInternal(): ValidationResult = {
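For context, a simplified sketch of how canonicalization feeds exchange reuse (this mirrors the shape of Spark's ReuseExchange rule, not code from this commit): exchanges are deduplicated by their canonicalized plan, so once doCanonicalize erases the build keys, the second broadcast of the same table resolves to the first.

```scala
import scala.collection.mutable

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

// Simplified stand-in for Spark's exchange-reuse rule.
def reuseExchanges(plan: SparkPlan): SparkPlan = {
  // Keyed by the canonicalized exchange, i.e. the form produced by
  // doCanonicalize() above, with the build keys already erased.
  val cache = mutable.Map.empty[SparkPlan, Exchange]
  plan.transformUp {
    case exchange: Exchange =>
      val cached = cache.getOrElseUpdate(exchange.canonicalized, exchange)
      if (cached eq exchange) exchange
      else ReusedExchangeExec(exchange.output, cached)
  }
}
```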
