diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
index a388a9e02538..4cc0349606eb 100644
--- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.execution.ScalarSubquery
-import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null

 object ExpressionMappings {

@@ -88,7 +87,6 @@ object ExpressionMappings {
     Sig[StringRepeat](REPEAT),
     Sig[StringTranslate](TRANSLATE),
     Sig[StringSpace](SPACE),
-    Sig[Empty2Null](EMPTY2NULL),
     Sig[InitCap](INITCAP),
     Sig[Overlay](OVERLAY),
     Sig[Conv](CONV),
diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 1cacc2f75be9..580cc93bdf75 100644
--- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -16,7 +16,7 @@
  */
 package io.glutenproject.sql.shims.spark32

-import io.glutenproject.expression.Sig
+import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}

 import org.apache.spark.sql.SparkSession
@@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -43,7 +44,7 @@ class Spark32Shims extends SparkShims {
     HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
   }

-  override def expressionMappings: Seq[Sig] = Seq.empty
+  override def expressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))

   override def convertPartitionTransforms(
       partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index b593b6da7066..50e536610e7b 100644
--- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -57,7 +58,8 @@ class Spark33Shims extends SparkShims {
     list ++ Seq(
       Sig[SplitPart](ExpressionNames.SPLIT_PART),
       Sig[Sec](ExpressionNames.SEC),
-      Sig[Csc](ExpressionNames.CSC))
+      Sig[Csc](ExpressionNames.CSC),
+      Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
   }

   override def convertPartitionTransforms(
diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index fbaf04cbc7c2..cdc42f3b43fd 100644
--- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -18,6 +18,7 @@ package io.glutenproject.sql.shims.spark34

 import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
+import io.glutenproject.expression.ExpressionNames.EMPTY2NULL
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}

 import org.apache.spark.SparkException
@@ -59,7 +60,9 @@ class Spark34Shims extends SparkShims {
     list ++ Seq(
       Sig[SplitPart](ExpressionNames.SPLIT_PART),
       Sig[Sec](ExpressionNames.SEC),
-      Sig[Csc](ExpressionNames.CSC))
+      Sig[Csc](ExpressionNames.CSC),
+      Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
+
   }

   override def convertPartitionTransforms(
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 65215f379d77..cbdf13159c08 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -53,28 +53,6 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])

-  case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
-    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
-    override def nullable: Boolean = true
-    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-      nullSafeCodeGen(
-        ctx,
-        ev,
-        c => {
-          s"""if ($c.numBytes() == 0) {
-             |  ${ev.isNull} = true;
-             |  ${ev.value} = null;
-             |} else {
-             |  ${ev.value} = $c;
-             |}""".stripMargin
-        }
-      )
-    }
-
-    override protected def withNewChildInternal(newChild: Expression): Empty2Null =
-      copy(child = newChild)
-  }
-
   /** Describes how concurrent output writers should be executed. */
   case class ConcurrentOutputWriterSpec(
       maxWriters: Int,