[GLUTEN-7812][CH] Fix query failures for the mergetree format when 'spark.databricks.delta.stats.skipping' is off

For Spark 3.3 + Delta 2.3, when 'spark.databricks.delta.stats.skipping' is off, some queries that contain subqueries fail with the error below:

```
java.lang.IllegalArgumentException: requirement failed
	at scala.Predef$.require(Predef.scala:268)
	at org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries(SubqueryTransformerHelper.scala:42)
	at org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries$(SubqueryTransformerHelper.scala:40)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScan.transformWithSubqueries(PrepareDeltaScan.scala:291)
	at org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs(PreprocessTableWithDVs.scala:67)
	at org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs$(PreprocessTableWithDVs.scala:66)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScan.preprocessTablesWithDVs(PrepareDeltaScan.scala:291)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply(PrepareDeltaScan.scala:227)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply$(PrepareDeltaScan.scala:191)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
	at org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
```
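A minimal reproduction sketch, assuming a hypothetical mergetree table `t1` created through the CH backend (the table name, schema, and `USING clickhouse` provider are illustrative, not taken from this commit):

```
// Turn off Delta stats-based skipping, matching the failing scenario.
spark.conf.set("spark.databricks.delta.stats.skipping", "false")
spark.sql("CREATE TABLE t1 (id INT, name STRING) USING clickhouse")
spark.sql("INSERT INTO t1 VALUES (1, 'a'), (2, 'b')")
// Planning the scalar subquery hit the require() in
// SubqueryTransformerHelper.transformWithSubqueries shown in the trace above.
spark.sql("SELECT * FROM t1 WHERE id = (SELECT max(id) FROM t1)").show()
```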

Close #7812.
zzcclp committed Nov 5, 2024
1 parent 3108d91 commit 247abe0
Showing 19 changed files with 722 additions and 44 deletions.
1 change: 1 addition & 0 deletions backends-clickhouse/pom.xml
@@ -368,6 +368,7 @@
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/stats/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
</excludes>
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.metering.DeltaLogging

class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
extends Rule[LogicalPlan]
with DeltaLogging {

// Delta 2.0 does not support metadata-only query optimization, so this rule is a no-op.
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
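For context, a `Rule[LogicalPlan]` like this is normally wired in through Spark's session extensions; the sketch below uses the stock `SparkSessionExtensions.injectPreCBORule` API (the hook Delta itself uses for `PrepareDeltaScan`), while the actual injection point inside Gluten is not shown in this diff and is assumed here:

```
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery

// Hypothetical wiring: register the rule before cost-based optimization so
// metadata-only rewrites happen before the Delta scan is planned.
class CHDeltaRuleExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPreCBORule(spark => new CHOptimizeMetadataOnlyDeltaQuery(spark))
  }
}
```

An extensions class like this would then be registered through the `spark.sql.extensions` configuration.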
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta.rules

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot, SubqueryTransformerHelper}
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeltaScanGenerator

import org.apache.hadoop.fs.Path

class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
extends Rule[LogicalPlan]
with DeltaLogging
with SubqueryTransformerHelper
with OptimizeMetadataOnlyDeltaQuery {

// Cache one snapshot per Delta log (keyed by its compositeId) so that all scans
// in a query reuse the same snapshot and each log is read only once.
private val scannedSnapshots =
new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]

// Returns the scan generator for this file index: the pinned snapshot for a
// time-travel query, the active transaction's generator if one exists, or a
// cached per-table snapshot otherwise.
protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = {
// The first case means that we've fixed the table snapshot for time travel
if (index.isTimeTravelQuery) return index.getSnapshot
OptimisticTransaction
.getActive()
.map(_.getDeltaScanGenerator(index))
.getOrElse {
// Will be called only when the log is accessed the first time
scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => index.getSnapshot)
}
}

override def apply(plan: LogicalPlan): LogicalPlan = {
// Should not be applied to subqueries to avoid duplicate delta jobs.
val isSubquery = isSubqueryRoot(plan)
// Should not be applied to DataSourceV2 write plans, because they'll be planned later
// through a V1 fallback and only that later planning takes place within the transaction.
val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
if (isSubquery || isDataSourceV2) {
return plan
}
// When 'stats.skipping' is off, the metadata can still be used to optimize count/min/max queries.
if (
spark.sessionState.conf
.getConfString(
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE,
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE)
.toBoolean &&
!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true)
) {
optimizeQueryWithMetadata(plan)
} else {
plan
}
}
}
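With the rule in place, the rewrite only fires when Gluten's metadata-optimize flag evaluates to true and Delta's stats-based skipping is off. A sketch of the matching session configuration, reusing the constants referenced in the rule and the hypothetical table `t1` from the reproduction above:

```
// Matching the condition in CHOptimizeMetadataOnlyDeltaQuery.apply:
spark.conf.set(CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE, "true")
spark.conf.set(DeltaSQLConf.DELTA_STATS_SKIPPING.key, "false")
// count/min/max aggregates can now be answered from the Delta log's metadata
// instead of scanning data files.
spark.sql("SELECT count(*), min(id), max(id) FROM t1").show()
```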