diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index b00470f9ea4a..0f593a861bd8 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -368,6 +368,7 @@ src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala + src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/stats/*.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala new file mode 100644 index 000000000000..a360fa8d7291 --- /dev/null +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.rules + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.delta.metering.DeltaLogging + +class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession) + extends Rule[LogicalPlan] + with DeltaLogging { + + // Delta 2.0 does not support optimizing a query with only the Delta metadata + override def apply(plan: LogicalPlan): LogicalPlan = plan +} diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala new file mode 100644 index 000000000000..dbb5c4050a2c --- /dev/null +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.rules + +import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot, SubqueryTransformerHelper} +import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeltaScanGenerator + +import org.apache.hadoop.fs.Path + +class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession) + extends Rule[LogicalPlan] + with DeltaLogging + with SubqueryTransformerHelper + with OptimizeMetadataOnlyDeltaQuery { + + private val scannedSnapshots = + new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot] + + protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = { + // The first case means that we've fixed the table snapshot for time travel + if (index.isTimeTravelQuery) return index.getSnapshot + OptimisticTransaction + .getActive() + .map(_.getDeltaScanGenerator(index)) + .getOrElse { + // Will be called only when the log is accessed the first time + scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => index.getSnapshot) + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + // Should not be applied to subqueries to avoid duplicate delta jobs. + val isSubquery = isSubqueryRoot(plan) + // Should not be applied to DataSourceV2 write plans, because they'll be planned later + // through a V1 fallback and only that later planning takes place within the transaction. + val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] + if (isSubquery || isDataSourceV2) { + return plan + } + // When 'stats.skipping' is off, still use the metadata to optimize count/min/max queries. + if ( + spark.sessionState.conf + .getConfString( + CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE, + CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE) + .toBoolean && + !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true) + ) { + optimizeQueryWithMetadata(plan) + } else { + plan + } + } +} diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala new file mode 100644 index 000000000000..21e31d35411e --- /dev/null +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import java.util.Objects + +import scala.collection.mutable + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.files.{TahoeFileIndexWithSnapshot, TahoeLogFileIndex} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.PROJECT +import org.apache.spark.sql.execution.datasources.LogicalRelation + +/** + * Gluten overwrite Delta: + * + * This file is copied from Delta 2.3.0, it is modified to overcome the following issues: + * 1. Returns the plan directly even if stats.skipping is turned off + */ + +/** + * Before query planning, we prepare any scans over delta tables by pushing + * any projections or filters in allowing us to gather more accurate statistics + * for CBO and metering. + * + * Note the following + * - This rule also ensures that all reads from the same delta log use the same snapshot of log + * thus providing snapshot isolation. + * - If this rule is invoked within an active [[OptimisticTransaction]], then the scans are + * generated using the transaction. + */ +trait PrepareDeltaScanBase extends Rule[LogicalPlan] + with PredicateHelper + with DeltaLogging + with OptimizeMetadataOnlyDeltaQuery + with PreprocessTableWithDVs { self: PrepareDeltaScan => + + /** + * Tracks the first-access snapshots of other logs planned by this rule. The snapshots are + * the keyed by the log's unique id. Note that the lifetime of this rule is a single + * query, therefore, the map tracks the snapshots only within a query. + */ + private val scannedSnapshots = + new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot] + + /** + * Gets the [[DeltaScanGenerator]] for the given log, which will be used to generate + * [[DeltaScan]]s. Every time this method is called on a log within the lifetime of this + * rule (i.e., the lifetime of the query for which this rule was instantiated), the returned + * generator will read a snapshot that is pinned on the first access for that log. + * + * Internally, it will use the snapshot of the file index, the snapshot of the active transaction + * (if any), or the latest snapshot of the given log. 
+ */ + protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = { + // The first case means that we've fixed the table snapshot for time travel + if (index.isTimeTravelQuery) return index.getSnapshot + val scanGenerator = OptimisticTransaction.getActive() + .map(_.getDeltaScanGenerator(index)) + .getOrElse { + // Will be called only when the log is accessed the first time + scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => index.getSnapshot) + } + import PrepareDeltaScanBase._ + if (onGetDeltaScanGeneratorCallback != null) onGetDeltaScanGeneratorCallback(scanGenerator) + scanGenerator + } + + /** + * Helper method to generate a [[PreparedDeltaFileIndex]] + */ + protected def getPreparedIndex( + preparedScan: DeltaScan, + fileIndex: TahoeLogFileIndex): PreparedDeltaFileIndex = { + assert(fileIndex.partitionFilters.isEmpty, + "Partition filters should have been extracted by DeltaAnalysis.") + PreparedDeltaFileIndex( + spark, + fileIndex.deltaLog, + fileIndex.path, + preparedScan, + fileIndex.versionToUse) + } + + /** + * Scan files using the given `filters` and return `DeltaScan`. + * + * Note: when `limitOpt` is non empty, `filters` must contain only partition filters. Otherwise, + * it can contain arbitrary filters. See `DeltaTableScan` for more details. + */ + protected def filesForScan( + scanGenerator: DeltaScanGenerator, + limitOpt: Option[Int], + filters: Seq[Expression], + delta: LogicalRelation): DeltaScan = { + withStatusCode("DELTA", "Filtering files for query") { + if (limitOpt.nonEmpty) { + // If we trigger limit push down, the filters must be partition filters. Since + // there are no data filters, we don't need to apply Generated Columns + // optimization. See `DeltaTableScan` for more details. + return scanGenerator.filesForScan(limitOpt.get, filters) + } + val filtersForScan = + if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) { + filters + } else { + val generatedPartitionFilters = GeneratedColumn.generatePartitionFilters( + spark, scanGenerator.snapshotToScan, filters, delta) + filters ++ generatedPartitionFilters + } + scanGenerator.filesForScan(filtersForScan) + } + } + + /** + * Prepares delta scans sequentially. + */ + protected def prepareDeltaScan(plan: LogicalPlan): LogicalPlan = { + // A map from the canonicalized form of a DeltaTableScan operator to its corresponding delta + // scan. This map is used to avoid fetching duplicate delta indexes for structurally-equal + // delta scans. + val deltaScans = new mutable.HashMap[LogicalPlan, DeltaScan]() + + transformWithSubqueries(plan) { + case scan @ DeltaTableScan(planWithRemovedProjections, filters, fileIndex, + limit, delta) => + val scanGenerator = getDeltaScanGenerator(fileIndex) + val preparedScan = deltaScans.getOrElseUpdate(planWithRemovedProjections.canonicalized, + filesForScan(scanGenerator, limit, filters, delta)) + val preparedIndex = getPreparedIndex(preparedScan, fileIndex) + optimizeGeneratedColumns(scan, preparedIndex, filters, limit, delta) + } + } + + protected def optimizeGeneratedColumns( + scan: LogicalPlan, + preparedIndex: PreparedDeltaFileIndex, + filters: Seq[Expression], + limit: Option[Int], + delta: LogicalRelation): LogicalPlan = { + if (limit.nonEmpty) { + // If we trigger limit push down, the filters must be partition filters. Since + // there are no data filters, we don't need to apply Generated Columns + // optimization. See `DeltaTableScan` for more details. 
+ return DeltaTableUtils.replaceFileIndex(scan, preparedIndex) + } + if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) { + DeltaTableUtils.replaceFileIndex(scan, preparedIndex) + } else { + val generatedPartitionFilters = + GeneratedColumn.generatePartitionFilters(spark, preparedIndex, filters, delta) + val scanWithFilters = + if (generatedPartitionFilters.nonEmpty) { + scan transformUp { + case delta @ DeltaTable(_: TahoeLogFileIndex) => + Filter(generatedPartitionFilters.reduceLeft(And), delta) + } + } else { + scan + } + DeltaTableUtils.replaceFileIndex(scanWithFilters, preparedIndex) + } + } + + override def apply(_plan: LogicalPlan): LogicalPlan = { + var plan = _plan + + // --- modified start + // Should not be applied to subqueries to avoid duplicate delta jobs. + val isSubquery = isSubqueryRoot(plan) + // Should not be applied to DataSourceV2 write plans, because they'll be planned later + // through a V1 fallback and only that later planning takes place within the transaction. + val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] + if (isSubquery || isDataSourceV2) { + return plan + } + + val shouldPrepareDeltaScan = ( + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING) + ) + val updatedPlan = if (shouldPrepareDeltaScan) { + if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED)) { + plan = optimizeQueryWithMetadata(plan) + } + prepareDeltaScan(plan) + } else { + // If this query is running inside an active transaction and is touching the same table + // as the transaction, then mark that the entire table as tainted to be safe. + OptimisticTransaction.getActive.foreach { txn => + val logsInPlan = plan.collect { case DeltaTable(fileIndex) => fileIndex.deltaLog } + if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) { + txn.readWholeTable() + } + } + + // Just return the plan if statistics based skipping is off. + // It will fall back to just partition pruning at planning time. + plan + } + // --- modified end + preprocessTablesWithDVs(updatedPlan) + } + + /** + * This is an extractor object. See https://docs.scala-lang.org/tour/extractor-objects.html. + */ + object DeltaTableScan { + + /** + * The components of DeltaTableScanType are: + * - the plan with removed projections. We remove projections as a plan differentiator + * because it does not affect file listing results. + * - filter expressions collected by `PhysicalOperation` + * - the `TahoeLogFileIndex` of the matched DeltaTable` + * - integer value of limit expression, if any + * - matched `DeltaTable` + */ + private type DeltaTableScanType = + (LogicalPlan, Seq[Expression], TahoeLogFileIndex, Option[Int], LogicalRelation) + + /** + * This is an extractor method (basically, the opposite of a constructor) which takes in an + * object `plan` and tries to give back the arguments as a [[DeltaTableScanType]]. + */ + def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = { + val limitPushdownEnabled = spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED) + + // Remove projections as a plan differentiator because it does not affect file listing + // results. Plans with the same filters but different projections therefore will not have + // duplicate delta indexes. 
+ def canonicalizePlanForDeltaFileListing(plan: LogicalPlan): LogicalPlan = { + val planWithRemovedProjections = plan.transformWithPruning(_.containsPattern(PROJECT)) { + case p: Project if p.projectList.forall(_.isInstanceOf[AttributeReference]) => p.child + } + planWithRemovedProjections + } + + plan match { + case LocalLimit(IntegerLiteral(limit), + PhysicalOperation(_, filters, delta @ DeltaTable(fileIndex: TahoeLogFileIndex))) + if limitPushdownEnabled && containsPartitionFiltersOnly(filters, fileIndex) => + Some((canonicalizePlanForDeltaFileListing(plan), filters, fileIndex, Some(limit), delta)) + case PhysicalOperation( + _, + filters, + delta @ DeltaTable(fileIndex: TahoeLogFileIndex)) => + val allFilters = fileIndex.partitionFilters ++ filters + Some((canonicalizePlanForDeltaFileListing(plan), allFilters, fileIndex, None, delta)) + + case _ => None + } + } + + private def containsPartitionFiltersOnly( + filters: Seq[Expression], + fileIndex: TahoeLogFileIndex): Boolean = { + val partitionColumns = fileIndex.snapshotAtAnalysis.metadata.partitionColumns + import DeltaTableUtils._ + filters.forall(expr => !containsSubquery(expr) && + isPredicatePartitionColumnsOnly(expr, partitionColumns, spark)) + } + } +} + +class PrepareDeltaScan(protected val spark: SparkSession) + extends PrepareDeltaScanBase + +object PrepareDeltaScanBase { + + /** + * Optional callback function that is called after `getDeltaScanGenerator` is called + * by the PrepareDeltaScan rule. This is primarily used for testing purposes. + */ + @volatile private var onGetDeltaScanGeneratorCallback: DeltaScanGenerator => Unit = _ + + /** + * Run a thunk of code with the given callback function injected into the PrepareDeltaScan rule. + * The callback function is called after `getDeltaScanGenerator` is called + * by the PrepareDeltaScan rule. This is primarily used for testing purposes. + */ + private[delta] def withCallbackOnGetDeltaScanGenerator[T]( + callback: DeltaScanGenerator => Unit)(thunk: => T): T = { + try { + onGetDeltaScanGeneratorCallback = callback + thunk + } finally { + onGetDeltaScanGeneratorCallback = null + } + } +} + +/** + * A [[TahoeFileIndex]] that uses a prepared scan to return the list of relevant files. + * This is injected into a query right before query planning by [[PrepareDeltaScan]] so that + * CBO and metering can accurately understand how much data will be read. + * + * @param versionScanned The version of the table that is being scanned, if a specific version + * has specifically been requested, e.g. by time travel. + */ +case class PreparedDeltaFileIndex( + override val spark: SparkSession, + override val deltaLog: DeltaLog, + override val path: Path, + preparedScan: DeltaScan, + versionScanned: Option[Long]) + extends TahoeFileIndexWithSnapshot(spark, deltaLog, path, preparedScan.scannedSnapshot) + with DeltaLogging { + + /** + * Returns all matching/valid files by the given `partitionFilters` and `dataFilters` + */ + override def matchingFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[AddFile] = { + val currentFilters = ExpressionSet(partitionFilters ++ dataFilters) + val (addFiles, eventData) = if (currentFilters == preparedScan.allFilters || + currentFilters == preparedScan.filtersUsedForSkipping) { + // [[DeltaScan]] was created using `allFilters` out of which only `filtersUsedForSkipping` + // filters were used for skipping while creating the DeltaScan. 
+ // If currentFilters is same as allFilters, then no need to recalculate files and we can use + // previous results. + // If currentFilters is same as filtersUsedForSkipping, then also we don't need to recalculate + // files as [[DeltaScan.files]] were calculates using filtersUsedForSkipping only. So if we + // recalculate, we will get same result. So we should use previous result in this case also. + val eventData = Map( + "reused" -> true, + "currentFiltersSameAsPreparedAllFilters" -> (currentFilters == preparedScan.allFilters), + "currentFiltersSameAsPreparedFiltersUsedForSkipping" -> + (currentFilters == preparedScan.filtersUsedForSkipping) + ) + (preparedScan.files.distinct, eventData) + } else { + logInfo( + s""" + |Prepared scan does not match actual filters. Reselecting files to query. + |Prepared: ${preparedScan.allFilters} + |Actual: ${currentFilters} + """.stripMargin) + val eventData = Map( + "reused" -> false, + "preparedAllFilters" -> preparedScan.allFilters.mkString(","), + "preparedFiltersUsedForSkipping" -> preparedScan.filtersUsedForSkipping.mkString(","), + "currentFilters" -> currentFilters.mkString(",") + ) + val files = preparedScan.scannedSnapshot.filesForScan(partitionFilters ++ dataFilters).files + (files, eventData) + } + recordDeltaEvent(deltaLog, + opType = "delta.preparedDeltaFileIndex.reuseSkippingResult", + data = eventData) + addFiles + } + + /** + * Returns the list of files that will be read when scanning this relation. This call may be + * very expensive for large tables. + */ + override def inputFiles: Array[String] = + preparedScan.files.map(f => absolutePath(f.path).toString).toArray + + /** Refresh any cached file listings */ + override def refresh(): Unit = { } + + /** Sum of table file sizes, in bytes */ + override def sizeInBytes: Long = + preparedScan.scanned.bytesCompressed + .getOrElse(spark.sessionState.conf.defaultSizeInBytes) + + override def equals(other: Any): Boolean = other match { + case p: PreparedDeltaFileIndex => + p.deltaLog == deltaLog && p.path == path && p.preparedScan == preparedScan && + p.partitionSchema == partitionSchema && p.versionScanned == versionScanned + case _ => false + } + + override def hashCode(): Int = { + Objects.hash(deltaLog, path, preparedScan, partitionSchema, versionScanned) + } + +} diff --git a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala index e747c87c6a67..ba4c21df3a34 100644 --- a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala +++ b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala @@ -27,12 +27,7 @@ import scala.collection.JavaConverters._ class ClickhouseTable( @transient private val _df: Dataset[Row], @transient private val table: ClickHouseTableV2) - extends DeltaTable(_df, table) { - - override def optimize(): DeltaOptimizeBuilder = { - DeltaOptimizeBuilder(table) - } -} + extends DeltaTable(_df, table) {} object ClickhouseTable { diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 2f5824b58092..9097a02b9337 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -196,8 +196,9 @@ class 
ClickhouseOptimisticTransaction( isOptimize: Boolean, additionalConstraints: Seq[Constraint]): Seq[FileAction] = { - if (isOptimize) + if (isOptimize) { throw new UnsupportedOperationException("Optimize is not supported for ClickHouse") + } hasWritten = true @@ -258,7 +259,7 @@ class ClickhouseOptimisticTransaction( DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog) } else { checkInvariants - }*/ + } */ val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer() if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) { @@ -304,10 +305,11 @@ class ClickhouseOptimisticTransaction( committer.addedStatuses }) .filter { - // In some cases, we can write out an empty `inputData`. Some examples of this (though, they - // may be fixed in the future) are the MERGE command when you delete with empty source, or - // empty target, or on disjoint tables. This is hard to catch before the write without - // collecting the DF ahead of time. Instead, we can return only the AddFiles that + // In some cases, we can write out an empty `inputData`. Some examples of this (though, + // they may be fixed in the future) are the MERGE command when you delete with empty + // source, or empty target, or on disjoint tables. This is hard to catch before + // the write without collecting the DF ahead of time. Instead, + // we can return only the AddFiles that // a) actually add rows, or // b) don't have any stats so we don't know the number of rows at all case a: AddFile => a.numLogicalRecords.forall(_ > 0) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala index bac5231309b8..f64de28f4214 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala @@ -60,7 +60,7 @@ import org.apache.spark.util._ /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0, it is modified to overcome the following issues: + * This file is copied from Delta 3.2.1, it is modified to overcome the following issues: * 1. return ClickhouseOptimisticTransaction * 2. return DeltaMergeTreeFileFormat * 3. create HadoopFsRelation with the bucket options diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala index 8836f7c88d23..5bfda914db67 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala @@ -45,7 +45,7 @@ import org.apache.spark.util.Utils /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified to overcome the following issues: + * This file is copied from Delta 3.2.1. It is modified to overcome the following issues: * 1. filesForScan() will cache the DeltaScan by the FilterExprsAsKey * 2. 
filesForScan() should return DeltaScan of AddMergeTreeParts instead of AddFile */ diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 5f6a2dc3d712..d887e7a21b34 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.V1Table import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable} import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.sources.DeltaDataSource @@ -89,13 +88,6 @@ class ClickHouseTableV2( ret } - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteIntoDeltaBuilder( - this, - info.options, - spark.sessionState.conf.useNullsForMissingDefaultColumnValues) - } - def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( protocol, diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala index dec1f4b9c3f5..0a25346fc6c3 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified to overcome the following issues: + * This file is copied from Delta 3.2.1. It is modified to overcome the following issues: * 1. In Clickhouse backend, we can't implement input_file_name() correctly, we can only implement * it so that it return a a list of filenames (concated by ','). */ diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index 5b2170220228..439111df1b1c 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -49,7 +49,7 @@ import org.apache.spark.util.{SystemClock, ThreadUtils} /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified in: + * This file is copied from Delta 3.2.1. It is modified in: * 1. getDeltaTable supports to get ClickHouseTableV2 * 2. runOptimizeBinJobClickhouse * 3. 
groupFilesIntoBinsClickhouse diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala index 9a7fb96775f0..4e75b8461970 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified to overcome the following issues: + * This file is copied from Delta 3.2.1. It is modified to overcome the following issues: * 1. In Clickhouse backend, we can't implement input_file_name() correctly, we can only implement * it so that it return a a list of filenames (concated by ','). */ diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala index 5d05bdb86896..7a350ae4d594 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock} /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified to overcome the following issues: + * This file is copied from Delta 3.2.1. It is modified to overcome the following issues: * 1. In Gluten, part is a directory, but VacuumCommand assumes part is a file. So we need some * modifications to make it work. * 2. Set the 'gluten.enabledForCurrentThread' to false, now gluten can not support vacuum cmd. @@ -255,7 +255,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { val originalEnabledGluten = spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY) // gluten can not support vacuum command - spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false") + spark.sparkContext + .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false") // --- modified end val validFiles = diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala index 42a89d427197..aa1f94c5c9f5 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, li /** * Gluten overwrite Delta: * - * This file is copied from Delta 3.2.0. It is modified to overcome the following issues: + * This file is copied from Delta 3.2.1. It is modified to overcome the following issues: * 1. In Clickhouse backend, we can't implement input_file_name() correctly, we can only implement * it so that it return a a list of filenames (concated by ','). In findTouchedFiles func. 
*/ diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala new file mode 100644 index 000000000000..dbb5c4050a2c --- /dev/null +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.rules + +import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot, SubqueryTransformerHelper} +import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeltaScanGenerator + +import org.apache.hadoop.fs.Path + +class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession) + extends Rule[LogicalPlan] + with DeltaLogging + with SubqueryTransformerHelper + with OptimizeMetadataOnlyDeltaQuery { + + private val scannedSnapshots = + new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot] + + protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = { + // The first case means that we've fixed the table snapshot for time travel + if (index.isTimeTravelQuery) return index.getSnapshot + OptimisticTransaction + .getActive() + .map(_.getDeltaScanGenerator(index)) + .getOrElse { + // Will be called only when the log is accessed the first time + scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => index.getSnapshot) + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + // Should not be applied to subqueries to avoid duplicate delta jobs. + val isSubquery = isSubqueryRoot(plan) + // Should not be applied to DataSourceV2 write plans, because they'll be planned later + // through a V1 fallback and only that later planning takes place within the transaction. 
+ val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] + if (isSubquery || isDataSourceV2) { + return plan + } + // When 'stats.skipping' is off, still use the metadata to optimize count/min/max queries. + if ( + spark.sessionState.conf + .getConfString( + CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE, + CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE) + .toBoolean && + !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true) + ) { + optimizeQueryWithMetadata(plan) + } else { + plan + } + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 9a1b00f71431..ba17d12ffa94 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with Logging { val GLUTEN_CLICKHOUSE_TABLE_PATH_TO_MTPS_CACHE_SIZE: String = CHConf.prefixOf("table.path.to.mtps.cache.size") + val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE: String = + CHConf.prefixOf("delta.metadata.optimize") + val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true" + def affinityMode: String = { SparkEnv.get.conf .get( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index dea0d50c9da6..470ece4037a5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -30,6 +30,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.delta.DeltaLogFileIndex +import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.v2.V2CommandExec @@ -59,6 +60,7 @@ private object CHRuleApi { injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark)) injector.injectOptimizerRule(_ => CountDistinctWithoutExpand) injector.injectOptimizerRule(_ => EqualToRewrite) + injector.injectPreCBORule(spark => new CHOptimizeMetadataOnlyDeltaQuery(spark)) } def injectLegacy(injector: LegacyInjector): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index c1210c5fbaca..033d51467177 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -1977,24 +1977,29 @@ class GlutenClickHouseMergeTreeWriteSuite | select * from lineitem |""".stripMargin) - val sqlStr = - s""" - |SELECT - | count(*) AS
count_order - |FROM - | lineitem_mergetree_count_opti - |""".stripMargin - runSql(sqlStr)( - df => { - val result = df.collect() - assertResult(1)(result.length) - assertResult("600572")(result(0).getLong(0).toString) + Seq("true", "false").foreach { + skip => + withSQLConf("spark.databricks.delta.stats.skipping" -> skip.toString) { + val sqlStr = + s""" + |SELECT + | count(*) AS count_order + |FROM + | lineitem_mergetree_count_opti + |""".stripMargin + runSql(sqlStr)( + df => { + val result = df.collect() + assertResult(1)(result.length) + assertResult("600572")(result(0).getLong(0).toString) - // Spark 3.2 + Delta 2.0 does not support this feature - if (!spark32) { - assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec]) + // Spark 3.2 + Delta 2.0 does not support this feature + if (!spark32) { + assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec]) + } + }) } - }) + } } test("test mergetree with column case sensitive") { @@ -2128,4 +2133,86 @@ class GlutenClickHouseMergeTreeWriteSuite } }) } + + test( + "GLUTEN-7812: Fix the query failed for the mergetree format " + + "when the 'spark.databricks.delta.stats.skipping' is off") { + // Spark 3.2 + Delta 2.0 does not support this feature + if (!spark32) { + withSQLConf(("spark.databricks.delta.stats.skipping", "false")) { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_stats_skipping; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_stats_skipping + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_returnflag) + |TBLPROPERTIES (orderByKey='l_orderkey', + | primaryKey='l_orderkey') + |LOCATION '$basePath/lineitem_mergetree_stats_skipping' + |""".stripMargin) + + // dynamic partitions + spark.sql(s""" + | insert into table lineitem_mergetree_stats_skipping + | select * from lineitem + |""".stripMargin) + + val sqlStr = + s""" + |SELECT + | o_orderpriority, + | count(*) AS order_count + |FROM + | orders + |WHERE + | o_orderdate >= date'1993-07-01' + | AND o_orderdate < date'1993-07-01' + interval 3 month + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_commitdate < l_receiptdate) + |GROUP BY + | o_orderpriority + |ORDER BY + | o_orderpriority; + | + |""".stripMargin + runSql(sqlStr)( + df => { + val result = df.collect() + assertResult(5)(result.length) + assertResult("1-URGENT")(result(0).getString(0)) + assertResult(999)(result(0).getLong(1)) + assertResult("2-HIGH")(result(1).getString(0)) + assertResult(997)(result(1).getLong(1)) + assertResult("5-LOW")(result(4).getString(0)) + assertResult(1077)(result(4).getLong(1)) + }) + } + } + } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala index 87942c4155e7..fe6db65c3a1d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala @@ -49,4 +49,8 @@ class SparkInjector private[injector] ( def injectFunction(functionDescription:
FunctionDescription): Unit = { extensions.injectFunction(control.disabler().wrapFunction(functionDescription)) } + + def injectPreCBORule(builder: RuleBuilder): Unit = { + extensions.injectPreCBORule(control.disabler().wrapRule(builder)) + } }
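
Usage sketch (editor's note, not part of the patch): the snippet below illustrates how the pieces above are expected to fit together at runtime once CHRuleApi has registered CHOptimizeMetadataOnlyDeltaQuery through the new injectPreCBORule hook. It is a minimal sketch, not a working setup: it omits the Gluten/ClickHouse plugin and session-extension configuration a real run needs, assumes the key built by CHConf.prefixOf("delta.metadata.optimize") resolves to "spark.gluten.sql.columnar.backend.ch.delta.metadata.optimize", and uses a hypothetical table name my_mergetree_table.

import org.apache.spark.sql.SparkSession

// Minimal sketch: exercise the metadata-only optimization with Delta stats skipping disabled.
val spark = SparkSession.builder().appName("delta-metadata-optimize-sketch").getOrCreate()

// Disabling stats-based skipping is the GLUTEN-7812 scenario that previously failed on MergeTree tables.
spark.conf.set("spark.databricks.delta.stats.skipping", "false")

// The new Gluten switch (default "true") lets CHOptimizeMetadataOnlyDeltaQuery rewrite count/min/max
// aggregations from the Delta log when stats skipping is off; set it to "false" to disable the rewrite.
// The full key string is an assumption here; in code, prefer the constant
// CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE.
spark.conf.set("spark.gluten.sql.columnar.backend.ch.delta.metadata.optimize", "true")

// my_mergetree_table is a hypothetical table created with USING clickhouse. Per the test above,
// when the rewrite applies (Spark 3.3+ with Delta 2.3+/3.2+), the count is answered from metadata
// and the executed plan collapses to a LocalTableScanExec instead of scanning data files.
spark.sql("SELECT count(*) FROM my_mergetree_table").show()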