From 415c722d61baca57526f130ac084e6f77e3a7cfc Mon Sep 17 00:00:00 2001 From: Wei-Ting Chen Date: Mon, 12 Aug 2024 21:40:32 +0800 Subject: [PATCH] [VL] Port #6746, #6627, #6318, #6397, #6326, #6363 to branch-1.2 (#6773) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627) * [CORE] Fix schema mismatch between ReadRelNode and LocalFilesNode (#6746) Co-authored-by: 蒋添 * [UT] Test input_file_name, input_file_block_start & input_file_block_length when scan falls back (#6318) * [VL] Fix E function fallback issue (#6397) * [VL] Add Scala 2.13 support (#6326) * [VL] Add Scala 2.13 support * Fix scalaStyle issues * Fix Scala Style issues * Add Spark 3.5.1 and Scala 2.13 test in workflow * Add run-spark-test-spark35-scala213 job * Add Spark 3.5.1 and Scala 2.13 test in workflow * Fix tests failures * Fix tests failures * ScalaStyle fix * Fix SoftAffinitySuite * Fix ArrowUtil error * Fix backend-velox scala issues * Fix ColumnarArrowEvalPythonExec issues * Fix ColumnarArrowEvalPythonExec issues * Fix TestOperator.scala for style issues * Fix TestOperator.scala for style issues * Fix issues in DeltaRewriteTransformerRules.scala * DeltaRewriteTransformerRules fix * Fix style issues * Fix issues * Fix issues * Fix issues * Fix issues * Fix issues * Fix issues * Fix issues --------- Co-authored-by: Hongze Zhang * [VL] Fix Alinux3 arrow build issue (#6363) * update velox docker and port PR #6363 for get_velox.sh update --------- Co-authored-by: PHILO-HE Co-authored-by: jiangjiangtian <97602666+jiangjiangtian@users.noreply.github.com> Co-authored-by: 蒋添 Co-authored-by: 高阳阳 Co-authored-by: Preetesh2110 <110464118+Preetesh2110@users.noreply.github.com> Co-authored-by: Hongze Zhang Co-authored-by: Joey --- .github/workflows/velox_docker.yml | 70 ++++++++++++++++++- .scalafmt.conf | 2 +- .../ClickhouseOptimisticTransaction.scala | 2 +- .../ClickhouseOptimisticTransaction.scala | 2 +- .../ClickhouseOptimisticTransaction.scala | 2 +- .../CHHashAggregateExecTransformer.scala | 5 +- .../parquet/GlutenParquetFilterSuite.scala | 6 +- .../benchmarks/CHStorageJoinBenchmark.scala | 2 +- .../backendsapi/velox/VeloxBackend.scala | 4 +- .../execution/GenerateExecTransformer.scala | 2 +- .../HashAggregateExecTransformer.scala | 3 +- .../python/ColumnarArrowEvalPythonExec.scala | 10 +-- .../spark/sql/expression/UDFResolver.scala | 8 +-- .../ScalarFunctionsValidateSuite.scala | 10 +++ .../gluten/execution/TestOperator.scala | 8 ++- ep/build-velox/src/get_velox.sh | 1 - .../gluten/substrait/rel/LocalFilesNode.java | 2 +- .../expression/ExpressionConverter.scala | 2 + .../GlutenWriterColumnarRules.scala | 12 ++-- .../softaffinity/SoftAffinitySuite.scala | 18 ++++- .../apache/gluten/utils/ArrowAbiUtil.scala | 4 +- .../spark/sql/utils/SparkArrowUtil.scala | 4 +- .../DeltaRewriteTransformerRules.scala | 8 +-- .../gluten/ras/path/PathFinderSuite.scala | 14 ++-- .../ras/specific/CyclicSearchSpaceSuite.scala | 9 +-- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++-------- .../spark/sql/GlutenSQLQueryTestSuite.scala | 27 ++++--- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++-------- .../spark/sql/GlutenSQLQueryTestSuite.scala | 27 ++++--- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++-------- .../spark/sql/GlutenSQLQueryTestSuite.scala | 27 ++++--- .../GlutenV1WriteCommandSuite.scala | 12 ++-- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++-------- 
.../spark/sql/GlutenSQLQueryTestSuite.scala | 27 ++++--- .../GlutenV1WriteCommandSuite.scala | 12 ++-- pom.xml | 3 +- shims/spark32/pom.xml | 4 +- .../datasources/FileFormatWriter.scala | 6 +- .../datasources/orc/OrcFileFormat.scala | 4 +- .../parquet/ParquetFileFormat.scala | 16 ++--- .../sql/hive/execution/HiveFileFormat.scala | 4 +- shims/spark33/pom.xml | 4 +- .../datasources/FileFormatWriter.scala | 6 +- .../datasources/orc/OrcFileFormat.scala | 4 +- .../parquet/ParquetFileFormat.scala | 14 ++-- .../sql/hive/execution/HiveFileFormat.scala | 4 +- shims/spark34/pom.xml | 4 +- 47 files changed, 346 insertions(+), 277 deletions(-) diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index 6fe572f631de..4313e1d45052 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -52,7 +52,7 @@ concurrency: jobs: build-native-lib-centos-7: runs-on: ubuntu-20.04 - container: apache/gluten:gluten-vcpkg-builder_2024_05_29 # centos7 with dependencies installed + container: apache/gluten:gluten-vcpkg-builder_2024_08_05 # centos7 with dependencies installed steps: - uses: actions/checkout@v2 - name: Generate cache key @@ -1097,6 +1097,74 @@ jobs: name: golden-files-spark35 path: /tmp/tpch-approved-plan/** + run-spark-test-spark35-scala213: + needs: build-native-lib-centos-8 + runs-on: ubuntu-20.04 + container: ghcr.io/facebookincubator/velox-dev:centos8 + env: + CCACHE_DIR: "${{ github.workspace }}/.ccache" + steps: + - uses: actions/checkout@v2 + - name: Download All Artifacts + uses: actions/download-artifact@v2 + with: + name: velox-native-lib-centos-8-${{github.sha}} + path: ./cpp/build/releases + - name: Download UDF Example Lib + uses: actions/download-artifact@v2 + with: + name: udf-example-lib-centos-8-${{github.sha}} + path: ./cpp/build/velox/udf/examples/ + - name: Download Arrow Jars + uses: actions/download-artifact@v2 + with: + name: arrow-jars-centos-8-${{github.sha}} + path: /root/.m2/repository/org/apache/arrow/ + - name: Update mirror list + run: | + sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true + sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true + - name: Setup build dependency + run: | + yum install sudo patch java-1.8.0-openjdk-devel wget -y + wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz + tar -xvf apache-maven-3.8.8-bin.tar.gz + mv apache-maven-3.8.8 /usr/lib/maven + echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV + - name: Get Ccache + uses: actions/cache/restore@v3 + with: + path: '${{ env.CCACHE_DIR }}' + key: ccache-centos-release-default + - name: Ensure Cache Dirs Exists + working-directory: ${{ github.workspace }} + run: | + mkdir -p '${{ env.CCACHE_DIR }}' + - name: Prepare spark.test.home for Spark 3.5.1 (other tests) + run: | + cd $GITHUB_WORKSPACE/ && \ + wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \ + tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \ + rm -rf spark-3.5.1-bin-hadoop3.tgz && \ + mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \ + mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \ + cd $GITHUB_WORKSPACE// && \ + wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \ + tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \ + mkdir -p 
shims/spark35/spark_home/ && \ + mv sql shims/spark35/spark_home/ && \ + dnf module -y install python39 && \ + alternatives --set python3 /usr/bin/python3.9 && \ + pip3 install setuptools && \ + pip3 install pyspark==3.5.1 cython && \ + pip3 install pandas pyarrow + - name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests) + run: | + cd $GITHUB_WORKSPACE/ + export SPARK_SCALA_VERSION=2.13 + $MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + $MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest + run-spark-test-spark35-slow: needs: build-native-lib-centos-8 runs-on: ubuntu-20.04 diff --git a/.scalafmt.conf b/.scalafmt.conf index e65c0217fc58..937ab11383e3 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,7 +1,7 @@ runner.dialect = scala212 # Version is required to make sure IntelliJ picks the right version -version = 3.5.9 +version = 3.8.3 preset = default # Max column diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 0794b45158e6..05cd4b3d54cc 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 0794b45158e6..05cd4b3d54cc 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. 
set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 9e79c4f2e984..6eec68efece3 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala index 7e688814381b..6c1fee39c423 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala @@ -370,8 +370,9 @@ case class CHHashAggregateExecTransformer( // Use approxPercentile.nullable as the nullable of the struct type // to make sure it returns null when input is empty fields = fields :+ (approxPercentile.child.dataType, approxPercentile.nullable) - fields = fields :+ (approxPercentile.percentageExpression.dataType, - approxPercentile.percentageExpression.nullable) + fields = fields :+ ( + approxPercentile.percentageExpression.dataType, + approxPercentile.percentageExpression.nullable) (makeStructType(fields), attr.nullable) case _ => (makeStructTypeSingleOne(attr.dataType, attr.nullable), attr.nullable) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala index a1b5801daddf..5e160c902b65 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala @@ -391,13 +391,13 @@ class GlutenParquetFilterSuite 'p_size.int >= 1, 'p_partkey.long.isNotNull, ('p_brand.string === "Brand#12" && - ('p_container.string in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")) && + ('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) && 'p_size.int <= 5) || ('p_brand.string === "Brand#23" && - ('p_container.string in ("MED BAG", "MED BOX", "MED PKG", "MED PACK")) && + ('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) && 'p_size.int <= 10) || ('p_brand.string === "Brand#34" && - ('p_container.string in ("LG CASE", "LG BOX", "LG PACK", "LG PKG")) && + ('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) && 'p_size.int <= 15) ) ), diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala index 194eccc50878..f8cd4bf57cc3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala @@ -97,7 +97,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark _numRows += batch.numRows } Iterator((_numRows, blockNativeWriter.collectAsByteArray())) - // Iterator((_numRows, new Array[Byte](0))) + // Iterator((_numRows, new Array[Byte](0))) } .collect val count0 = countsAndBytes.map(_._1).sum diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 9c1089a35bea..63bfcf2205ec 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -27,7 +27,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat} import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid} +import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Count, Sum} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -430,7 +430,7 @@ object VeloxBackendSettings extends BackendSettingsApi { expr match { // Block directly falling back the below functions by FallbackEmptySchemaRelation. case alias: Alias => checkExpr(alias.child) - case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID => true + case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber => true case _ => false } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala index 8ceea8c14f6a..c1024ddcd55b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala @@ -228,7 +228,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper { } } - newProjections += Alias(CreateArray(fieldArray), generatePreAliasName)() + newProjections += Alias(CreateArray(fieldArray.toSeq), generatePreAliasName)() } // Plug in a Project between Generate and its child. 
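Note: the `.toSeq` conversions and the `MutableList` -> `ListBuffer` swap that recur throughout the hunks above and below follow from two Scala 2.13 changes: `scala.Seq` now aliases `scala.collection.immutable.Seq`, and `scala.collection.mutable.MutableList` was removed. The following standalone sketch is illustrative only (not part of the patch; all names are hypothetical) and shows the pattern in isolation:

import scala.collection.mutable

object Scala213CompatSketch {
  // In 2.13, `Seq` is `scala.collection.immutable.Seq`, so a mutable buffer
  // must be converted before being returned or passed where `Seq` is expected.
  def projectionNames(): Seq[String] = {
    val buf = mutable.ArrayBuffer("a", "b", "c")
    buf.toSeq // required on 2.13; harmless on 2.12
  }

  // `mutable.MutableList` no longer exists in 2.13; `ListBuffer` is the
  // drop-in replacement, as used in the UDFResolver hunks below.
  private val signatures = mutable.HashMap[String, mutable.ListBuffer[String]]()
  def register(name: String, sig: String): Unit =
    signatures.getOrElseUpdate(name, mutable.ListBuffer[String]()) += sig
}
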
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala index 4f33ae7c718c..9c5b68e7bff1 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala @@ -396,7 +396,8 @@ abstract class HashAggregateExecTransformer( childNodes.add(expressionNode) } } - exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc)) + exprNodes.add( + getRowConstructNode(args, childNodes, newInputAttributes.toSeq, aggFunc)) case other => throw new GlutenNotSupportException(s"$other is not supported.") } diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala index 88280ff2edde..0e01c9d5d82f 100644 --- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala @@ -44,7 +44,7 @@ import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.{mutable, Seq} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer class ColumnarArrowPythonRunner( @@ -54,7 +54,7 @@ class ColumnarArrowPythonRunner( schema: StructType, timeZoneId: String, conf: Map[String, String]) - extends BasePythonRunnerShim(funcs, evalType, argOffsets) { + extends BasePythonRunnerShim(funcs.toSeq, evalType, argOffsets) { override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback @@ -239,7 +239,7 @@ case class ColumnarArrowEvalPythonExec( val arrowSafeTypeCheck = Seq( SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key -> conf.arrowSafeTypeConversion.toString) - Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*) + Map(timeZoneConf.toSeq ++ pandasColsByName.toSeq ++ arrowSafeTypeCheck: _*) } private val pythonRunnerConf = getPythonRunnerConfMap(conf) @@ -280,7 +280,7 @@ case class ColumnarArrowEvalPythonExec( case children => // There should not be any other UDFs, or the children can't be evaluated directly. assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty)) - (ChainedPythonFunctions(Seq(udf.func)), udf.children) + (ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children) } } @@ -410,7 +410,7 @@ object PullOutArrowEvalPythonPreProjectHelper extends PullOutProjectHelper { val (chained, children) = collectFunctions(u) (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children) case children => - (ChainedPythonFunctions(Seq(udf.func)), udf.children) + (ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children) } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 99f9faf9914a..109ab84947cd 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -116,12 +116,12 @@ case class UDFExpression( object UDFResolver extends Logging { private val UDFNames = mutable.HashSet[String]() // (udf_name, arg1, arg2, ...) 
=> return type - private val UDFMap = mutable.HashMap[String, mutable.MutableList[UDFSignature]]() + private val UDFMap = mutable.HashMap[String, mutable.ListBuffer[UDFSignature]]() private val UDAFNames = mutable.HashSet[String]() // (udaf_name, arg1, arg2, ...) => return type, intermediate attributes private val UDAFMap = - mutable.HashMap[String, mutable.MutableList[UDAFSignature]]() + mutable.HashMap[String, mutable.ListBuffer[UDAFSignature]]() private val LIB_EXTENSION = ".so" @@ -145,7 +145,7 @@ object UDFResolver extends Logging { variableArity: Boolean): Unit = { assert(argTypes.dataType.isInstanceOf[StructType]) val v = - UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]()) + UDFMap.getOrElseUpdate(name, mutable.ListBuffer[UDFSignature]()) v += UDFSignature( returnType, argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType), @@ -189,7 +189,7 @@ object UDFResolver extends Logging { } val v = - UDAFMap.getOrElseUpdate(name, mutable.MutableList[UDAFSignature]()) + UDAFMap.getOrElseUpdate(name, mutable.ListBuffer[UDAFSignature]()) v += UDAFSignature( returnType, argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType), diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index cd9819c3e8e7..80fd72909f42 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -672,6 +672,16 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest { } } + test("Test E function") { + runQueryAndCompare("""SELECT E() from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + runQueryAndCompare("""SELECT E(), l_orderkey + | from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + } + test("Test spark_partition_id function") { runQueryAndCompare("""SELECT spark_partition_id(), l_orderkey | from lineitem limit 100""".stripMargin) { diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 230fc565d9eb..b5a8e74a8653 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -1458,9 +1458,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla path => (0 to 3).toDF("x").write.parquet(path.getCanonicalPath) spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view") - runQueryAndCompare( - "SELECT x FROM view WHERE cast(x as timestamp) " + - "IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')")(_) + runQueryAndCompare(s""" + |SELECT x FROM view + |WHERE cast(x as timestamp) + |IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2') + |""".stripMargin)(_ => ()) } } diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 1b28c1569390..6ba29cf79999 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -200,7 +200,6 @@ function process_setup_alinux3 { sed -i 's/python39 python39-devel python39-pip //g' scripts/setup-centos8.sh sed -i "s/.*pip.* install/#&/" scripts/setup-centos8.sh sed -i 's/ADDITIONAL_FLAGS=""/ADDITIONAL_FLAGS="-Wno-stringop-overflow"/g' 
scripts/setup-helper-functions.sh - sed -i "s/\${CMAKE_INSTALL_LIBDIR}/lib64/" third_party/CMakeLists.txt } function process_setup_tencentos32 { diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index 172a6e8cca69..04bb9d8cf400 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -104,7 +104,7 @@ private NamedStruct buildNamedStruct() { for (StructField field : fileSchema.fields()) { structBuilder.addTypes( ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf()); - namedStructBuilder.addNames(field.name()); + namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name())); } namedStructBuilder.setStruct(structBuilder.build()); } diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 805ff94900fe..6911d5ee8061 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -682,6 +682,8 @@ object ExpressionConverter extends SQLConfHelper with Logging { t.children.map(replaceWithExpressionTransformerInternal(_, attributeSeq, expressionsMap)), t ) + case e: EulerNumber => + LiteralTransformer(Literal(Math.E)) case expr => GenericExpressionTransformer( substraitExprName, diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 7063c3f67b80..20b00601531f 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -163,6 +163,10 @@ object GlutenWriterColumnarRules { BackendsApiManager.getSettings.enableNativeWriteFiles() => injectFakeRowAdaptor(rc, rc.child) case rc @ DataWritingCommandExec(cmd, child) => + // These properties can be set by the same thread in last query submission. + session.sparkContext.setLocalProperty("isNativeApplicable", null) + session.sparkContext.setLocalProperty("nativeFormat", null) + session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null) if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) { val format = getNativeFormat(cmd) session.sparkContext.setLocalProperty( @@ -170,7 +174,7 @@ object GlutenWriterColumnarRules { BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) // FIXME: We should only use context property if having no other approaches. // Should see if there is another way to pass these options. 
- session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString) + session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString) session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse("")) if (format.isDefined) { injectFakeRowAdaptor(rc, child) @@ -178,12 +182,6 @@ object GlutenWriterColumnarRules { rc.withNewChildren(rc.children.map(apply)) } } else { - session.sparkContext.setLocalProperty( - "staticPartitionWriteOnly", - BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) - session.sparkContext.setLocalProperty("isNativeAppliable", "false") - session.sparkContext.setLocalProperty("nativeFormat", "") - rc.withNewChildren(rc.children.map(apply)) } case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply)) diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index c6c4fcc5fa1f..ea3e50e81282 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -39,6 +39,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate .set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2") .set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2") + val scalaVersion = scala.util.Properties.versionNumberString + def generateNativePartition1(): Unit = { val partition = FilePartition( 0, @@ -97,7 +99,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) - assertResult(Set("host-1", "host-4", "host-5")) { + val affinityResultSet = if (scalaVersion.startsWith("2.12")) { + Set("host-1", "host-4", "host-5") + } else if (scalaVersion.startsWith("2.13")) { + Set("host-5", "host-4", "host-2") + } + + assertResult(affinityResultSet) { nativePartition.preferredLocations().toSet } } @@ -184,7 +192,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) - assertResult(Set("host-1", "host-5", "host-6")) { + val affinityResultSet = if (scalaVersion.startsWith("2.12")) { + Set("host-1", "host-5", "host-6") + } else if (scalaVersion.startsWith("2.13")) { + Set("host-6", "host-5", "host-2") + } + + assertResult(affinityResultSet) { nativePartition.preferredLocations().toSet } } diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala index 442ae74bac98..8c6161e0c44c 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala @@ -119,7 +119,7 @@ object ArrowAbiUtil { } } - def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema) { + def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema): Unit = { val dictProvider = new CDataDictionaryProvider try { Data.exportField(allocator, field, dictProvider, out) @@ -128,7 +128,7 @@ object ArrowAbiUtil { } } - def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema) { + def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema): Unit = { val dictProvider = new CDataDictionaryProvider try { 
Data.exportSchema(allocator, schema, dictProvider, out) diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala index 014956d84e9c..ec6ac35af3e7 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala @@ -134,7 +134,7 @@ object SparkArrowUtil { val dt = fromArrowField(child) StructField(child.getName, dt, child.isNullable) } - StructType(fields) + StructType(fields.toSeq) case arrowType => fromArrowType(arrowType) } } @@ -147,7 +147,7 @@ object SparkArrowUtil { } def fromArrowSchema(schema: Schema): StructType = { - StructType(schema.getFields.asScala.map { + StructType(schema.getFields.asScala.toSeq.map { field => val dt = fromArrowField(field) StructField(field.getName, dt, field.isNullable) diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala index 76eb53dbd022..fed837d308be 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, N import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.FileFormat -import scala.collection._ +import scala.collection.mutable.ListBuffer class DeltaRewriteTransformerRules extends RewriteTransformerRules { override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil @@ -87,8 +87,8 @@ object DeltaRewriteTransformerRules { )(SparkSession.active) // transform output's name into physical name so Reader can read data correctly // should keep the columns order the same as the origin output - val originColumnNames = mutable.ListBuffer.empty[String] - val transformedAttrs = mutable.ListBuffer.empty[Attribute] + val originColumnNames = ListBuffer.empty[String] + val transformedAttrs = ListBuffer.empty[Attribute] def mapAttribute(attr: Attribute) = { val newAttr = if (!plan.isMetadataColumn(attr)) { DeltaColumnMapping @@ -142,7 +142,7 @@ object DeltaRewriteTransformerRules { val expr = (transformedAttrs, originColumnNames).zipped.map { (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId) } - val projectExecTransformer = ProjectExecTransformer(expr, scanExecTransformer) + val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer) projectExecTransformer case _ => plan } diff --git a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala index b5ea3fc3cf6e..4b3a675cd843 100644 --- a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala +++ b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala @@ -262,18 +262,18 @@ class PathFinderSuite extends AnyFunSuite { assert(path.plan() == Binary(n1, Group(1), Group(2))) assert( - path.dive(state, 1).map(_.plan()) == List( + path.dive(state, 1).map(_.plan()).toList == List( Binary(n1, Unary(n2, Group(3)), Unary(n3, Group(4))))) assert( - path.dive(state, 2).map(_.plan()) == List( + path.dive(state, 2).map(_.plan()).toList == List( Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, 
Leaf(n5, 1))), Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1))))) assert( - path.dive(state, 3).map(_.plan()) == List( + path.dive(state, 3).map(_.plan()).toList == List( Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))), Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1))))) assert( - path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List( + path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List( Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))), Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1))))) } @@ -338,13 +338,13 @@ class PathFinderSuite extends AnyFunSuite { path.dive(state, 1).map(_.plan()).toSeq == List( Binary(n1, Binary(n2, Group(3), Group(4)), Leaf(n3, 1)))) assert( - path.dive(state, 2).map(_.plan()) == List( + path.dive(state, 2).map(_.plan()).toList == List( Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1)))) assert( - path.dive(state, 3).map(_.plan()) == List( + path.dive(state, 3).map(_.plan()).toList == List( Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1)))) assert( - path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List( + path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List( Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1)))) } } diff --git a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala index d27292fb5361..077921b697bf 100644 --- a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala +++ b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala @@ -65,11 +65,12 @@ abstract class CyclicSearchSpaceSuite extends AnyFunSuite { PathFinder.builder(ras, mockState).depth(depth).build().find(can) } - assert(find(node1, 1).map(p => p.plan()) == List(Unary("node1", Group(0)))) - assert(find(node1, 2).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1)))) - assert(find(node1, 3).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1)))) + assert(find(node1, 1).map(p => p.plan()).toList == List(Unary("node1", Group(0)))) + assert(find(node1, 2).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1)))) + assert(find(node1, 3).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1)))) assert( - find(node1, RasPath.INF_DEPTH).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1)))) + find(node1, RasPath.INF_DEPTH).map(p => p.plan()).toList == List( + Unary("node1", Leaf("node2", 1)))) } test("Cyclic - find best, simple self cycle") { diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 
3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala index 4b75ce13c067..6703e7dc4363 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala @@ -69,20 +69,19 @@ import scala.util.Try * The format for input files is simple: * 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot * effectively separate the SQL queries in the test file(e.g. bracketed comments), please use - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query, - * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and - * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as - * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running - * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or - * you can use multiple - * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test - * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing - * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 - * belongs to dimension 1. One dimension can have multiple lines, each line representing one config - * set (one or more configs, separated by comma). Spark will run this testing file many times, each - * time picks one config set from each dimension, until all the combinations are tried. 
For example, - * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times + * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START + * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is + * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is + * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3. + * Lines starting with --SET are used to specify the configs when running this testing file. You can + * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET + * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5. + * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The + * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to + * dimension 1. One dimension can have multiple lines, each line representing one config set (one or + * more configs, separated by comma). Spark will run this testing file many times, each time picks + * one config set from each dimension, until all the combinations are tried. For example, if + * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times * (cartesian product). * * For example: diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + 
spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala index 4536aa54057c..b052528b80a0 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala @@ -69,20 +69,19 @@ import scala.util.Try * The format for input files is simple: * 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot * effectively separate the SQL queries in the test file(e.g. bracketed comments), please use - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query, - * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and - * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as - * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running - * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or - * you can use multiple - * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test - * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing - * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 - * belongs to dimension 1. One dimension can have multiple lines, each line representing one config - * set (one or more configs, separated by comma). Spark will run this testing file many times, each - * time picks one config set from each dimension, until all the combinations are tried. For example, - * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times + * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START + * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is + * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is + * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3. + * Lines starting with --SET are used to specify the configs when running this testing file. You can + * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET + * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5. + * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The + * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to + * dimension 1. One dimension can have multiple lines, each line representing one config set (one or + * more configs, separated by comma). 
Spark will run this testing file many times, each time picks + * one config set from each dimension, until all the combinations are tried. For example, if + * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times * (cartesian product). * * For example: diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala index 0ea1f13ec2ef..19d7ac1eb725 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala @@ -70,20 +70,19 @@ import scala.util.Try * The format for input files is simple: * 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot * effectively separate the SQL queries in the test file(e.g. bracketed comments), please use - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. 
Lines starting with - * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query, - * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and - * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as - * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running - * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or - * you can use multiple - * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test - * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing - * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 - * belongs to dimension 1. One dimension can have multiple lines, each line representing one config - * set (one or more configs, separated by comma). Spark will run this testing file many times, each - * time picks one config set from each dimension, until all the combinations are tried. For example, - * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times + * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START + * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is + * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is + * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3. + * Lines starting with --SET are used to specify the configs when running this testing file. You can + * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET + * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5. + * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The + * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to + * dimension 1. One dimension can have multiple lines, each line representing one config set (one or + * more configs, separated by comma). Spark will run this testing file many times, each time picks + * one config set from each dimension, until all the combinations are tried. For example, if + * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times * (cartesian product). 
* * For example: diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index 3d277b94cc3e..38024f62c411 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -151,7 +151,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case SortExecTransformer( Seq( @@ -168,7 +169,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case _ => false }, @@ -233,7 +235,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case SortExecTransformer( Seq( @@ -250,7 +253,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case _ => false }, diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index 8a28c4e98a26..da22e60f932d 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types._ class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index b1f3945bf192..2f21b38f299e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -70,20 +70,19 @@ import scala.util.Try
 * The format for input files is simple:
 * 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot
 * effectively separate the SQL queries in the test file(e.g. bracketed comments), please use
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query,
- * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and
- * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as
- * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running
- * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or
- * you can use multiple
- * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test
- * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing
- * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1
- * belongs to dimension 1. One dimension can have multiple lines, each line representing one config
- * set (one or more configs, separated by comma). Spark will run this testing file many times, each
- * time picks one config set from each dimension, until all the combinations are tried. For example,
- * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
+ * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START
+ * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is
+ * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is
+ * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3.
+ * Lines starting with --SET are used to specify the configs when running this testing file. You can
+ * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET
+ * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5.
+ * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The
+ * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to
+ * dimension 1. One dimension can have multiple lines, each line representing one config set (one or
+ * more configs, separated by comma). Spark will run this testing file many times, each time picks
+ * one config set from each dimension, until all the combinations are tried. For example, if
+ * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
 * (cartesian product).
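Editor's note: the two hunks above (the spark34 and spark35 copies of GlutenSQLQueryTestSuite) only re-wrap the comment describing the input-file directives. To make that format concrete, here is a hypothetical input file expressed as a Scala string constant; the config names, dimension values and queries are invented for illustration and are not taken from this patch.

// Hypothetical illustration of the directive format described in the comment above.
// Config names, dimensions and queries are invented; only the directive syntax
// (--SET, --CONFIG_DIM*, --IMPORT, --QUERY-DELIMITER-START/END) follows the description.
object SqlTestFileExample {
  val contents: String =
    """--SET spark.sql.shuffle.partitions=4,spark.sql.ansi.enabled=true
      |--CONFIG_DIM1 spark.sql.codegen.wholeStage=true
      |--CONFIG_DIM1 spark.sql.codegen.wholeStage=false
      |--CONFIG_DIM2 spark.sql.adaptive.enabled=true
      |--IMPORT shared-setup.sql
      |
      |-- an ordinary comment line, ignored by the runner
      |SELECT 1;
      |
      |--QUERY-DELIMITER-START
      |SELECT /* a bracketed ; comment that would confuse the default splitter */ 2;
      |--QUERY-DELIMITER-END
      |""".stripMargin
}

With two --CONFIG_DIM1 lines and one --CONFIG_DIM2 line, such a file would be run twice, once per combination, as the comment's cartesian-product rule describes.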
* * For example: diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index fcaf75a4d5c1..5fc887d8d410 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -152,7 +152,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case SortExecTransformer( Seq( @@ -169,7 +170,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case _ => false }, @@ -233,7 +235,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case SortExecTransformer( Seq( @@ -250,7 +253,8 @@ class GlutenV1WriteCommandSuite ), false, _, - _) => + _ + ) => true case _ => false }, diff --git a/pom.xml b/pom.xml index fcbd20175e16..160abea5c3d1 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ 32.0.1-jre 2.27.2 - 3.5.9 + 3.8.3 package /* @@ -146,6 +146,7 @@ 2.13.8 2.13 + 3.8.3 diff --git a/shims/spark32/pom.xml b/shims/spark32/pom.xml index 07eb5a52ec44..68dbeaf22f70 100644 --- a/shims/spark32/pom.xml +++ b/shims/spark32/pom.xml @@ -43,13 +43,13 @@ org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} provided true org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} provided true diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index a5c857103910..96a044c0cbbe 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -148,9 +148,9 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) + "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") val staticPartitionWriteOnly = - "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) + "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly") if (nativeEnabled) { logInfo("Use Gluten partition write for hive") @@ -257,7 +257,7 @@ object FileFormatWriter extends Logging { } val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - if ("parquet".equals(nativeFormat)) { + if ("parquet" == nativeFormat) { (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) } else { (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 34873c46b09e..619fa64ace6d 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): 
Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenOrcWriterInjects .getInstance() .inferSchema(sparkSession, Map.empty[String, String], files) @@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c6b383136590..42a63c7ebcd1 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. 
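Editor's note: the spark32 shim hunks above replace the misspelled local property name "isNativeAppliable" with "isNativeApplicable" and switch from String.equals to Scala's ==. The flag travels as a Spark local property set on the driver and read back inside the shims. A minimal, self-contained sketch of that round trip follows; the session setup and the point at which Gluten actually sets the properties are assumptions for illustration, only the get/set calls mirror the code above.

import org.apache.spark.sql.SparkSession

// Minimal sketch of the local-property round trip the shims rely on.
object LocalPropertySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Assumed here: some Gluten rule sets these before the write is planned
    // (the patch only shows the reads in the shims).
    sc.setLocalProperty("isNativeApplicable", "true")
    sc.setLocalProperty("nativeFormat", "parquet")

    // The shims read the properties back; Scala's == is null-safe, so an unset
    // property (null) simply yields false instead of throwing.
    val nativeEnabled = "true" == sc.getLocalProperty("isNativeApplicable")
    val isParquet = "parquet" == sc.getLocalProperty("nativeFormat")
    println(s"nativeEnabled=$nativeEnabled, parquet=$isParquet")

    spark.stop()
  }
}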
*/ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { - true - } else { - val conf = sparkSession.sessionState.conf - conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && - schema.length <= conf.wholeStageMaxNumFields && - schema.forall(_.dataType.isInstanceOf[AtomicType]) - } + val conf = sparkSession.sessionState.conf + conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && + schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) } override def vectorTypes( diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 162dd342bcf0..eb0f6a5d97df 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -100,9 +100,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - val isParquetFormat = nativeFormat.equals("parquet") + val isParquetFormat = nativeFormat == "parquet" val compressionCodec = if (fileSinkConf.compressed) { // hive related configurations fileSinkConf.compressCodec diff --git a/shims/spark33/pom.xml b/shims/spark33/pom.xml index cbf732fc48e1..13554ee27e13 100644 --- a/shims/spark33/pom.xml +++ b/shims/spark33/pom.xml @@ -43,13 +43,13 @@ org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} provided true org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} provided true diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index ebf45e76e74e..f5e932337c02 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -140,9 +140,9 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) + "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") val staticPartitionWriteOnly = - "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) + "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly") if (nativeEnabled) { logInfo("Use Gluten partition write for hive") @@ -277,7 +277,7 @@ object FileFormatWriter extends Logging { } val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - if ("parquet".equals(nativeFormat)) { + if ("parquet" == nativeFormat) { (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) } else { (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 49ac28d73322..9891f6851d00 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files) } else { // the vanilla spark case OrcUtils.inferSchema(sparkSession, files, options) @@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b0573f68e46d..403e31c1cb30 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. 
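Editor's note: the same .equals-to-== substitution recurs through the spark33 hunks above. For the call sites where a literal is the receiver the rewrite is purely stylistic; for nativeFormat.equals("parquet") it additionally avoids a potential NullPointerException, because == checks for null before delegating to equals. A tiny standalone illustration, with an invented value:

// Standalone illustration of the comparison semantics behind the rewrite above.
// The `unset` value stands in for a local property that was never set (null).
object EqualsVsDoubleEquals {
  def main(args: Array[String]): Unit = {
    val unset: String = null

    println("true" == unset)      // false: == is null-safe on references
    println("true".equals(unset)) // false: safe because the literal is the receiver

    // The reversed receiver is the risky spelling that == avoids:
    // unset.equals("parquet")    // would throw a NullPointerException
    println(unset == "parquet")   // false instead of throwing
  }
}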
*/ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { - true - } else { - val conf = sparkSession.sessionState.conf - ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled && - !WholeStageCodegenExec.isTooManyFields(conf, schema) - } + val conf = sparkSession.sessionState.conf + ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled && + !WholeStageCodegenExec.isTooManyFields(conf, schema) } override def vectorTypes( diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 7a824c43670d..b9c1622cbee5 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -97,9 +97,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - val isParquetFormat = nativeFormat.equals("parquet") + val isParquetFormat = nativeFormat == "parquet" val compressionCodec = if (fileSinkConf.compressed) { // hive related configurations fileSinkConf.compressCodec diff --git a/shims/spark34/pom.xml b/shims/spark34/pom.xml index 9bcd7a840674..9a9ee55a1f45 100644 --- a/shims/spark34/pom.xml +++ b/shims/spark34/pom.xml @@ -43,13 +43,13 @@ org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} provided true org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} provided true
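Editor's note: the pom changes above (spark-catalyst_${scala.binary.version}, spark-core_${scala.binary.version}, plus the 2.13.8 / 2.13 properties) let the same shims resolve Spark artifacts for either Scala 2.12 or 2.13 depending on the active build profile. A small sanity check one might run after switching profiles, using only the standard library; the object name is invented for this sketch.

// Invented helper: prints which Scala binary version the current classpath was
// built with, which should line up with the scala.binary.version used in the pom above.
object ScalaBinaryVersionCheck {
  def main(args: Array[String]): Unit = {
    val full = scala.util.Properties.versionNumberString // e.g. "2.13.8"
    val binary = full.split('.').take(2).mkString(".")   // e.g. "2.13"
    println(s"Scala binary version: $binary (full: $full)")
  }
}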