Skip to content

Commit

Permalink
Add ColumnarInputAdapter back to recover UI graph
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Dec 6, 2023
1 parent 81bb6c9 commit a82e822
Showing 1 changed file with 48 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.rel.RelBuilder

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -39,6 +41,9 @@ import scala.collection.JavaConverters._
* would be transformed to `ValueStreamNode` at native side.
*/
case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupport {
// `InputAdapter` is a case class, so `ColumnarInputAdapter.withNewChildren()` will return
// `InputAdapter`.
assert(child.isInstanceOf[InputAdapter])

@transient
override lazy val metrics: Map[String, SQLMetric] =
Expand Down Expand Up @@ -66,6 +71,48 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
}
}

/**
* InputAdapter is used to hide a SparkPlan from a subtree that supports transform. Note, if we
* remove this adaptor, the SQL UI graph would be broken.
*/
class ColumnarInputAdapter(child: SparkPlan) extends InputAdapter(child) {

// This is not strictly needed because the codegen transformation happens after the columnar
// transformation but just for consistency
override def supportsColumnar: Boolean = child.supportsColumnar

// this is the most important effect of this class
override def supportCodegen: Boolean = false

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar()
}

override def nodeName: String = s"InputAdapter"

override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
prefix = "",
addSuffix = false,
maxFields,
printNodeId,
indent)
}
}

/**
* Implemented by referring to spark's CollapseCodegenStages.
*
Expand Down Expand Up @@ -153,6 +200,6 @@ object ColumnarCollapseTransformStages {
val transformStageCounter = new AtomicInteger(0)

def wrapInputIteratorTransformer(plan: SparkPlan): TransformSupport = {
InputIteratorTransformer(plan)
InputIteratorTransformer(new ColumnarInputAdapter(plan))
}
}

0 comments on commit a82e822

Please sign in to comment.