[CORE] Move Spark / columnar rule list to backend code
zhztheplayer committed Aug 20, 2024
1 parent b7918a9 commit a629d91
Showing 30 changed files with 996 additions and 870 deletions.
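Before the per-file diffs, a quick orientation: this commit drops the per-category rule getters on SparkPlanExecApi (genExtendedOptimizers, genExtendedColumnarTransformRules and friends, deleted further down in CHSparkPlanExecApi) and instead has every backend expose a ruleApi() whose implementation registers rules through a RuleInjector. The sketch below is a simplified reading of those interfaces as they are used in this diff; the real definitions live elsewhere in the repository and carry more detail.

// Illustrative stand-ins only, inferred from usage in this commit.
trait SparkInjector   // regular Spark extension points (parser, analyzer, optimizer)
trait GlutenInjector  // Gluten's legacy columnar pipeline (transform / fallback / post / final)
trait RasInjector     // Gluten's RAS planner pipeline

trait RuleInjector {
  def spark: SparkInjector
  def gluten: GlutenInjector
  def ras: RasInjector
}

// The new per-backend hook: each backend pushes its own rule list into the injector.
trait RuleApi {
  def injectRules(injector: RuleInjector): Unit
}

Each Backend implementation then returns its RuleApi, as the CHBackend and VeloxBackend hunks below show.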
@@ -53,6 +53,7 @@ class CHBackend extends Backend {
override def validatorApi(): ValidatorApi = new CHValidatorApi
override def metricsApi(): MetricsApi = new CHMetricsApi
override def listenerApi(): ListenerApi = new CHListenerApi
override def ruleApi(): RuleApi = new CHRuleApi
override def settings(): BackendSettingsApi = CHBackendSettings
}

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.util.SparkPlanRules

class CHRuleApi extends RuleApi {
import CHRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
injectSpark(injector.spark)
injectGluten(injector.gluten)
injectRas(injector.ras)
}
}

private object CHRuleApi {
def injectSpark(injector: RuleInjector.SparkInjector): Unit = {
// Regular Spark rules.
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
injector.injectParser(
(spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
injector.injectResolutionRule(
spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf))
injector.injectResolutionRule(
spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
injector.injectOptimizerRule(
spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf))
injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
injector.injectOptimizerRule(_ => EqualToRewrite)

}

def injectGluten(injector: RuleInjector.GlutenInjector): Unit = {
// Gluten columnar: Transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectTransform(_ => FallbackEmptySchemaRelation())
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
injector.injectTransform(_ => RewriteSparkPlanRulesManager())
injector.injectTransform(_ => AddFallbackTagRule())
injector.injectTransform(_ => TransformPreOverrides())
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => RewriteTransformer.apply(c.session))
injector.injectTransform(_ => EnsureLocalSortRequirements)
injector.injectTransform(_ => EliminateLocalSort)
injector.injectTransform(_ => CollapseProjectExecTransformer)
injector.injectTransform(c => RewriteSortMergeJoinToHashJoinRule.apply(c.session))
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarTransformRules)
.foreach(each => injector.injectTransform(c => each(c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan()))

// Gluten columnar: Post rules.
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(_ => ColumnarCollapseTransformStages(GlutenConfig.getConf))
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarPostRules)
.foreach(each => injector.injectTransform(c => each(c.session)))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(GlutenConfig.getConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

def injectRas(injector: RuleInjector.RasInjector): Unit = {
// CH backend doesn't work with RAS at the moment. Inject a rule that aborts any
// execution calls.
injector.inject(
_ =>
new SparkPlanRules.AbortRule(
"Clickhouse backend doesn't yet have RAS support, please try disabling RAS and" +
" rerun the application"))
}
}
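The registration order above follows the phase comments in the code: transform rules first, then the fallback policy, then post rules, then final rules. The underlying pattern is that a backend hands the engine rule factories rather than ready-made rule lists; a self-contained sketch of one such phase (simplified and assumed, not the real RuleInjector, whose builders receive a richer context object exposing c.session, c.outputsColumnar and c.ac rather than a bare SparkSession) could look like this:

import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Accumulates rule factories at injection time; builds the concrete
// rule list per session when the engine assembles its pipeline.
class TransformPhaseSketch {
  private val builders = mutable.Buffer.empty[SparkSession => Rule[SparkPlan]]

  // What a backend's RuleApi would call while registering rules.
  def injectTransform(builder: SparkSession => Rule[SparkPlan]): Unit =
    builders += builder

  // What the engine would call when planning for a given session.
  def build(session: SparkSession): Seq[Rule[SparkPlan]] =
    builders.map(_(session)).toSeq
}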
@@ -21,11 +21,9 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteDateTimestampComparisonRule, RewriteSortMergeJoinToHashJoinRule, RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -36,18 +34,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
@@ -549,82 +542,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount, newBuildKeys)
}

/**
* Generate extended DataSourceV2 Strategies. Currently only for ClickHouse backend.
*
* @return
*/
override def genExtendedDataSourceV2Strategies(): List[SparkSession => Strategy] = {
List.empty
}

/**
* Generate extended query stage preparation rules.
*
* @return
*/
override def genExtendedQueryStagePrepRules(): List[SparkSession => Rule[SparkPlan]] = {
List(spark => FallbackBroadcastHashJoinPrepQueryStage(spark))
}

/**
* Generate extended Analyzers. Currently only for ClickHouse backend.
*
* @return
*/
override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = {
List(
spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf),
spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
}

/**
* Generate extended Optimizers.
*
* @return
*/
override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] = {
List(
spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf),
spark => CHAggregateFunctionRewriteRule(spark),
_ => CountDistinctWithoutExpand,
_ => EqualToRewrite
)
}

/**
* Generate extended columnar pre-rules, in the validation phase.
*
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => FallbackBroadcastHashJoin(spark))

/**
* Generate extended columnar pre-rules.
*
* @return
*/
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => RewriteSortMergeJoinToHashJoinRule(spark))

override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = {
List()
}

/**
* Generate extended Strategies.
*
* @return
*/
override def genExtendedStrategies(): List[SparkSession => Strategy] =
List()

override def genInjectExtendedParser()
: List[(SparkSession, ParserInterface) => ParserInterface] = {
List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
}

/** Define backend specific expression mappings. */
override def extraExpressionMappings: Seq[Sig] = {
List(
@@ -55,6 +55,7 @@ class VeloxBackend extends Backend {
override def validatorApi(): ValidatorApi = new VeloxValidatorApi
override def metricsApi(): MetricsApi = new VeloxMetricsApi
override def listenerApi(): ListenerApi = new VeloxListenerApi
override def ruleApi(): RuleApi = new VeloxRuleApi
override def settings(): BackendSettingsApi = VeloxBackendSettings
}

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.extension.{ArrowScanReplaceRule, BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule, RuleInjector}
import org.apache.gluten.extension.columnar.{AddFallbackTagRule, CollapseProjectExecTransformer, EliminateLocalSort, EnsureLocalSortRequirements, ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackMultiCodegens, FallbackOnANSIMode, MergeTwoPhasesHashBaseAggregate, PlanOneRowRelation, RemoveFallbackTagRule, RemoveNativeWriteFilesSortAndProject, RewriteTransformer}
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.util.SparkPlanRules

class VeloxRuleApi extends RuleApi {
import VeloxRuleApi._

override def injectRules(injector: RuleInjector): Unit = {
injectSpark(injector.spark)
injectGluten(injector.gluten)
injectRas(injector.ras)
}
}

private object VeloxRuleApi {
def injectSpark(injector: RuleInjector.SparkInjector): Unit = {
// Regular Spark rules.
injector.injectOptimizerRule(CollectRewriteRule.apply)
injector.injectOptimizerRule(HLLRewriteRule.apply)
UDFResolver.getFunctionSignatures.foreach(injector.injectFunction)
injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
}

def injectGluten(injector: RuleInjector.GlutenInjector): Unit = {
// Gluten columnar: Transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
injector.injectTransform(_ => FallbackEmptySchemaRelation())
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
injector.injectTransform(_ => RewriteSparkPlanRulesManager())
injector.injectTransform(_ => AddFallbackTagRule())
injector.injectTransform(_ => TransformPreOverrides())
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => RewriteTransformer.apply(c.session))
injector.injectTransform(_ => EnsureLocalSortRequirements)
injector.injectTransform(_ => EliminateLocalSort)
injector.injectTransform(_ => CollapseProjectExecTransformer)
if (GlutenConfig.getConf.enableVeloxFlushablePartialAggregation) {
injector.injectTransform(c => FlushableHashAggregateRule.apply(c.session))
}
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarTransformRules)
.foreach(each => injector.injectTransform(c => each(c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan()))

// Gluten columnar: Post rules.
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(_ => ColumnarCollapseTransformStages(GlutenConfig.getConf))
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarPostRules)
.foreach(each => injector.injectTransform(c => each(c.session)))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(GlutenConfig.getConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

def injectRas(injector: RuleInjector.RasInjector): Unit = {
// Gluten RAS: Pre rules.
injector.inject(_ => RemoveTransitions)
injector.inject(c => FallbackOnANSIMode.apply(c.session))
injector.inject(c => PlanOneRowRelation.apply(c.session))
injector.inject(_ => FallbackEmptySchemaRelation())
injector.inject(_ => RewriteSubqueryBroadcast())
injector.inject(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.inject(c => ArrowScanReplaceRule.apply(c.session))
injector.inject(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))

// Gluten RAS: The RAS rule.
injector.inject(c => EnumeratedTransform(c.session, c.outputsColumnar))

// Gluten RAS: Post rules.
injector.inject(_ => RemoveTransitions)
injector.inject(_ => RemoveNativeWriteFilesSortAndProject())
injector.inject(c => RewriteTransformer.apply(c.session))
injector.inject(_ => EnsureLocalSortRequirements)
injector.inject(_ => EliminateLocalSort)
injector.inject(_ => CollapseProjectExecTransformer)
if (GlutenConfig.getConf.enableVeloxFlushablePartialAggregation) {
injector.inject(c => FlushableHashAggregateRule.apply(c.session))
}
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarTransformRules)
.foreach(each => injector.inject(c => each(c.session)))
injector.inject(c => InsertTransitions(c.outputsColumnar))
injector.inject(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.inject(c => each(c.session)))
injector.inject(_ => ColumnarCollapseTransformStages(GlutenConfig.getConf))
SparkPlanRules
.extendedColumnarRules(GlutenConfig.getConf.extendedColumnarPostRules)
.foreach(each => injector.inject(c => each(c.session)))
injector.inject(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.inject(c => GlutenFallbackReporter(GlutenConfig.getConf, c.session))
injector.inject(_ => RemoveFallbackTagRule())
}
}
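Two pipelines are registered for Velox above: the legacy columnar path, split into named phases (transform, fallback policy, post, final), and the RAS path, a single flat sequence with EnumeratedTransform in the middle and pre/post rules that largely mirror the legacy list. In both cases each columnar entry evaluates to a physical-plan rule, i.e. a Spark Rule[SparkPlan] (the deleted getters in CHSparkPlanExecApi had exactly that element type). A hypothetical do-nothing example, not part of this commit, of the kind of rule such a factory can return:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Illustrative only: inspects the physical plan and returns it unchanged.
case object NoopColumnarRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

Rules of this shape are also what the extendedColumnarTransformRules / extendedColumnarPostRules config entries feed in through SparkPlanRules.extendedColumnarRules, as both rule lists above show.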
