diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java index 0d598cf6cba1..3322afe0b9d5 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java @@ -27,7 +27,6 @@ import io.glutenproject.substrait.plan.PlanNode; import com.google.protobuf.Any; -import io.substrait.proto.Plan; import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; @@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map confs) { // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList, boolean materializeInput) { + byte[] wsPlan, List iterList, boolean materializeInput) { long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(); long handle = jniWrapper.nativeCreateKernelWithIterator( allocId, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), buildNativeConfNode( GlutenConfig.getNativeBackendConf( @@ -115,10 +114,6 @@ public GeneralOutIterator createKernelWithBatchIterator( return createOutIterator(handle); } - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } - private GeneralOutIterator createOutIterator(long nativeHandle) { return new BatchIterator(nativeHandle); } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index aea4bb15df12..4820e4f96aeb 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileFormats(i)), SoftAffinityUtil.getFilePartitionLocations(f)) case _ => - throw new UnsupportedOperationException(s"Unsupport operators.") + throw new UnsupportedOperationException(s"Unsupported input partition.") }) wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - if (index < 3) { + if (index == 0) { logOnLevel( GlutenConfig.getConf.substraitPlanLogLevel, s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil .substraitPlanToJson(substraitPlan)}" ) } - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { }.asJava) // we need to complete dependency RDD's firstly transKernel.createKernelWithBatchIterator( - rootNode.toProtobuf, + rootNode.toProtobuf.toByteArray, columnarNativeIterator, materializeInput) } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index bb5acb46a9ef..2aae3d9c8ca0 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging { wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging { iter => new ColumnarBatchInIterator(iter.asJava) }.asJava) val nativeResultIterator = - transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator) + transKernel.createKernelWithBatchIterator( + rootNode.toProtobuf.toByteArray, + columnarNativeIterator) pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild) diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java index d9ea9a5f589b..1452af194a48 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java @@ -27,6 +27,9 @@ import java.util.Map; public class PlanBuilder { + + public static byte[] EMPTY_PLAN = empty().toProtobuf().toByteArray(); + private PlanBuilder() {} public static PlanNode makePlan( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala index 33c234daf0b2..11b95251afb6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala @@ -30,21 +30,22 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager -import io.substrait.proto.Plan - import scala.collection.mutable trait BaseGlutenPartition extends Partition with InputPartition { - def plan: Plan + def plan: Array[Byte] } -case class GlutenPartition(index: Int, plan: Plan, locations: Array[String] = Array.empty[String]) +case class GlutenPartition( + index: Int, + plan: Array[Byte], + locations: Array[String] = Array.empty[String]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = locations } -case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Plan) +case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Array[Byte]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { // Computes total number of bytes can be retrieved from each host. @@ -74,15 +75,11 @@ case class GlutenMergeTreePartition( tablePath: String, minParts: Long, maxParts: Long, - plan: Plan = PlanBuilder.empty().toProtobuf) + plan: Array[Byte] = PlanBuilder.EMPTY_PLAN) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { Array.empty[String] } - - def copySubstraitPlan(newSubstraitPlan: Plan): GlutenMergeTreePartition = { - this.copy(plan = newSubstraitPlan) - } } case class FirstZippedPartitionsPartition( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index b30e2a66f8aa..57df410defc5 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -265,7 +265,11 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f .genFilePartition(i, currentPartitions, allScanPartitionSchemas, fileFormats, wsCxt) }) (wsCxt, substraitPlanPartitions) - }(t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.")) + }( + t => + logOnLevel( + substraitPlanLogLevel, + s"$nodeName generating the substrait plan took: $t ms.")) new GlutenWholeStageColumnarRDD( sparkContext, @@ -291,7 +295,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f * result, genFinalStageIterator rather than genFirstStageIterator will be invoked */ val resCtx = GlutenTimeMetric.withMillisTime(doWholeStageTransform()) { - t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.") + t => + logOnLevel(substraitPlanLogLevel, s"$nodeName generating the substrait plan took: $t ms.") } new WholeStageZippedPartitionsRDD( sparkContext, diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala index fd293f40905a..1db768807e90 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala @@ -79,7 +79,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { expr: Expression, attributeSeq: Seq[Attribute]): ExpressionTransformer = { logDebug( - s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass}} " + + s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass} " + s"name: ${expr.prettyName}") expr match { diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index 75302ab8ab2b..34117333d124 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -62,7 +62,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-2", "host-3")) { nativePartition.preferredLocations().toSet } @@ -91,7 +91,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-4", "host-5")) { nativePartition.preferredLocations().toSet @@ -121,7 +121,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-2_2", "executor_host-1_0")) { nativePartition.preferredLocations().toSet @@ -133,7 +133,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getNativeMergeTreePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-1_1")) { nativePartition.preferredLocations().toSet @@ -163,7 +163,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-5", "host-6")) { nativePartition.preferredLocations().toSet diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index 44a43f016f4b..468fc72ecf00 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -24,7 +24,6 @@ import io.glutenproject.utils.DebugUtil; import io.glutenproject.validate.NativePlanValidationInfo; -import io.substrait.proto.Plan; import org.apache.spark.TaskContext; import org.apache.spark.util.SparkDirectoryUtil; @@ -58,7 +57,7 @@ public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[] subPlan // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList) throws RuntimeException, IOException { + byte[] wsPlan, List iterList) throws RuntimeException, IOException { final AtomicReference outIterator = new AtomicReference<>(); final NativeMemoryManager nmm = NativeMemoryManagers.create( @@ -85,7 +84,7 @@ public GeneralOutIterator createKernelWithBatchIterator( long iterHandle = jniWrapper.nativeCreateKernelWithIterator( memoryManagerHandle, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), TaskContext.get().stageId(), TaskContext.getPartitionId(), @@ -100,8 +99,4 @@ private ColumnarBatchOutIterator createOutIterator( Runtime runtime, long iterHandle, NativeMemoryManager nmm) throws IOException { return new ColumnarBatchOutIterator(runtime, iterHandle, nmm); } - - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } }