Skip to content

Commit

Permalink
[OPPRO-242] Implement a workaround for Velox Anti join (oap-project#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored Sep 16, 2022
1 parent 28dc89b commit 2e89cfc
Show file tree
Hide file tree
Showing 18 changed files with 727 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,31 @@
package io.glutenproject.backendsapi.clickhouse

import scala.collection.mutable.ArrayBuffer

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.ISparkPlanExecApi
import io.glutenproject.execution._
import io.glutenproject.expression.{AliasBaseTransformer, AliasTransformer}
import io.glutenproject.vectorized.{BlockNativeWriter, CHColumnarBatchSerializer}

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.{
Alias,
Attribute,
AttributeReference,
BoundReference,
Expression,
ExprId,
NamedExpression
}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, ExprId, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.DeltaLogFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.{
BuildSideRelation,
ClickHouseBuildSideRelation,
HashedRelationBroadcastMode
}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.CHExecUtil
import org.apache.spark.sql.extension.{CHDataSourceV2Strategy, ClickHouseAnalysis}
Expand Down Expand Up @@ -145,6 +133,33 @@ class CHSparkPlanExecApi extends ISparkPlanExecApi with AdaptiveSparkPlanHelper
resultExpressions,
child)

/**
 * Build the ClickHouse-specific shuffled hash join transformer node.
 *
 * All planning inputs are handed through unchanged; this method only selects
 * the backend implementation ([[CHShuffledHashJoinExecTransformer]]).
 */
def genShuffledHashJoinExecTransformer(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan): ShuffledHashJoinExecTransformer = {
  // Named arguments keep the one-to-one mapping to the case-class fields explicit.
  CHShuffledHashJoinExecTransformer(
    leftKeys = leftKeys,
    rightKeys = rightKeys,
    joinType = joinType,
    buildSide = buildSide,
    condition = condition,
    left = left,
    right = right)
}

/**
 * Build the ClickHouse-specific broadcast hash join transformer node.
 *
 * @param isNullAwareAntiJoin whether Spark planned this join as a null-aware
 *                            anti join; forwarded to the transformer node so
 *                            the flag is not lost during planning.
 */
def genBroadcastHashJoinExecTransformer(leftKeys: Seq[Expression],
                                        rightKeys: Seq[Expression],
                                        joinType: JoinType,
                                        buildSide: BuildSide,
                                        condition: Option[Expression],
                                        left: SparkPlan,
                                        right: SparkPlan,
                                        isNullAwareAntiJoin: Boolean = false)
    : BroadcastHashJoinExecTransformer =
  // Fix: forward isNullAwareAntiJoin instead of silently dropping it.
  // CHBroadcastHashJoinExecTransformer declares this field with default
  // `false`, so existing callers that omit the flag are unaffected.
  CHBroadcastHashJoinExecTransformer(
    leftKeys, rightKeys, joinType, buildSide, condition, left, right,
    isNullAwareAntiJoin)

/**
* Generate Alias transformer.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.glutenproject.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.SparkPlan

/**
 * ClickHouse variant of the shuffled hash join transformer.
 *
 * Every constructor argument is forwarded verbatim to the parent
 * [[ShuffledHashJoinExecTransformer]]; this subclass only contributes the
 * case-class child-replacement hook.
 */
case class CHShuffledHashJoinExecTransformer(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends ShuffledHashJoinExecTransformer(
    leftKeys,
    rightKeys,
    joinType,
    buildSide,
    condition,
    left,
    right) {

  // Rebuild this node with new children while preserving all other fields.
  override protected def withNewChildrenInternal(
      newLeft: SparkPlan,
      newRight: SparkPlan): CHShuffledHashJoinExecTransformer = {
    copy(left = newLeft, right = newRight)
  }
}

/**
 * ClickHouse variant of the broadcast hash join transformer.
 *
 * NOTE(review): `isNullAwareAntiJoin` is stored on this node but is NOT
 * passed to the parent constructor — confirm the parent does not need it,
 * or that the flag is consumed elsewhere (e.g. during Substrait conversion).
 */
case class CHBroadcastHashJoinExecTransformer(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan,
    isNullAwareAntiJoin: Boolean = false)
  extends BroadcastHashJoinExecTransformer(
    leftKeys,
    rightKeys,
    joinType,
    buildSide,
    condition,
    left,
    right) {

  // Rebuild this node with new children while preserving all other fields.
  override protected def withNewChildrenInternal(
      newLeft: SparkPlan,
      newRight: SparkPlan): CHBroadcastHashJoinExecTransformer = {
    copy(left = newLeft, right = newRight)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ object DSV2BenchmarkTest extends AdaptiveSparkPlanHelper {

def collectAllJoinSide(executedPlan: SparkPlan): Unit = {
val buildSides = collect(executedPlan) {
case s: ShuffledHashJoinExecTransformer => "Shuffle-" + s.buildSide.toString
case b: BroadcastHashJoinExecTransformer => "Broadcast-" + b.buildSide.toString
case s: ShuffledHashJoinExecTransformer => "Shuffle-" + s.joinBuildSide.toString
case b: BroadcastHashJoinExecTransformer => "Broadcast-" + b.joinBuildSide.toString
case os: ShuffledHashJoinExec => "Shuffle-" + os.buildSide.toString
case ob: BroadcastHashJoinExec => "Broadcast-" + ob.buildSide.toString
case sm: SortMergeJoinExec => "SortMerge-Join"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class GlutenClickHouseTPCHNullableColumnarShuffleSuite extends GlutenClickHouseT
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
runTPCHQuery(3) { df =>
val shjBuildLeft = df.queryExecution.executedPlan.collect {
case shj: ShuffledHashJoinExecTransformer if shj.buildSide == BuildLeft => shj
case shj: ShuffledHashJoinExecTransformer if shj.joinBuildSide == BuildLeft => shj
}
assert(shjBuildLeft.size == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
runTPCHQuery(3) { df =>
val shjBuildLeft = df.queryExecution.executedPlan.collect {
case shj: ShuffledHashJoinExecTransformer if shj.buildSide == BuildLeft => shj
case shj: ShuffledHashJoinExecTransformer if shj.joinBuildSide == BuildLeft => shj
}
assert(shjBuildLeft.size == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class GlutenClickHouseTPCHParquetAQESuite
runTPCHQuery(3) { df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val shjBuildLeft = collect(df.queryExecution.executedPlan) {
case shj: ShuffledHashJoinExecTransformer if shj.buildSide == BuildLeft => shj
case shj: ShuffledHashJoinExecTransformer if shj.joinBuildSide == BuildLeft => shj
}
assert(shjBuildLeft.size == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class GlutenClickHouseTPCHParquetSuite extends GlutenClickHouseTPCHAbstractSuite
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
runTPCHQuery(3) { df =>
val shjBuildLeft = df.queryExecution.executedPlan.collect {
case shj: ShuffledHashJoinExecTransformer if shj.buildSide == BuildLeft => shj
case shj: ShuffledHashJoinExecTransformer if shj.joinBuildSide == BuildLeft => shj
}
assert(shjBuildLeft.size == 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
runTPCHQuery(3) { df =>
val shjBuildLeft = df.queryExecution.executedPlan.collect {
case shj: ShuffledHashJoinExecTransformer if shj.buildSide == BuildLeft => shj
case shj: ShuffledHashJoinExecTransformer if shj.joinBuildSide == BuildLeft => shj
}
assert(shjBuildLeft.size == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.glutenproject.columnarbatch.ArrowColumnarBatches
import io.glutenproject.execution._
import io.glutenproject.expression.{AliasBaseTransformer, ArrowConverterUtils, VeloxAliasTransformer}
import io.glutenproject.vectorized.{ArrowColumnarBatchSerializer, ArrowWritableColumnVector}

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
Expand All @@ -35,8 +34,10 @@ import org.apache.spark.sql.VeloxColumnarRules._
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan, VeloxBuildSideRelation}
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
Expand Down Expand Up @@ -113,6 +114,33 @@ class VeloxSparkPlanExecApi extends ISparkPlanExecApi {
resultExpressions,
child)

/**
 * Build the Velox-specific shuffled hash join transformer node.
 *
 * Pure factory method: selects [[VeloxShuffledHashJoinExecTransformer]] and
 * passes every planning input through unchanged.
 */
def genShuffledHashJoinExecTransformer(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan): ShuffledHashJoinExecTransformer = {
  VeloxShuffledHashJoinExecTransformer(
    leftKeys = leftKeys,
    rightKeys = rightKeys,
    joinType = joinType,
    buildSide = buildSide,
    condition = condition,
    left = left,
    right = right)
}

/**
 * Build the Velox-specific broadcast hash join transformer node.
 *
 * @param isNullAwareAntiJoin whether Spark planned this join as a null-aware
 *                            anti join.
 */
def genBroadcastHashJoinExecTransformer(leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isNullAwareAntiJoin: Boolean = false)
// NOTE(review): isNullAwareAntiJoin is accepted but not forwarded to
// VeloxBroadcastHashJoinExecTransformer, whose signature is not visible here.
// This commit is titled "workaround for Velox Anti join", so the drop may be
// deliberate — confirm against the Velox transformer's definition.
: BroadcastHashJoinExecTransformer = VeloxBroadcastHashJoinExecTransformer(
leftKeys, rightKeys, joinType, buildSide, condition, left, right)

/**
* Generate Alias transformer.
*
Expand Down
Loading

0 comments on commit 2e89cfc

Please sign in to comment.