Revert BatchScanExec filter pushdown logic
liujiayi771 committed Dec 20, 2023
1 parent cd47a64 commit 5ee37fc
Showing 5 changed files with 138 additions and 30 deletions.
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.GreaterThan
import org.apache.spark.sql.execution.ScalarSubquery

class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val backend: String = "velox"
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"

protected val veloxTPCHQueries: String = rootPath + "/tpch-queries-velox"
protected val queriesResults: String = rootPath + "queries-output"

override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.adaptive.enabled", "false")

override def beforeAll(): Unit = {
super.beforeAll()
}

test("tpch q22 subquery filter pushdown - v1") {
createTPCHNotNullTables()
runTPCHQuery(22, veloxTPCHQueries, queriesResults, compareResult = false, noFallBack = false) {
df =>
val plan = df.queryExecution.executedPlan
val exist = plan.collect { case scan: FileSourceScanExecTransformer => scan }.exists {
scan =>
scan.filterExprs().exists {
case _ @GreaterThan(_, _: ScalarSubquery) => true
case _ => false
}
}
assert(exist)
}
}

test("tpch q22 subquery filter pushdown - v2") {
withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
// Tables must be created here, otherwise v2 scan will not be used.
createTPCHNotNullTables()
runTPCHQuery(
22,
veloxTPCHQueries,
queriesResults,
compareResult = false,
noFallBack = false) {
df =>
val plan = df.queryExecution.executedPlan
val exist = plan.collect { case scan: BatchScanExecTransformer => scan }.exists {
scan =>
scan.filterExprs().exists {
case _ @GreaterThan(_, _: ScalarSubquery) => true
case _ => false
}
}
assert(exist)
}
}
}
}
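
The two tests above run TPC-H q22, whose outer filter compares c_acctbal against a scalar subquery, and assert that a GreaterThan(_, ScalarSubquery) predicate ends up in filterExprs() of the v1 (FileSourceScanExecTransformer) and v2 (BatchScanExecTransformer) scan transformers, i.e. that the subquery filter was pushed into the scan. The following standalone snippet is not part of this commit; it only illustrates the predicate shape in plain Spark SQL, with made-up table and column names:

// Illustration only (not part of this commit): a filter comparing a column against a
// scalar subquery, the same shape as TPC-H q22's c_acctbal > (select avg(c_acctbal) ...).
import org.apache.spark.sql.SparkSession

object ScalarSubqueryFilterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("scalar-subquery-filter").getOrCreate()
    spark.range(100).selectExpr("id", "id % 10 as bal").createOrReplaceTempView("t")
    val df = spark.sql("select id from t where bal > (select avg(bal) from t)")
    // The optimized plan carries a Filter with GreaterThan(bal, ScalarSubquery(...)),
    // which is the predicate the tests expect filterExprs() to expose after pushdown.
    df.explain(true)
    spark.stop()
  }
}
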
@@ -19,6 +19,7 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.SubstraitContext
@@ -412,7 +413,7 @@ object FilterHandler

// Separate and compare the filter conditions in Scan and Filter.
// Push down the left conditions in Filter into Scan.
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan =
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan =
filter.child match {
case fileSourceScan: FileSourceScanExec =>
val leftFilters =
@@ -421,7 +422,45 @@
fileSourceScan,
reuseSubquery,
extraFilters = leftFilters)
case batchScan: BatchScanExec =>
val leftFilters = batchScan.scan match {
case fileScan: FileScan =>
getLeftFilters(fileScan.dataFilters, flattenCondition(filter.condition))
case _ =>
// TODO: For data lake format use pushedFilters in SupportsPushDownFilters
flattenCondition(filter.condition)
}
ScanHandler.getBatchScanTransformer(batchScan, leftFilters, reuseSubquery)
case other =>
throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
}
}
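
applyFilterPushdownToScan relies on flattenCondition and getLeftFilters, which are defined elsewhere in FilterHandler and not shown in this diff. A rough sketch of what they are presumed to do, assuming the filter condition is split into conjuncts on And and de-duplicated against the scan's existing data filters by semantic equality:

// Rough sketch (an assumption, not the actual Gluten implementation) of the two helpers
// used above: split a condition into conjuncts, then keep only the conjuncts the scan
// has not already absorbed as data filters.
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

object FilterSplitSketch {
  def flattenCondition(condition: Expression): Seq[Expression] = condition match {
    case And(left, right) => flattenCondition(left) ++ flattenCondition(right)
    case other => Seq(other)
  }

  def getLeftFilters(scanFilters: Seq[Expression], filterConditions: Seq[Expression]): Seq[Expression] =
    filterConditions.filterNot(c => scanFilters.exists(_.semanticEquals(c)))
}
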

object ScanHandler {
def getBatchScanTransformer(
batchScan: BatchScanExec,
pushdownFilters: Seq[Expression],
reuseSubquery: Boolean): SparkPlan = {
if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) {
val transformer = ScanTransformerFactory.createBatchScanTransformer(
batchScan,
reuseSubquery,
pushdownFilters = pushdownFilters)
val validationResult = transformer.doValidate()
if (validationResult.isValid) {
transformer
} else {
val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters)
TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
newSource
}
} else {
// If filter expressions aren't empty, we need to transform the inner operators,
// and fallback the BatchScanExec itself.
val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
newSource
}
}
}
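
The new ScanHandler.getBatchScanTransformer centralizes the validate-or-fallback logic that TransformPreOverrides previously inlined (see the last file in this commit): build a transformer, keep it if doValidate() succeeds, otherwise tag the original BatchScanExec as not transformable and let it fall back to vanilla execution. A generic sketch of that pattern, using illustrative stand-in types rather than Gluten APIs:

// Generic sketch of the validate-or-fallback pattern; the types below are illustrative
// stand-ins, not Gluten classes.
sealed trait Validation
case object Valid extends Validation
final case class Invalid(reason: String) extends Validation

object ValidateOrFallback {
  def replaceWithTransformer[P, T](
      plan: P,
      build: P => T,
      validate: T => Validation,
      tagNotTransformable: (P, String) => Unit): Either[P, T] = {
    val candidate = build(plan)
    validate(candidate) match {
      case Valid => Right(candidate) // use the columnar transformer
      case Invalid(reason) =>
        tagNotTransformable(plan, reason) // fall back to the vanilla plan node
        Left(plan)
    }
  }
}
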
@@ -49,7 +49,8 @@ class BatchScanExecTransformer(
@transient table: Table,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false)
replicatePartitions: Boolean = false,
pushdownFilters: Seq[Expression] = Seq.empty)
extends BatchScanExecShim(output, scan, runtimeFilters, table)
with BasicScanExecTransformer {

@@ -59,7 +60,7 @@

override def filterExprs(): Seq[Expression] = scan match {
case fileScan: FileScan =>
fileScan.dataFilters
fileScan.dataFilters ++ pushdownFilters
case _ =>
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}
@@ -70,6 +70,7 @@ object ScanTransformerFactory {
def createBatchScanTransformer(
batchScanExec: BatchScanExec,
reuseSubquery: Boolean,
pushdownFilters: Seq[Expression] = Seq.empty,
validation: Boolean = false): BatchScanExecTransformer = {
val newPartitionFilters = if (validation) {
batchScanExec.runtimeFilters
@@ -85,11 +86,18 @@
.asInstanceOf[DataSourceScanTransformerRegister]
.createDataSourceV2Transformer(batchScanExec, newPartitionFilters)
case _ =>
new BatchScanExecTransformer(
batchScanExec.output,
batchScanExec.scan,
newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec))
scan match {
case _: FileScan =>
new BatchScanExecTransformer(
batchScanExec.output,
batchScanExec.scan,
newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec),
pushdownFilters = pushdownFilters
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported scan $scan")
}
}
}

@@ -137,9 +137,10 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
private def genFilterExec(plan: FilterExec): SparkPlan = {
// FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
// an individual rule.
val scan = plan.child
// Push down the left conditions in Filter into FileSourceScan.
val newChild: SparkPlan = plan.child match {
case scan: FileSourceScanExec =>
val newChild: SparkPlan = scan match {
case _: FileSourceScanExec | _: BatchScanExec =>
TransformHints.getHint(scan) match {
case TRANSFORM_SUPPORTED() =>
val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery)
@@ -538,26 +539,7 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
newSource
}
case plan: BatchScanExec =>
if (ScanTransformerFactory.supportedBatchScan(plan.scan)) {
val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery)
val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
transformer
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters)
TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
newSource
}
} else {
// If filter expressions aren't empty, we need to transform the inner operators,
// and fallback the BatchScanExec itself.
val newSource = plan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
newSource
}
ScanHandler.getBatchScanTransformer(plan, Seq.empty, reuseSubquery)

case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
// TODO: Add DynamicPartitionPruningHiveScanSuite.scala
