Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-6882][CORE] Move Spark / columnar rule list to backend code #6931

Merged
merged 8 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CHBackend extends Backend {
// Each accessor below hands back the ClickHouse implementation of one backend API.
// Validator, metrics, listener and rule APIs are instantiated anew on every call.
override def validatorApi(): ValidatorApi = new CHValidatorApi
override def metricsApi(): MetricsApi = new CHMetricsApi
override def listenerApi(): ListenerApi = new CHListenerApi
// Entry point for rule registration: returns a fresh CHRuleApi.
override def ruleApi(): RuleApi = new CHRuleApi
// Settings are exposed through the CHBackendSettings reference rather than a new instance.
override def settings(): BackendSettingsApi = CHBackendSettings
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.util.SparkPlanRules

/**
 * [[RuleApi]] implementation of the ClickHouse backend.
 *
 * The actual rule lists live in the companion object; this class only routes each part of the
 * given injector to the matching registration method.
 */
class CHRuleApi extends RuleApi {

  /**
   * Registers the Spark rules first, then the legacy columnar rules, then the RAS rules.
   *
   * @param injector
   *   injector exposing the `spark`, `gluten.legacy` and `gluten.ras` registration targets
   */
  override def injectRules(injector: RuleInjector): Unit = {
    CHRuleApi.injectSpark(injector.spark)
    CHRuleApi.injectLegacy(injector.gluten.legacy)
    CHRuleApi.injectRas(injector.gluten.ras)
  }
}

/** Rule registrations of the ClickHouse backend, grouped by the injector they target. */
private object CHRuleApi {

  /**
   * Registers the rules that go through the Spark-side injector: one query stage preparation
   * rule, the ClickHouse SQL parser, two resolution rules and four optimizer rules.
   *
   * @param injector
   *   injector receiving the registrations
   */
  def injectSpark(injector: SparkInjector): Unit = {
    // Regular Spark rules.
    injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
    injector.injectParser(
      (spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
    injector.injectResolutionRule(
      spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf))
    injector.injectResolutionRule(
      spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
    injector.injectOptimizerRule(
      spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf))
    injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
    injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
    injector.injectOptimizerRule(_ => EqualToRewrite)
  }

  /**
   * Registers the rules of the legacy columnar injector in four groups: transform rules, the
   * fallback policy, post rules and final rules. Within each group the registration order
   * below is preserved deliberately.
   *
   * @param injector
   *   injector receiving the registrations
   */
  def injectLegacy(injector: LegacyInjector): Unit = {
    // Gluten columnar: Transform rules.
    injector.injectTransform(_ => RemoveTransitions)
    injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
    injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
    injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
    injector.injectTransform(_ => RewriteSubqueryBroadcast())
    injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
    injector.injectTransform(_ => FallbackEmptySchemaRelation())
    injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
    injector.injectTransform(_ => RewriteSparkPlanRulesManager())
    injector.injectTransform(_ => AddFallbackTagRule())
    injector.injectTransform(_ => TransformPreOverrides())
    injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
    injector.injectTransform(c => RewriteTransformer.apply(c.session))
    injector.injectTransform(_ => EnsureLocalSortRequirements)
    injector.injectTransform(_ => EliminateLocalSort)
    injector.injectTransform(_ => CollapseProjectExecTransformer)
    injector.injectTransform(c => RewriteSortMergeJoinToHashJoinRule.apply(c.session))
    injector.injectTransform(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
    injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

    // Gluten columnar: Fallback policies.
    injector.injectFallbackPolicy(
      c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan()))

    // Gluten columnar: Post rules.
    injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
    SparkShimLoader.getSparkShims
      .getExtendedColumnarPostRules()
      .foreach(each => injector.injectPost(c => each(c.session)))
    injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
    // The rules configured through `extendedColumnarPostRules` are post rules, so they are
    // registered with `injectPost`. Registering them with `injectTransform` would append them
    // to the transform group instead of this group.
    injector.injectPost(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))

    // Gluten columnar: Final rules.
    injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
    injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
    injector.injectFinal(_ => RemoveFallbackTagRule())
  }

  /**
   * Registers a single abort rule with the RAS injector, carrying a message that asks the user
   * to disable RAS.
   *
   * @param injector
   *   injector receiving the registration
   */
  def injectRas(injector: RasInjector): Unit = {
    // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any
    // execution calls.
    injector.inject(
      _ =>
        new SparkPlanRules.AbortRule(
          "Clickhouse backend doesn't yet have RAS support, please try disabling RAS and" +
            " rerunning the application"))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteDateTimestampComparisonRule, RewriteSortMergeJoinToHashJoinRule, RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
Expand All @@ -36,18 +34,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
Expand Down Expand Up @@ -549,82 +542,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount, newBuildKeys)
}

/**
 * Extended DataSourceV2 strategies contributed by this backend.
 *
 * @return
 *   an empty list; no strategy builders are provided here
 */
override def genExtendedDataSourceV2Strategies(): List[SparkSession => Strategy] = Nil

/**
 * Extended query stage preparation rules.
 *
 * @return
 *   a single builder that creates a `FallbackBroadcastHashJoinPrepQueryStage` for the session
 */
override def genExtendedQueryStagePrepRules(): List[SparkSession => Rule[SparkPlan]] =
  List(session => FallbackBroadcastHashJoinPrepQueryStage(session))

/**
 * Extended analyzer rules.
 *
 * @return
 *   builders for `RewriteToDateExpresstionRule` and `RewriteDateTimestampComparisonRule`, in
 *   that order; each rule receives the session and the session's SQL conf
 */
override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = {
  val rewriteToDate: SparkSession => Rule[LogicalPlan] =
    session => new RewriteToDateExpresstionRule(session, session.sessionState.conf)
  val rewriteDateTimestampComparison: SparkSession => Rule[LogicalPlan] =
    session => new RewriteDateTimestampComparisonRule(session, session.sessionState.conf)
  List(rewriteToDate, rewriteDateTimestampComparison)
}

/**
 * Extended optimizer rules.
 *
 * @return
 *   builders, in order, for `CommonSubexpressionEliminateRule`,
 *   `CHAggregateFunctionRewriteRule`, `CountDistinctWithoutExpand` and `EqualToRewrite`; the
 *   last two ignore the session
 */
override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] = {
  val commonSubexpressionEliminate: SparkSession => Rule[LogicalPlan] =
    session => new CommonSubexpressionEliminateRule(session, session.sessionState.conf)
  val aggregateFunctionRewrite: SparkSession => Rule[LogicalPlan] =
    session => CHAggregateFunctionRewriteRule(session)
  val countDistinctWithoutExpand: SparkSession => Rule[LogicalPlan] =
    _ => CountDistinctWithoutExpand
  val equalToRewrite: SparkSession => Rule[LogicalPlan] =
    _ => EqualToRewrite
  List(
    commonSubexpressionEliminate,
    aggregateFunctionRewrite,
    countDistinctWithoutExpand,
    equalToRewrite)
}

/**
 * Extended columnar rules for the validation phase.
 *
 * @return
 *   a single builder that creates a `FallbackBroadcastHashJoin` for the session
 */
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] = {
  val fallbackBroadcastHashJoin: SparkSession => Rule[SparkPlan] =
    session => FallbackBroadcastHashJoin(session)
  List(fallbackBroadcastHashJoin)
}

/**
 * Extended columnar transform rules.
 *
 * @return
 *   a single builder that creates a `RewriteSortMergeJoinToHashJoinRule` for the session
 */
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] = {
  val sortMergeJoinToHashJoin: SparkSession => Rule[SparkPlan] =
    session => RewriteSortMergeJoinToHashJoinRule(session)
  List(sortMergeJoinToHashJoin)
}

/**
 * Post-hoc resolution rules.
 *
 * @return
 *   an empty list; no rule builders are provided here
 */
override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = Nil

/**
 * Extended planner strategies.
 *
 * @return
 *   an empty list; no strategy builders are provided here
 */
override def genExtendedStrategies(): List[SparkSession => Strategy] = Nil

/**
 * Extended SQL parsers.
 *
 * @return
 *   a single builder that wraps the given parser in a `GlutenClickhouseSqlParser`
 */
override def genInjectExtendedParser()
    : List[(SparkSession, ParserInterface) => ParserInterface] =
  List((session, delegate) => new GlutenClickhouseSqlParser(session, delegate))

/** Define backend-specific expression mappings. */
override def extraExpressionMappings: Seq[Sig] = {
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class VeloxBackend extends Backend {
// Each accessor below hands back the Velox implementation of one backend API.
// Validator, metrics, listener and rule APIs are instantiated anew on every call.
override def validatorApi(): ValidatorApi = new VeloxValidatorApi
override def metricsApi(): MetricsApi = new VeloxMetricsApi
override def listenerApi(): ListenerApi = new VeloxListenerApi
// Entry point for rule registration: returns a fresh VeloxRuleApi.
override def ruleApi(): RuleApi = new VeloxRuleApi
// Settings are exposed through the VeloxBackendSettings reference rather than a new instance.
override def settings(): BackendSettingsApi = VeloxBackendSettings
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.util.SparkPlanRules

/**
 * [[RuleApi]] implementation of the Velox backend.
 *
 * The actual rule lists live in the companion object; this class only routes each part of the
 * given injector to the matching registration method.
 */
class VeloxRuleApi extends RuleApi {

  /**
   * Registers the Spark rules first, then the legacy columnar rules, then the RAS rules.
   *
   * @param injector
   *   injector exposing the `spark`, `gluten.legacy` and `gluten.ras` registration targets
   */
  override def injectRules(injector: RuleInjector): Unit = {
    VeloxRuleApi.injectSpark(injector.spark)
    VeloxRuleApi.injectLegacy(injector.gluten.legacy)
    VeloxRuleApi.injectRas(injector.gluten.ras)
  }
}

/** Rule registrations of the Velox backend, grouped by the injector they target. */
private object VeloxRuleApi {

  /**
   * Registers the rules that go through the Spark-side injector: two optimizer rules, every
   * function signature returned by `UDFResolver.getFunctionSignatures`, and one post-hoc
   * resolution rule.
   *
   * @param injector
   *   injector receiving the registrations
   */
  def injectSpark(injector: SparkInjector): Unit = {
    // Regular Spark rules.
    injector.injectOptimizerRule(CollectRewriteRule.apply)
    injector.injectOptimizerRule(HLLRewriteRule.apply)
    UDFResolver.getFunctionSignatures.foreach(injector.injectFunction)
    injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
  }

  /**
   * Registers the rules of the legacy columnar injector in four groups: transform rules, the
   * fallback policy, post rules and final rules. Within each group the registration order
   * below is preserved deliberately.
   *
   * @param injector
   *   injector receiving the registrations
   */
  def injectLegacy(injector: LegacyInjector): Unit = {
    // Gluten columnar: Transform rules.
    injector.injectTransform(_ => RemoveTransitions)
    injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
    injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
    injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
    injector.injectTransform(_ => RewriteSubqueryBroadcast())
    injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
    injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
    injector.injectTransform(_ => FallbackEmptySchemaRelation())
    injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
    injector.injectTransform(_ => RewriteSparkPlanRulesManager())
    injector.injectTransform(_ => AddFallbackTagRule())
    injector.injectTransform(_ => TransformPreOverrides())
    injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
    injector.injectTransform(c => RewriteTransformer.apply(c.session))
    injector.injectTransform(_ => EnsureLocalSortRequirements)
    injector.injectTransform(_ => EliminateLocalSort)
    injector.injectTransform(_ => CollapseProjectExecTransformer)
    injector.injectTransform(c => FlushableHashAggregateRule.apply(c.session))
    injector.injectTransform(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
    injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

    // Gluten columnar: Fallback policies.
    injector.injectFallbackPolicy(
      c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan()))

    // Gluten columnar: Post rules.
    injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
    SparkShimLoader.getSparkShims
      .getExtendedColumnarPostRules()
      .foreach(each => injector.injectPost(c => each(c.session)))
    injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
    // The rules configured through `extendedColumnarPostRules` are post rules, so they are
    // registered with `injectPost`. Registering them with `injectTransform` would append them
    // to the transform group instead of this group.
    injector.injectPost(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))

    // Gluten columnar: Final rules.
    injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
    injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
    injector.injectFinal(_ => RemoveFallbackTagRule())
  }

  /**
   * Registers the RAS rule list. The RAS injector exposes a single `inject` entry point, so the
   * pre rules, the `EnumeratedTransform` rule and the post rules form one flat sequence whose
   * order is the registration order below.
   *
   * @param injector
   *   injector receiving the registrations
   */
  def injectRas(injector: RasInjector): Unit = {
    // Gluten RAS: Pre rules.
    injector.inject(_ => RemoveTransitions)
    injector.inject(c => FallbackOnANSIMode.apply(c.session))
    injector.inject(c => PlanOneRowRelation.apply(c.session))
    injector.inject(_ => FallbackEmptySchemaRelation())
    injector.inject(_ => RewriteSubqueryBroadcast())
    injector.inject(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
    injector.inject(c => ArrowScanReplaceRule.apply(c.session))
    injector.inject(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))

    // Gluten RAS: The RAS rule.
    injector.inject(c => EnumeratedTransform(c.session, c.outputsColumnar))

    // Gluten RAS: Post rules.
    injector.inject(_ => RemoveTransitions)
    injector.inject(_ => RemoveNativeWriteFilesSortAndProject())
    injector.inject(c => RewriteTransformer.apply(c.session))
    injector.inject(_ => EnsureLocalSortRequirements)
    injector.inject(_ => EliminateLocalSort)
    injector.inject(_ => CollapseProjectExecTransformer)
    injector.inject(c => FlushableHashAggregateRule.apply(c.session))
    injector.inject(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
    injector.inject(c => InsertTransitions(c.outputsColumnar))
    injector.inject(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
    SparkShimLoader.getSparkShims
      .getExtendedColumnarPostRules()
      .foreach(each => injector.inject(c => each(c.session)))
    injector.inject(c => ColumnarCollapseTransformStages(c.conf))
    injector.inject(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))
    injector.inject(c => RemoveGlutenTableCacheColumnarToRow(c.session))
    injector.inject(c => GlutenFallbackReporter(c.conf, c.session))
    injector.inject(_ => RemoveFallbackTagRule())
  }
}
Loading
Loading