From 5ee37fcef7666343521f750d1b712c11157a113a Mon Sep 17 00:00:00 2001
From: "joey.ljy"
Date: Wed, 20 Dec 2023 19:09:07 +0800
Subject: [PATCH] Revert BatchScanExec filter pushdown logic

---
 .../execution/VeloxScanSuite.scala            | 78 +++++++++++++++++++
 .../BasicPhysicalOperatorTransformer.scala    | 41 +++++++++-
 .../execution/BatchScanExecTransformer.scala  |  5 +-
 .../execution/ScanTransformerFactory.scala    | 18 +++--
 .../extension/ColumnarOverrides.scala         | 26 +------
 5 files changed, 138 insertions(+), 30 deletions(-)
 create mode 100644 backends-velox/src/test/scala/io/glutenproject/execution/VeloxScanSuite.scala

diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxScanSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxScanSuite.scala
new file mode 100644
index 000000000000..6bf640c9436e
--- /dev/null
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxScanSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.expressions.GreaterThan
+import org.apache.spark.sql.execution.ScalarSubquery
+
+class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
+  protected val rootPath: String = getClass.getResource("/").getPath
+  override protected val backend: String = "velox"
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  protected val veloxTPCHQueries: String = rootPath + "/tpch-queries-velox"
+  protected val queriesResults: String = rootPath + "queries-output"
+
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.adaptive.enabled", "false")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  test("tpch q22 subquery filter pushdown - v1") {
+    createTPCHNotNullTables()
+    runTPCHQuery(22, veloxTPCHQueries, queriesResults, compareResult = false, noFallBack = false) {
+      df =>
+        val plan = df.queryExecution.executedPlan
+        val exist = plan.collect { case scan: FileSourceScanExecTransformer => scan }.exists {
+          scan =>
+            scan.filterExprs().exists {
+              case GreaterThan(_, _: ScalarSubquery) => true
+              case _ => false
+            }
+        }
+        assert(exist)
+    }
+  }
+
+  test("tpch q22 subquery filter pushdown - v2") {
+    withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
+      // Tables must be created here, otherwise the v2 scan will not be used.
+      createTPCHNotNullTables()
+      runTPCHQuery(
+        22,
+        veloxTPCHQueries,
+        queriesResults,
+        compareResult = false,
+        noFallBack = false) {
+        df =>
+          val plan = df.queryExecution.executedPlan
+          val exist = plan.collect { case scan: BatchScanExecTransformer => scan }.exists {
+            scan =>
+              scan.filterExprs().exists {
+                case GreaterThan(_, _: ScalarSubquery) => true
+                case _ => false
+              }
+          }
+          assert(exist)
+      }
+    }
+  }
+}

diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
index 4281038dd44f..b94ab1b1ed5d 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
@@ -19,6 +19,7 @@ package io.glutenproject.execution
 import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
 import io.glutenproject.extension.{GlutenPlan, ValidationResult}
+import io.glutenproject.extension.columnar.TransformHints
 import io.glutenproject.metrics.MetricsUpdater
 import io.glutenproject.substrait.`type`.TypeBuilder
 import io.glutenproject.substrait.SubstraitContext
@@ -412,7 +413,7 @@ object FilterHandler {
 
   // Separate and compare the filter conditions in Scan and Filter.
   // Push down the remaining conditions in Filter into Scan.
-  def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan =
+  def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan =
     filter.child match {
       case fileSourceScan: FileSourceScanExec =>
         val leftFilters =
@@ -421,7 +422,45 @@ object FilterHandler {
           fileSourceScan,
           reuseSubquery,
           extraFilters = leftFilters)
+      case batchScan: BatchScanExec =>
+        val leftFilters = batchScan.scan match {
+          case fileScan: FileScan =>
+            getLeftFilters(fileScan.dataFilters, flattenCondition(filter.condition))
+          case _ =>
+            // TODO: For data lake formats, use pushedFilters from SupportsPushDownFilters.
+            flattenCondition(filter.condition)
+        }
+        ScanHandler.getBatchScanTransformer(batchScan, leftFilters, reuseSubquery)
       case other =>
         throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
     }
 }
+
+object ScanHandler {
+  def getBatchScanTransformer(
+      batchScan: BatchScanExec,
+      pushdownFilters: Seq[Expression],
+      reuseSubquery: Boolean): SparkPlan = {
+    if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) {
+      val transformer = ScanTransformerFactory.createBatchScanTransformer(
+        batchScan,
+        reuseSubquery,
+        pushdownFilters = pushdownFilters)
+      val validationResult = transformer.doValidate()
+      if (validationResult.isValid) {
+        transformer
+      } else {
+        val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters)
+        TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
+        newSource
+      }
+    } else {
+      // The scan is not supported by the backend. Transform only the inner
+      // expressions (runtime filters) and let the BatchScanExec itself fall back.
+      val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
+        .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery))
+      TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
+      newSource
+    }
+  }
+}

diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
index 051ba9779084..8bc1d4bd23df 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
@@ -49,7 +49,8 @@ class BatchScanExecTransformer(
     @transient table: Table,
     commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
     applyPartialClustering: Boolean = false,
-    replicatePartitions: Boolean = false)
+    replicatePartitions: Boolean = false,
+    pushdownFilters: Seq[Expression] = Seq.empty)
   extends BatchScanExecShim(output, scan, runtimeFilters, table)
   with BasicScanExecTransformer {
@@ -59,7 +60,7 @@ class BatchScanExecTransformer(
 
   override def filterExprs(): Seq[Expression] = scan match {
     case fileScan: FileScan =>
-      fileScan.dataFilters
+      fileScan.dataFilters ++ pushdownFilters
     case _ =>
       throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
   }

diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
index 6ac287476657..84222e34e085 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
@@ -70,6 +70,7 @@ object ScanTransformerFactory {
   def createBatchScanTransformer(
       batchScanExec: BatchScanExec,
       reuseSubquery: Boolean,
+      pushdownFilters: Seq[Expression] = Seq.empty,
       validation: Boolean = false): BatchScanExecTransformer = {
     val newPartitionFilters = if (validation) {
       batchScanExec.runtimeFilters
@@ -85,11 +86,18 @@ object ScanTransformerFactory {
           .asInstanceOf[DataSourceScanTransformerRegister]
           .createDataSourceV2Transformer(batchScanExec, newPartitionFilters)
       case _ =>
-        new BatchScanExecTransformer(
-          batchScanExec.output,
-          batchScanExec.scan,
-          newPartitionFilters,
-          table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec))
+        scan match {
+          case _: FileScan =>
+            new BatchScanExecTransformer(
+              batchScanExec.output,
+              batchScanExec.scan,
+              newPartitionFilters,
+              table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec),
+              pushdownFilters = pushdownFilters
+            )
+          case _ =>
+            throw new UnsupportedOperationException(s"Unsupported scan $scan")
+        }
     }
   }

diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index d4b1e53bd2fc..d1ee06b161f6 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -137,9 +137,10 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
   private def genFilterExec(plan: FilterExec): SparkPlan = {
     // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
     // an individual rule.
+    val scan = plan.child
     // Push down the remaining conditions in Filter into the Scan.
-    val newChild: SparkPlan = plan.child match {
-      case scan: FileSourceScanExec =>
+    val newChild: SparkPlan = scan match {
+      case _: FileSourceScanExec | _: BatchScanExec =>
         TransformHints.getHint(scan) match {
           case TRANSFORM_SUPPORTED() =>
             val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery)
@@ -538,26 +539,7 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
             newSource
         }
       case plan: BatchScanExec =>
-        if (ScanTransformerFactory.supportedBatchScan(plan.scan)) {
-          val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery)
-          val validationResult = transformer.doValidate()
-          if (validationResult.isValid) {
-            logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-            transformer
-          } else {
-            logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
-            val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters)
-            TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
-            newSource
-          }
-        } else {
-          // If filter expressions aren't empty, we need to transform the inner operators,
-          // and fallback the BatchScanExec itself.
-          val newSource = plan.copy(runtimeFilters = ExpressionConverter
-            .transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery))
-          TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
-          newSource
-        }
+        ScanHandler.getBatchScanTransformer(plan, Seq.empty, reuseSubquery)
      case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
        // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
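
Note for readers outside the Gluten codebase: the pushdown step in
FilterHandler.applyFilterPushdownToScan boils down to flattening the Filter's
conjunctive condition and keeping only the conjuncts the scan does not already
evaluate. The following is a minimal, self-contained Scala sketch of that idea;
Expr, And, Predicate, and the two helper names are simplified stand-ins for
Gluten's actual Catalyst Expression utilities, not the real implementation.

    // Sketch only: mimics flattenCondition/getLeftFilters on a toy expression ADT.
    object FilterSplitSketch {
      sealed trait Expr
      final case class And(left: Expr, right: Expr) extends Expr
      final case class Predicate(sql: String) extends Expr

      // Flatten `a AND b AND c` into Seq(a, b, c), like flattenCondition.
      def flattenCondition(cond: Expr): Seq[Expr] = cond match {
        case And(l, r) => flattenCondition(l) ++ flattenCondition(r)
        case other => Seq(other)
      }

      // Keep the conjuncts the scan does not already apply: these are the
      // "remaining" (left) filters that can still be pushed into the scan.
      def getLeftFilters(scanFilters: Seq[Expr], conjuncts: Seq[Expr]): Seq[Expr] =
        conjuncts.filterNot(scanFilters.contains)

      def main(args: Array[String]): Unit = {
        val scanFilters = Seq(Predicate("l_shipdate > date '1995-01-01'"))
        val condition = And(
          Predicate("l_shipdate > date '1995-01-01'"),
          Predicate("c_acctbal > scalar-subquery"))
        // Prints: List(Predicate(c_acctbal > scalar-subquery))
        println(getLeftFilters(scanFilters, flattenCondition(condition)))
      }
    }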
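
A second sketch: the BatchScanExec offload follows a validate-then-fall-back
pattern, both in ScanHandler.getBatchScanTransformer above and in the
ColumnarOverrides code it replaces. The types below (Plan, ValidationResult,
tagNotTransformable) are hypothetical stand-ins for Gluten's TransformHints
machinery, shown only to illustrate the control flow under those assumptions.

    // Sketch only: try the columnar transformer; if validation fails, tag the
    // original plan so later rules leave it to vanilla Spark.
    object FallbackSketch {
      final case class ValidationResult(isValid: Boolean, reason: Option[String])

      sealed trait Plan { var transformHint: Option[String] = None }
      final class VanillaScan extends Plan
      final class ScanTransformer(supported: Boolean) extends Plan {
        // A real transformer checks native-backend support here.
        def doValidate(): ValidationResult =
          if (supported) ValidationResult(isValid = true, None)
          else ValidationResult(isValid = false, Some("scan type not supported"))
      }

      // Stand-in for TransformHints.tagNotTransformable.
      def tagNotTransformable(plan: Plan, reason: String): Unit =
        plan.transformHint = Some(reason)

      def replaceScan(original: VanillaScan, supported: Boolean): Plan = {
        val transformer = new ScanTransformer(supported)
        val result = transformer.doValidate()
        if (result.isValid) transformer // offload to the native backend
        else {
          tagNotTransformable(original, result.reason.getOrElse("unknown"))
          original // keep the vanilla node, tagged so it is not retried later
        }
      }

      def main(args: Array[String]): Unit = {
        println(replaceScan(new VanillaScan, supported = true))  // a ScanTransformer
        println(replaceScan(new VanillaScan, supported = false).transformHint)
      }
    }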