Skip to content

Commit

Permalink
[GLUTEN-3361] Fix the compile issue with wrong Empty2Null package path…
Browse files Browse the repository at this point in the history
… in Spark 3.4
  • Loading branch information
JkSelf authored Oct 23, 2023
1 parent 400a161 commit c9014ae
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null

object ExpressionMappings {

Expand Down Expand Up @@ -88,7 +87,6 @@ object ExpressionMappings {
Sig[StringRepeat](REPEAT),
Sig[StringTranslate](TRANSLATE),
Sig[StringSpace](SPACE),
Sig[Empty2Null](EMPTY2NULL),
Sig[InitCap](INITCAP),
Sig[Overlay](OVERLAY),
Sig[Conv](CONV),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package io.glutenproject.sql.shims.spark32

import io.glutenproject.expression.Sig
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}

import org.apache.spark.sql.SparkSession
Expand All @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
Expand All @@ -43,7 +44,7 @@ class Spark32Shims extends SparkShims {
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
}

override def expressionMappings: Seq[Sig] = Seq.empty
override def expressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))

override def convertPartitionTransforms(
partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
Expand All @@ -57,7 +58,8 @@ class Spark33Shims extends SparkShims {
list ++ Seq(
Sig[SplitPart](ExpressionNames.SPLIT_PART),
Sig[Sec](ExpressionNames.SEC),
Sig[Csc](ExpressionNames.CSC))
Sig[Csc](ExpressionNames.CSC),
Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
}

override def convertPartitionTransforms(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.glutenproject.sql.shims.spark34

import io.glutenproject.GlutenConfig
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.expression.ExpressionNames.EMPTY2NULL
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}

import org.apache.spark.SparkException
Expand Down Expand Up @@ -59,7 +60,9 @@ class Spark34Shims extends SparkShims {
list ++ Seq(
Sig[SplitPart](ExpressionNames.SPLIT_PART),
Sig[Sec](ExpressionNames.SEC),
Sig[Csc](ExpressionNames.CSC))
Sig[Csc](ExpressionNames.CSC),
Sig[Empty2Null](ExpressionNames.EMPTY2NULL))

}

override def convertPartitionTransforms(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,6 @@ object FileFormatWriter extends Logging {
customPartitionLocations: Map[TablePartitionSpec, String],
outputColumns: Seq[Attribute])

/**
 * Converts an empty string to SQL NULL, leaving every other value unchanged.
 *
 * Used when writing partitioned output so that an empty-string partition value is
 * treated as the null (default) partition. NOTE(review): this local copy is being
 * removed because Spark 3.4 ships its own `Empty2Null` under a different package —
 * confirm the shims now import the Spark-provided class instead.
 *
 * @param child the string expression to normalize
 */
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
// Interpreted path: an empty UTF8String (zero bytes) becomes null; anything else passes through.
override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
// Always nullable: even a non-nullable child can produce null here (when it is empty).
override def nullable: Boolean = true
// Codegen path: emits Java that mirrors convert(). nullSafeCodeGen already handles a
// null child, so the generated snippet only needs the empty-string check.
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(
ctx,
ev,
c => {
s"""if ($c.numBytes() == 0) {
| ${ev.isNull} = true;
| ${ev.value} = null;
|} else {
| ${ev.value} = $c;
|}""".stripMargin
}
)
}

// Required by TreeNode's immutable-copy machinery (Spark 3.2+): rebuild this node
// with a replacement child while preserving all other state.
override protected def withNewChildInternal(newChild: Expression): Empty2Null =
copy(child = newChild)
}

/** Describes how concurrent output writers should be executed. */
case class ConcurrentOutputWriterSpec(
maxWriters: Int,
Expand Down

0 comments on commit c9014ae

Please sign in to comment.