Skip to content

Commit

Permalink
[GLUTEN-7143][VL] RAS: Revert #7731, disable the relevant test cases …
Browse files Browse the repository at this point in the history
…since RAS doesn't report fallback details due to #7763 (#7764)
  • Loading branch information
zhztheplayer authored Nov 1, 2024
1 parent 3a74a53 commit efeef65
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ object Memo {
}

// The new node already memorized to memo.

val cachedCluster = parent.cache(cacheKey)
if (cachedCluster == targetCluster) {
// The new node already memorized to memo and in the target cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.extension.columnar.enumerated

import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{FallbackTags, OffloadSingleNode}
import org.apache.gluten.extension.columnar.OffloadSingleNode
import org.apache.gluten.extension.columnar.rewrite.RewriteSingleNode
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.ras.path.Pattern
Expand Down Expand Up @@ -81,7 +81,7 @@ object RasOffload {
validator.validate(node) match {
case Validator.Passed =>
case Validator.Failed(reason) =>
FallbackTags.add(node, reason)
// TODO: Tag the original plan with fallback reason.
return List.empty
}

Expand All @@ -105,15 +105,18 @@ object RasOffload {
val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case t: GlutenPlan => t }
val outComes = offloadedNodes.map(_.doValidate()).filter(!_.ok())
if (outComes.nonEmpty) {
// 5. If native validation fails on the offloaded node, return the
// original one.
outComes.foreach(FallbackTags.add(from, _))
// 4. If native validation fails on at least one of the offloaded nodes, return
// the original one.
//
// TODO: Tag the original plan with fallback reason. This is a non-trivial work
// in RAS as the query plan we got here may be a copy so may not propagate tags
// to original plan.
from
} else {
offloadedPlan
}
case Validator.Failed(reason) =>
FallbackTags.add(from, reason)
// TODO: Tag the original plan with fallback reason.
from
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.execution.FileSourceScanExecTransformer
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
import org.apache.spark.sql.execution.ProjectExec
Expand All @@ -32,6 +33,10 @@ import org.apache.spark.status.ElementTrackingStore
import scala.collection.mutable.ArrayBuffer

class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
override def sparkConf: SparkConf = {
super.sparkConf
.set(GlutenConfig.RAS_ENABLED.key, "false")
}

testGluten("test fallback logging") {
val testAppender = new LogAppender("fallback reason")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,18 @@ class GlutenExpressionMappingSuite
testWithSpecifiedSparkVersion(
"GLUTEN-7213: Check fallback reason with CheckOverflowInTableInsert",
Some("3.4")) {
withTable("t1", "t2") {
sql("create table t1 (a float) using parquet")
sql("insert into t1 values(1.1)")
sql("create table t2 (b decimal(10,4)) using parquet")
withSQLConf(GlutenConfig.RAS_ENABLED.key -> "false") {
withTable("t1", "t2") {
sql("create table t1 (a float) using parquet")
sql("insert into t1 values(1.1)")
sql("create table t2 (b decimal(10,4)) using parquet")

val msg =
"CheckOverflowInTableInsert is used in ANSI mode, but Gluten does not support ANSI mode."
import org.apache.spark.sql.execution.GlutenImplicits._
val fallbackSummary = sql("insert overwrite t2 select * from t1").fallbackSummary()
assert(fallbackSummary.fallbackNodeToReason.flatMap(_.values).exists(_.contains(msg)))
val msg =
"CheckOverflowInTableInsert is used in ANSI mode, but Gluten does not support ANSI mode."
import org.apache.spark.sql.execution.GlutenImplicits._
val fallbackSummary = sql("insert overwrite t2 select * from t1").fallbackSummary()
assert(fallbackSummary.fallbackNodeToReason.flatMap(_.values).exists(_.contains(msg)))
}
}
}
}

0 comments on commit efeef65

Please sign in to comment.