[CORE] Add GlutenImplicits to get FallbackSummary easily #3599

Merged (2 commits) on Nov 6, 2023
14 changes: 14 additions & 0 deletions docs/get-started/Velox.md
@@ -785,3 +785,17 @@ If you want to disable Gluten UI, add a config when submitting `--conf spark.glu
## History server

Gluten UI also supports Spark history server. Add gluten-ui jar into the history server classpath, e.g., $SPARK_HOME/jars, then restart history server.

# Gluten Implicits

Gluten provides a helper class to get the fallback summary from a Spark Dataset.

```scala
import org.apache.spark.sql.execution.GlutenImplicits._
val df = spark.sql("SELECT * FROM t")
df.fallbackSummary
```

Note that if AQE is enabled but the query is not materialized, `fallbackSummary` re-plans
the query execution with AQE disabled. This is a workaround to get the final plan, and it may
produce results inconsistent with those of a materialized query. However, we have no choice.
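If you want the summary of the plan that actually executed, a minimal sketch (assuming a Spark session `spark` and a registered table `t`) is to materialize the query first so AQE finalizes the plan before the summary is collected:

```scala
import org.apache.spark.sql.execution.GlutenImplicits._

val df = spark.sql("SELECT * FROM t")
// Materializing the query lets AQE produce its final plan, so the
// summary reflects the executed plan instead of a re-planned one.
df.collect()
val summary = df.fallbackSummary
```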
@@ -17,11 +17,11 @@
package org.apache.spark.shuffle

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.MemoryTarget
import io.glutenproject.memory.memtarget.Spiller
import io.glutenproject.vectorized._
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
@@ -23,8 +23,10 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.GlutenFallbackReporter.FALLBACK_REASON_TAG
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

import java.util.{IdentityHashMap, Set}
@@ -38,25 +40,52 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
// 2. remove `plan.verboseStringWithOperatorId`
// 3. remove codegen id
object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]
type FallbackInfo = (Int, Map[String, String])

def addFallbackNodeWithReason(p: SparkPlan, reason: String): Unit = {
p.getTagValue(QueryPlan.OP_ID_TAG).foreach {
opId =>
// e.g., 002 project, it is used to help analysis by `substring(4)`
val formattedNodeName = f"$opId%03d ${p.nodeName}"
fallbackNodeToReason.put(formattedNodeName, reason)
}
def addFallbackNodeWithReason(
p: SparkPlan,
reason: String,
fallbackNodeToReason: mutable.HashMap[String, String]): Unit = {
p.getTagValue(QueryPlan.OP_ID_TAG).foreach {
opId =>
// e.g., "002 project"; the zero-padded op-id prefix can later be stripped with `substring(4)`
val formattedNodeName = f"$opId%03d ${p.nodeName}"
fallbackNodeToReason.put(formattedNodeName, reason)
}
}
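As a hypothetical illustration (not part of this diff) of the zero-padded op-id prefix this helper produces, and of how the prefix is later stripped:

```scala
// Illustration only: the formatted name pairs a 3-digit op id with the node name.
val opId = 2
val formattedNodeName = f"$opId%03d Project" // "002 Project"
val nodeName = formattedNodeName.substring(4) // "Project"
```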

def handleVanillaSparkPlan(
p: SparkPlan,
fallbackNodeToReason: mutable.HashMap[String, String]
): Unit = {
p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
case Some(reason) => addFallbackNodeWithReason(p, reason, fallbackNodeToReason)
case _ =>
// If the SparkPlan does not have a fallback reason, there are two possibilities:
// 1. Gluten ignores that plan, which is itself a kind of fallback
// 2. Gluten does not support it but recorded no fallback reason
addFallbackNodeWithReason(
p,
"Gluten does not touch it or does not support it",
fallbackNodeToReason)
}
}

private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec => collect(p.executedPlan)
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
@@ -65,17 +94,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled")
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
case Some(reason) => addFallbackNodeWithReason(p, reason)
case _ =>
// If the SparkPlan does not have fallback reason, then there are two options:
// 1. Gluten ignore that plan and it's a kind of fallback
// 2. Gluten does not support it without the fallback reason
addFallbackNodeWithReason(p, "Gluten does not touch it or does not support it")
}
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}
@@ -120,7 +143,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
*/
def processPlan[T <: QueryPlan[T]](
plan: T,
append: String => Unit): (Int, Map[String, String]) = {
append: String => Unit,
collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): FallbackInfo = {
try {
// Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
// intentional overwriting of IDs generated in previous AQE iteration
@@ -186,7 +210,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
append("\n")
}

collectFallbackNodes(plan)
if (collectFallbackFunc.isEmpty) {
collectFallbackNodes(plan)
} else {
collectFallbackFunc.get.apply(plan)
}
} finally {
removeTags(plan)
}
@@ -310,11 +338,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
* Returns the operator identifier for the supplied plan by retrieving the `operationId` tag
* value.
*/
def getOpId(plan: QueryPlan[_]): String = {
private def getOpId(plan: QueryPlan[_]): String = {
plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown")
}

def removeTags(plan: QueryPlan[_]): Unit = {
private def removeTags(plan: QueryPlan[_]): Unit = {
def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
p.unsetTagValue(QueryPlan.OP_ID_TAG)
children.foreach(removeTags)
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import io.glutenproject.execution.WholeStageTransformer
import io.glutenproject.extension.{GlutenPlan, InMemoryTableScanHelper}

import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.GlutenExplainUtils._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.internal.SQLConf

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// spotless:off
/**
* A helper class to get the Gluten fallback summary from a Spark [[Dataset]].
*
 * Note that if AQE is enabled but the query is not materialized, this method re-plans
 * the query execution with AQE disabled. This is a workaround to get the final plan, and it may
 * produce results inconsistent with those of a materialized query. However, we have no choice.
*
* For example:
*
* {{{
* import org.apache.spark.sql.execution.GlutenImplicits._
* val df = spark.sql("SELECT * FROM t")
* df.fallbackSummary
* }}}
*/
// spotless:on
object GlutenImplicits {

case class FallbackSummary(
numGlutenNodes: Int,
numFallbackNodes: Int,
physicalPlanDescription: Seq[String],
fallbackNodeToReason: Seq[Map[String, String]])
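A hypothetical sketch of consuming a `FallbackSummary` (the `df` value is an assumption, not part of this diff):

```scala
// Illustration only: report each fallen-back node and its reason.
val summary = df.fallbackSummary
println(s"Gluten nodes: ${summary.numGlutenNodes}, fallback nodes: ${summary.numFallbackNodes}")
summary.fallbackNodeToReason.flatten.foreach {
  case (node, reason) => println(s"$node => $reason")
}
```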

private[sql] def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map {
key =>
if (conf.contains(key)) {
Some(conf.getConfString(key))
} else {
None
}
}
keys.zip(values).foreach {
case (k, v) =>
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
}
try f
finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConfString(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}
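For illustration, a hedged sketch of how `withSQLConf` scopes a configuration change (the `spark` session and table `t` are assumptions):

```scala
// Illustration only: AQE is disabled inside the block, and the
// previous value is restored by the finally clause above.
val plan = withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
  spark.sql("SELECT * FROM t").queryExecution.executedPlan
}
```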

implicit class DatasetTransformer[T](dataset: Dataset[T]) {
private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
val args = p.argString(Int.MaxValue)
val index = args.indexOf("isFinalPlan=")
assert(index >= 0)
args.substring(index + "isFinalPlan=".length).trim.toBoolean
}
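The check above parses the `isFinalPlan=` flag out of the plan's `argString`; a hypothetical fragment shows the mechanics:

```scala
// Illustration only: extract the boolean that follows "isFinalPlan=".
val args = "AdaptiveSparkPlan isFinalPlan=true"
val index = args.indexOf("isFinalPlan=")
val isFinal = args.substring(index + "isFinalPlan=".length).trim.toBoolean // true
```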

private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
collect(p.executedPlan)
case p: AdaptiveSparkPlanExec =>
// If we reach this point, we are inside a table cache.
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
dataset.sparkSession,
dataset.sparkSession.sessionState.planner,
p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
dataset.sparkSession,
newSparkPlan
)
processPlan(
newExecutedPlan,
new PlanStringConcat().append,
Some(plan => collectFallbackNodes(plan)))
}
numGlutenNodes += innerNumGlutenNodes
fallbackNodeToReason ++= innerFallbackNodeToReason
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
p.innerChildren.foreach(collect)
case i: InMemoryTableScanExec =>
if (InMemoryTableScanHelper.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
collect(i.relation.cachedPlan)
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}
}

collect(plan)
(numGlutenNodes, fallbackNodeToReason.toMap)
}

private def collectQueryExecutionFallbackSummary(qe: QueryExecution): FallbackSummary = {
var totalNumGlutenNodes = 0
var totalNumFallbackNodes = 0
val totalPhysicalPlanDescription = new ArrayBuffer[String]()
val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()

def handlePlanWithAQEAndTableCache(
plan: SparkPlan,
logicalPlan: LogicalPlan,
isMaterialized: Boolean): Unit = {
val concat = new PlanStringConcat()
val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The AQE plan is not materialized, so the columnar rules have not been applied.
// For this case, we apply the columnar rules manually with AQE disabled.
val qe = dataset.sparkSession.sessionState.executePlan(logicalPlan)
processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
}
} else {
processPlan(plan, concat.append, collectFallbackFunc)
}
totalNumGlutenNodes += numGlutenNodes
totalNumFallbackNodes += fallbackNodeToReason.size
totalPhysicalPlanDescription.append(concat.toString())
totalFallbackNodeToReason.append(fallbackNodeToReason)
}

// For command-like query, e.g., `INSERT INTO TABLE ...`
qe.commandExecuted.foreach {
case r: CommandResult =>
handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
case _ => // ignore
}

// For query, e.g., `SELECT * FROM ...`
if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
val isMaterialized = qe.executedPlan.find {
case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
case _ => false
}.isDefined
handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
}

FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
totalPhysicalPlanDescription,
totalFallbackNodeToReason
)
}

def fallbackSummary(): FallbackSummary = {
collectQueryExecutionFallbackSummary(dataset.queryExecution)
}
}
}
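As a usage note, both command-like and plain queries reach `fallbackSummary`; a hypothetical session sketch (the table names are assumptions):

```scala
import org.apache.spark.sql.execution.GlutenImplicits._

// Command-like query: reported via qe.commandExecuted / CommandResult.
spark.sql("INSERT INTO TABLE target SELECT * FROM t").fallbackSummary

// Plain query: reported via qe.executedPlan.
spark.sql("SELECT * FROM t").fallbackSummary
```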
@@ -2014,6 +2014,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
"SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where int_Field=1")
.exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema")
enableSuite[SparkFunctionStatistics]

enableSuite[GlutenImplicitsTest]
.exclude("fallbackSummary with shuffle")
Contributor: Is this feature not workable for the CH backend? cc @zzcclp.

Contributor (Author): It should work with the CH backend. There are some behavioral differences, so I disabled these tests for the CH backend; e.g., the Velox backend adds one more project before shuffle, and the Velox backend supports columnar cache.

.exclude("fallbackSummary with cache")
.exclude("fallbackSummary with cached data and shuffle")
}
// scalastyle:on line.size.limit
@@ -1164,5 +1164,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenHiveSQLQuerySuite]
// ReaderFactory is not registered for format orc.
.exclude("hive orc scan")
enableSuite[GlutenImplicitsTest]
}
// scalastyle:on line.size.limit