diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala index b4ee3ff62cf79..5a6e6647fff7c 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala @@ -263,4 +263,8 @@ class CHTransformerApi extends TransformerApi with Logging { val typeNode = ConverterUtils.getTypeNode(dataType, nullable) ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode) } + + override def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String = { + throw new UnsupportedOperationException("CH backend does not support this method") + } } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala index 18bc2f61b1b6f..61ad92b6b9319 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala @@ -17,10 +17,12 @@ package io.glutenproject.backendsapi.velox import io.glutenproject.backendsapi.TransformerApi +import io.glutenproject.exec.Runtimes import io.glutenproject.expression.ConverterUtils import io.glutenproject.extension.ValidationResult import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode} import io.glutenproject.utils.InputPartitionsUtil +import io.glutenproject.vectorized.PlanEvaluatorJniWrapper import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateMap, Explode, Generator, JsonTuple, Literal, PosExplode} @@ -123,4 +125,14 @@ class TransformerApiImpl extends TransformerApi with Logging { val 
typeNode = ConverterUtils.getTypeNode(dataType, nullable) ExpressionBuilder.makeCast(typeNode, childNode, !nullOnOverflow) } + + override def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String = { + val tmpRuntime = Runtimes.tmpInstance() + try { + val jniWrapper = PlanEvaluatorJniWrapper.forRuntime(tmpRuntime) + jniWrapper.nativePlanString(substraitPlan, details) + } finally { + tmpRuntime.release() + } + } } diff --git a/backends-velox/src/test/scala/io/glutenproject/benchmarks/NativeBenchmarkPlanGenerator.scala b/backends-velox/src/test/scala/io/glutenproject/benchmarks/NativeBenchmarkPlanGenerator.scala index 2613f27010b09..b5d59d62a8546 100644 --- a/backends-velox/src/test/scala/io/glutenproject/benchmarks/NativeBenchmarkPlanGenerator.scala +++ b/backends-velox/src/test/scala/io/glutenproject/benchmarks/NativeBenchmarkPlanGenerator.scala @@ -59,10 +59,10 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { val executedPlan = df.queryExecution.executedPlan val lastStageTransformer = executedPlan.find(_.isInstanceOf[WholeStageTransformer]) assert(lastStageTransformer.nonEmpty) - var planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].getPlanJson + var planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson assert(planJson.isEmpty) executedPlan.execute() - planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].getPlanJson + planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson assert(planJson.nonEmpty) } spark.sparkContext.setLogLevel(logLevel) @@ -82,7 +82,7 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { val finalPlan = executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer]) assert(lastStageTransformer.nonEmpty) - val planJson = 
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].getPlanJson + val planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson assert(planJson.nonEmpty) } spark.sparkContext.setLogLevel(logLevel) @@ -141,7 +141,7 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer]) assert(lastStageTransformer.nonEmpty) val plan = - lastStageTransformer.get.asInstanceOf[WholeStageTransformer].getPlanJson.split('\n') + lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n') val exampleJsonFile = Paths.get(generatedPlanDir, "example.json") Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala index c35c25fa18c4c..9fd15a57c7147 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala @@ -19,12 +19,13 @@ package io.glutenproject.execution import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.execution.{GenerateExec, RDDScanExec} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{avg, col, udf} import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType} import scala.collection.JavaConverters -class TestOperator extends VeloxWholeStageTransformerSuite { +class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper { protected val rootPath: String = getClass.getResource("/").getPath override protected val backend: String = "velox" @@ -585,4 +586,17 @@ class TestOperator extends VeloxWholeStageTransformerSuite { } } } + + test("Support get native plan tree string") 
{ + runQueryAndCompare("select l_partkey + 1, count(*) from lineitem group by l_partkey + 1") { + df => + val wholeStageTransformers = collect(df.queryExecution.executedPlan) { + case w: WholeStageTransformer => w + } + val nativePlanString = wholeStageTransformers.head.nativePlanString() + assert(nativePlanString.contains("Aggregation[FINAL")) + assert(nativePlanString.contains("Aggregation[PARTIAL")) + assert(nativePlanString.contains("TableScan")) + } + } } diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index beb9bb9cc1ef3..32f7b1b364ba3 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -84,6 +84,8 @@ class Runtime : public std::enable_shared_from_this { GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed"); } + virtual std::string planString(bool details, const std::unordered_map& sessionConf) = 0; + // Just for benchmark ::substrait::Plan& getPlan() { return substraitPlan_; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 189c674f32953..b785645478d90 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -331,6 +331,24 @@ JNIEXPORT void JNICALL Java_io_glutenproject_exec_RuntimeJniWrapper_releaseRunti JNI_METHOD_END() } +JNIEXPORT jstring JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativePlanString( // NOLINT + JNIEnv* env, + jobject wrapper, + jbyteArray planArray, + jboolean details) { + JNI_METHOD_START + + auto planData = reinterpret_cast(env->GetByteArrayElements(planArray, 0)); + auto planSize = env->GetArrayLength(planArray); + auto ctx = gluten::getRuntime(env, wrapper); + ctx->parsePlan(planData, planSize); + auto& conf = ctx->getConfMap(); + auto planString = ctx->planString(details, conf); + return env->NewStringUTF(planString.c_str()); + + JNI_METHOD_END(nullptr) +} + JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithIterator( // NOLINT 
JNIEnv* env, diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 3766b5af22996..a449b261234eb 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -32,8 +32,12 @@ using namespace facebook; VeloxPlanConverter::VeloxPlanConverter( const std::vector>& inputIters, velox::memory::MemoryPool* veloxPool, - const std::unordered_map& confMap) - : inputIters_(inputIters), substraitVeloxPlanConverter_(veloxPool, confMap), pool_(veloxPool) {} + const std::unordered_map& confMap, + bool validationMode) + : inputIters_(inputIters), + validationMode_(validationMode), + substraitVeloxPlanConverter_(veloxPool, confMap, validationMode), + pool_(veloxPool) {} void VeloxPlanConverter::setInputPlanNode(const ::substrait::FetchRel& fetchRel) { if (fetchRel.has_input()) { @@ -118,9 +122,6 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::ReadRel& sread) { if (iterIdx == -1) { return; } - if (inputIters_.size() == 0) { - throw std::runtime_error("Invalid input iterator."); - } // Get the input schema of this iterator. 
uint64_t colNum = 0; @@ -140,8 +141,16 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::ReadRel& sread) { outNames.emplace_back(colName); } + std::shared_ptr iterator; + if (!validationMode_) { + if (inputIters_.size() == 0) { + throw std::runtime_error("Invalid input iterator."); + } + iterator = inputIters_[iterIdx]; + } + auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); - auto vectorStream = std::make_shared(pool_, std::move(inputIters_[iterIdx]), outputType); + auto vectorStream = std::make_shared(pool_, std::move(iterator), outputType); auto valuesNode = std::make_shared(nextPlanNodeId(), outputType, std::move(vectorStream)); substraitVeloxPlanConverter_.insertInputNode(iterIdx, valuesNode, planNodeId_); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 7307f714e494e..90c58774aa0dc 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -32,7 +32,8 @@ class VeloxPlanConverter { explicit VeloxPlanConverter( const std::vector>& inputIters, facebook::velox::memory::MemoryPool* veloxPool, - const std::unordered_map& confMap); + const std::unordered_map& confMap, + bool validationMode = false); std::shared_ptr toVeloxPlan(::substrait::Plan& substraitPlan); @@ -71,6 +72,8 @@ class VeloxPlanConverter { std::vector> inputIters_; + bool validationMode_; + SubstraitToVeloxPlanConverter substraitVeloxPlanConverter_; facebook::velox::memory::MemoryPool* pool_; diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 9cdc61f60a50f..00524edc956bb 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -69,6 +69,14 @@ void VeloxRuntime::getInfoAndIds( } } +std::string VeloxRuntime::planString(bool details, const std::unordered_map& sessionConf) { + std::vector> inputs; + auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); + VeloxPlanConverter veloxPlanConverter(inputs, 
veloxMemoryPool.get(), sessionConf, true); + auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_); + return veloxPlan->toString(details, true); +} + std::shared_ptr VeloxRuntime::createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 797a13d46e950..48309436c2c4f 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -107,6 +107,8 @@ class VeloxRuntime final : public Runtime { arrow::MemoryPool* arrowPool, struct ArrowSchema* cSchema) override; + + std::string planString(bool details, const std::unordered_map& sessionConf) override; + std::shared_ptr getVeloxPlan() { return veloxPlan_; } diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 831619b75963c..87b5a3e77d2aa 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -84,6 +84,9 @@ class DummyRuntime final : public Runtime { std::shared_ptr select(MemoryManager*, std::shared_ptr, std::vector) override { throw GlutenException("Not yet implemented"); } + std::string planString(bool details, const std::unordered_map& sessionConf) override { + throw GlutenException("Not yet implemented"); + } private: ResourceMap> resultIteratorHolder_; diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala index 58350b9f9b078..30fd9bfde2a1f 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala @@ -83,4 +83,6 @@ trait TransformerApi { dataType: DecimalType, nullable: Boolean, nullOnOverflow: Boolean): ExpressionNode + + def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala 
b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index 57df410defc50..ea18ac9d5c62a 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -107,13 +107,28 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo val substraitPlanLogLevel: String = GlutenConfig.getConf.substraitPlanLogLevel - private var planJson: String = "" - - def getPlanJson: String = { - if (log.isDebugEnabled() && planJson.isEmpty) { - logWarning("Plan in JSON string is empty. This may due to the plan has not been executed.") + @transient + private var wholeStageTransformerContext: Option[WholeStageTransformContext] = None + + def substraitPlan: PlanNode = { + if (wholeStageTransformerContext.isDefined) { + // TODO: remove this work around after we make `RelNode#toProtobuf` idempotent + // see `SubstraitContext#getCurrentLocalFileNode`. 
+ wholeStageTransformerContext.get.substraitContext.initLocalFilesNodesIndex(0) + wholeStageTransformerContext.get.root + } else { + generateWholeStageTransformContext().root } - planJson + } + + def substraitPlanJson: String = { + SubstraitPlanPrinterUtil.substraitPlanToJson(substraitPlan.toProtobuf) + } + + def nativePlanString(details: Boolean = true): String = { + BackendsApiManager.getTransformerApiInstance.getNativePlanString( + substraitPlan.toProtobuf.toByteArray, + details) } override def output: Seq[Attribute] = child.output @@ -145,9 +160,9 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f maxFields, printNodeId = printNodeId, indent) - if (verbose && planJson.nonEmpty) { + if (verbose && wholeStageTransformerContext.isDefined) { append(prefix + "Substrait plan:\n") - append(planJson) + append(substraitPlanJson) append("\n") } } @@ -157,10 +172,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // See buildSparkPlanGraphNode in SparkPlanGraph.scala of Spark. override def nodeName: String = s"WholeStageCodegenTransformer ($transformStageId)" - def doWholeStageTransform(): WholeStageTransformContext = { - // invoke SparkPlan.prepare to do subquery preparation etc. - super.prepare() - + private def generateWholeStageTransformContext(): WholeStageTransformContext = { val substraitContext = new SubstraitContext val childCtx = child .asInstanceOf[TransformSupport] @@ -191,13 +203,19 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f PlanBuilder.makePlan(substraitContext, Lists.newArrayList(childCtx.root), outNames) } - if (log.isDebugEnabled()) { - planJson = SubstraitPlanPrinterUtil.substraitPlanToJson(planNode.toProtobuf) - } - WholeStageTransformContext(planNode, substraitContext) } + def doWholeStageTransform(): WholeStageTransformContext = { + // invoke SparkPlan.prepare to do subquery preparation etc. 
+ super.prepare() + val context = generateWholeStageTransformContext() + if (conf.getConf(GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT)) { + wholeStageTransformerContext = Some(context) + } + context + } + /** Find all BasicScanExecTransformers in one WholeStageTransformer */ private def findAllScanTransformers(): Seq[BasicScanExecTransformer] = { val basicScanExecTransformers = new mutable.ListBuffer[BasicScanExecTransformer]() diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java index a6ad93ec456e7..3af3e8924c946 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java @@ -54,6 +54,8 @@ public long handle() { */ native NativePlanValidationInfo nativeValidateWithFailureReason(byte[] subPlan); + public native String nativePlanString(byte[] substraitPlan, boolean details); + /** * Create a native compute kernel and return a columnar result iterator. * diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index 287ae58076511..1a14a34a88235 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -1311,4 +1311,12 @@ object GlutenConfig { "'spark.bloom_filter.max_num_bits'") .longConf .createWithDefault(4194304L) + + val CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT = + buildConf("spark.gluten.sql.cacheWholeStageTransformerContext") + .internal() + .doc("When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` " + + "when executing") + .booleanConf + .createWithDefault(false) }