From 07f84be2ba2b95717ea74356859a9d08bb9dab10 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 1 Nov 2024 10:15:55 +0800 Subject: [PATCH] [GLUTEN-7143][VL] RAS: Fix failed UTs in GlutenSQLQueryTestSuite (#7754) --- .../clickhouse/CHValidatorApi.scala | 3 ++- .../backendsapi/velox/VeloxValidatorApi.scala | 9 +++++++-- .../gluten/backendsapi/ValidatorApi.scala | 3 ++- .../columnar/enumerated/RasOffload.scala | 20 +++++++++++++------ .../ColumnarShuffleExchangeExec.scala | 2 +- 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala index 5fe740694624..eed493cffe1e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala @@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator import org.apache.spark.internal.Logging import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg } override def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] = { val outputAttributes = child.output diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 00a8f8cb0e09..ddf77e5fa3d4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.validate.NativePlanValidationInfo import org.apache.gluten.vectorized.NativePlanEvaluator -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types._ @@ -87,11 +87,16 @@ class VeloxValidatorApi extends ValidatorApi { } override def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] = { + if (outputAttributes.isEmpty) { + // See: https://github.com/apache/incubator-gluten/issues/7600. + return Some("Shuffle with empty output schema is not supported") + } if (child.output.isEmpty) { // See: https://github.com/apache/incubator-gluten/issues/7600. - return Some("Shuffle with empty schema is not supported") + return Some("Shuffle with empty input schema is not supported") } doSchemaValidate(child.schema) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala index 90f132d78ba5..4a18a618bfc7 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi import org.apache.gluten.extension.ValidationResult import org.apache.gluten.substrait.plan.PlanNode -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.DataType @@ -58,6 +58,7 @@ trait ValidatorApi { /** Validate against ColumnarShuffleExchangeExec. */ def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala index 1299fffb995c..15522e4986bb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala @@ -77,7 +77,15 @@ object RasOffload { // 0. If the node is already offloaded, fail fast. assert(typeIdentifier.isInstance(node)) - // 1. Rewrite the node to form that native library supports. + // 1. Pre-validate the input node. Fast fail if no good. + validator.validate(node) match { + case Validator.Passed => + case Validator.Failed(reason) => + FallbackTags.add(node, reason) + return List.empty + } + + // 2. Rewrite the node to form that native library supports. val rewritten = rewrites.foldLeft(node) { case (node, rewrite) => node.transformUp { @@ -87,17 +95,17 @@ object RasOffload { } } - // 2. Walk the rewritten tree. + // 3. Walk the rewritten tree. val offloaded = rewritten.transformUp { case from if typeIdentifier.isInstance(from) => - // 3. Validate current node. If passed, offload it. + // 4. Validate current node. If passed, offload it. validator.validate(from) match { case Validator.Passed => val offloadedPlan = base.offload(from) val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case t: GlutenPlan => t } val outComes = offloadedNodes.map(_.doValidate()).filter(!_.ok()) if (outComes.nonEmpty) { - // 4. If native validation fails on the offloaded node, return the + // 5. If native validation fails on the offloaded node, return the // original one. outComes.foreach(FallbackTags.add(from, _)) from @@ -110,12 +118,12 @@ object RasOffload { } } - // 5. If rewritten plan is not offload-able, discard it. + // 6. If rewritten plan is not offload-able, discard it. if (offloaded.fastEquals(rewritten)) { return List.empty } - // 6. Otherwise, return the final tree. + // 7. Otherwise, return the final tree. List(offloaded) } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index f12e3ae0b33e..4f62377b09e3 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -115,7 +115,7 @@ case class ColumnarShuffleExchangeExec( override protected def doValidateInternal(): ValidationResult = { BackendsApiManager.getValidatorApiInstance - .doColumnarShuffleExchangeExecValidate(outputPartitioning, child) + .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child) .map { reason => ValidationResult.failed(