[GLUTEN-7143][VL] RAS: Fix failed UTs in GlutenSQLQueryTestSuite (#7754)
zhztheplayer authored Nov 1, 2024
1 parent dc65d77 commit 07f84be
Showing 5 changed files with 26 additions and 11 deletions.
@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logging
   }

   override def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String] = {
     val outputAttributes = child.output

@@ -22,7 +22,7 @@ import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.validate.NativePlanValidationInfo
 import org.apache.gluten.vectorized.NativePlanEvaluator

-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types._
@@ -87,11 +87,16 @@ class VeloxValidatorApi extends ValidatorApi {
   }

   override def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String] = {
+    if (outputAttributes.isEmpty) {
+      // See: https://github.com/apache/incubator-gluten/issues/7600.
+      return Some("Shuffle with empty output schema is not supported")
+    }
     if (child.output.isEmpty) {
       // See: https://github.com/apache/incubator-gluten/issues/7600.
-      return Some("Shuffle with empty schema is not supported")
+      return Some("Shuffle with empty input schema is not supported")
     }
     doSchemaValidate(child.schema)
   }

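For context, the Velox check above rejects a shuffle whose own output attribute list is empty before it even looks at the child plan, with both branches pointing at issue 7600. The following is a minimal, self-contained sketch of that guard-clause ordering; FakeAttribute, FakePlan, and validateShuffle are hypothetical stand-ins for illustration only, not the real Spark or Gluten types:

// Hypothetical stand-ins for the Spark types the real validator works with.
case class FakeAttribute(name: String)
case class FakePlan(output: Seq[FakeAttribute])

// Mirrors the order of the guards added to the Velox validator: the shuffle's own
// output schema is checked first, then the child plan's output schema.
def validateShuffle(outputAttributes: Seq[FakeAttribute], child: FakePlan): Option[String] = {
  if (outputAttributes.isEmpty) {
    // See: https://github.com/apache/incubator-gluten/issues/7600.
    return Some("Shuffle with empty output schema is not supported")
  }
  if (child.output.isEmpty) {
    return Some("Shuffle with empty input schema is not supported")
  }
  None // The real code goes on to validate the child schema's data types.
}

// A zero-column shuffle yields a fallback reason; a normal one passes.
assert(validateShuffle(Nil, FakePlan(Nil)).contains("Shuffle with empty output schema is not supported"))
assert(validateShuffle(Seq(FakeAttribute("id")), FakePlan(Seq(FakeAttribute("id")))).isEmpty)
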
@@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.substrait.plan.PlanNode

-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.DataType
@@ -58,6 +58,7 @@ trait ValidatorApi {

   /** Validate against ColumnarShuffleExchangeExec. */
   def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String]
 }

@@ -77,7 +77,15 @@ object RasOffload {
       // 0. If the node is already offloaded, fail fast.
       assert(typeIdentifier.isInstance(node))

-      // 1. Rewrite the node to form that native library supports.
+      // 1. Pre-validate the input node. Fast fail if no good.
+      validator.validate(node) match {
+        case Validator.Passed =>
+        case Validator.Failed(reason) =>
+          FallbackTags.add(node, reason)
+          return List.empty
+      }
+
+      // 2. Rewrite the node to form that native library supports.
       val rewritten = rewrites.foldLeft(node) {
         case (node, rewrite) =>
           node.transformUp {
@@ -87,17 +95,17 @@
           }
       }

-      // 2. Walk the rewritten tree.
+      // 3. Walk the rewritten tree.
       val offloaded = rewritten.transformUp {
         case from if typeIdentifier.isInstance(from) =>
-          // 3. Validate current node. If passed, offload it.
+          // 4. Validate current node. If passed, offload it.
           validator.validate(from) match {
             case Validator.Passed =>
               val offloadedPlan = base.offload(from)
               val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case t: GlutenPlan => t }
               val outComes = offloadedNodes.map(_.doValidate()).filter(!_.ok())
               if (outComes.nonEmpty) {
-                // 4. If native validation fails on the offloaded node, return the
+                // 5. If native validation fails on the offloaded node, return the
                 // original one.
                 outComes.foreach(FallbackTags.add(from, _))
                 from
@@ -110,12 +118,12 @@
          }
      }

-      // 5. If rewritten plan is not offload-able, discard it.
+      // 6. If rewritten plan is not offload-able, discard it.
       if (offloaded.fastEquals(rewritten)) {
         return List.empty
       }

-      // 6. Otherwise, return the final tree.
+      // 7. Otherwise, return the final tree.
       List(offloaded)
     }

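The RasOffload change above moves validation in front of the rewrite phase, so a node that can never be offloaded is tagged with its fallback reason and abandoned before any rewrite work is spent on it. Below is a condensed, self-contained sketch of that fail-fast control flow; Node, Outcome, and offloadRule are hypothetical stand-ins for illustration, not the actual Gluten rule or its SparkPlan/Validator types:

// Hypothetical stand-ins; the real rule operates on SparkPlan with Gluten's Validator.
sealed trait Outcome
case object Passed extends Outcome
final case class Failed(reason: String) extends Outcome

final case class Node(name: String, children: Seq[Node] = Nil)

def offloadRule(
    node: Node,
    validate: Node => Outcome,
    rewrite: Node => Node,
    offload: Node => Node,
    tagFallback: (Node, String) => Unit): List[Node] = {
  // 1. Pre-validate the input node and fail fast, recording the fallback reason.
  validate(node) match {
    case Passed =>
    case Failed(reason) =>
      tagFallback(node, reason)
      return List.empty
  }
  // 2. Only a node that passed pre-validation is rewritten and offloaded.
  val rewritten = rewrite(node)
  val offloaded = offload(rewritten)
  // 3. If offloading produced nothing new, discard the candidate.
  if (offloaded == rewritten) List.empty else List(offloaded)
}
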
@@ -115,7 +115,7 @@ case class ColumnarShuffleExchangeExec(

   override protected def doValidateInternal(): ValidationResult = {
     BackendsApiManager.getValidatorApiInstance
-      .doColumnarShuffleExchangeExecValidate(outputPartitioning, child)
+      .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child)
       .map {
         reason =>
           ValidationResult.failed(
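At this call site the validator's Option[String] reason is mapped into a failed ValidationResult; the exact failure message is cut off in the hunk above, so the sketch below only illustrates the general Option-to-result mapping with hypothetical Result/Ok/NotOk stand-ins rather than Gluten's real ValidationResult:

// Hypothetical stand-in for a validation outcome, used only to show the mapping.
sealed trait Result
case object Ok extends Result
final case class NotOk(reason: String) extends Result

def toResult(maybeReason: Option[String]): Result = maybeReason match {
  case Some(reason) => NotOk(s"Validation failed on shuffle exchange: $reason")
  case None => Ok
}

// Some(reason) from the backend validator becomes a failure; None means the plan passes.
assert(toResult(Some("Shuffle with empty output schema is not supported")) ==
  NotOk("Validation failed on shuffle exchange: Shuffle with empty output schema is not supported"))
assert(toResult(None) == Ok)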
