diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index 6fe572f631de..4313e1d45052 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -52,7 +52,7 @@ concurrency:
jobs:
build-native-lib-centos-7:
runs-on: ubuntu-20.04
- container: apache/gluten:gluten-vcpkg-builder_2024_05_29 # centos7 with dependencies installed
+ container: apache/gluten:gluten-vcpkg-builder_2024_08_05 # centos7 with dependencies installed
steps:
- uses: actions/checkout@v2
- name: Generate cache key
@@ -1097,6 +1097,74 @@ jobs:
name: golden-files-spark35
path: /tmp/tpch-approved-plan/**
+ run-spark-test-spark35-scala213:
+ needs: build-native-lib-centos-8
+ runs-on: ubuntu-20.04
+ container: ghcr.io/facebookincubator/velox-dev:centos8
+ env:
+ CCACHE_DIR: "${{ github.workspace }}/.ccache"
+ steps:
+ - uses: actions/checkout@v2
+ - name: Download All Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-native-lib-centos-8-${{github.sha}}
+ path: ./cpp/build/releases
+ - name: Download UDF Example Lib
+ uses: actions/download-artifact@v2
+ with:
+ name: udf-example-lib-centos-8-${{github.sha}}
+ path: ./cpp/build/velox/udf/examples/
+ - name: Download Arrow Jars
+ uses: actions/download-artifact@v2
+ with:
+ name: arrow-jars-centos-8-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
+ - name: Update mirror list
+ run: |
+ sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
+ sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
+ - name: Setup build dependency
+ run: |
+ yum install sudo patch java-1.8.0-openjdk-devel wget -y
+ wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
+ tar -xvf apache-maven-3.8.8-bin.tar.gz
+ mv apache-maven-3.8.8 /usr/lib/maven
+ echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
+ - name: Get Ccache
+ uses: actions/cache/restore@v3
+ with:
+ path: '${{ env.CCACHE_DIR }}'
+ key: ccache-centos-release-default
+ - name: Ensure Cache Dirs Exists
+ working-directory: ${{ github.workspace }}
+ run: |
+ mkdir -p '${{ env.CCACHE_DIR }}'
+ - name: Prepare spark.test.home for Spark 3.5.1 (other tests)
+ run: |
+ cd $GITHUB_WORKSPACE/ && \
+ wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
+ tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
+ rm -rf spark-3.5.1-bin-hadoop3.tgz && \
+ mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
+ mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
+ cd $GITHUB_WORKSPACE// && \
+ wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
+ tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
+ mkdir -p shims/spark35/spark_home/ && \
+ mv sql shims/spark35/spark_home/ && \
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install setuptools && \
+ pip3 install pyspark==3.5.1 cython && \
+ pip3 install pandas pyarrow
+ - name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
+ run: |
+ cd $GITHUB_WORKSPACE/
+ export SPARK_SCALA_VERSION=2.13
+ $MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+ $MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+
run-spark-test-spark35-slow:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
diff --git a/.scalafmt.conf b/.scalafmt.conf
index e65c0217fc58..937ab11383e3 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,7 +1,7 @@
runner.dialect = scala212
# Version is required to make sure IntelliJ picks the right version
-version = 3.5.9
+version = 3.8.3
preset = default
# Max column
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b45158e6..05cd4b3d54cc 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b45158e6..05cd4b3d54cc 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 9e79c4f2e984..6eec68efece3 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
index 7e688814381b..6c1fee39c423 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
@@ -370,8 +370,9 @@ case class CHHashAggregateExecTransformer(
// Use approxPercentile.nullable as the nullable of the struct type
// to make sure it returns null when input is empty
fields = fields :+ (approxPercentile.child.dataType, approxPercentile.nullable)
- fields = fields :+ (approxPercentile.percentageExpression.dataType,
- approxPercentile.percentageExpression.nullable)
+ fields = fields :+ (
+ approxPercentile.percentageExpression.dataType,
+ approxPercentile.percentageExpression.nullable)
(makeStructType(fields), attr.nullable)
case _ =>
(makeStructTypeSingleOne(attr.dataType, attr.nullable), attr.nullable)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
index a1b5801daddf..5e160c902b65 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
@@ -391,13 +391,13 @@ class GlutenParquetFilterSuite
'p_size.int >= 1,
'p_partkey.long.isNotNull,
('p_brand.string === "Brand#12" &&
- ('p_container.string in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
+ ('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
'p_size.int <= 5) ||
('p_brand.string === "Brand#23" &&
- ('p_container.string in ("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
+ ('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
'p_size.int <= 10) ||
('p_brand.string === "Brand#34" &&
- ('p_container.string in ("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
+ ('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
'p_size.int <= 15)
)
),
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
index 194eccc50878..f8cd4bf57cc3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
@@ -97,7 +97,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
_numRows += batch.numRows
}
Iterator((_numRows, blockNativeWriter.collectAsByteArray()))
- // Iterator((_numRows, new Array[Byte](0)))
+ // Iterator((_numRows, new Array[Byte](0)))
}
.collect
val count0 = countsAndBytes.map(_._1).sum
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9c1089a35bea..63bfcf2205ec 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -27,7 +27,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Count, Sum}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -430,7 +430,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
expr match {
// Block directly falling back the below functions by FallbackEmptySchemaRelation.
case alias: Alias => checkExpr(alias.child)
- case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID => true
+ case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber => true
case _ => false
}
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 8ceea8c14f6a..c1024ddcd55b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -228,7 +228,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
}
}
- newProjections += Alias(CreateArray(fieldArray), generatePreAliasName)()
+ newProjections += Alias(CreateArray(fieldArray.toSeq), generatePreAliasName)()
}
// Plug in a Project between Generate and its child.
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
index 4f33ae7c718c..9c5b68e7bff1 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala
@@ -396,7 +396,8 @@ abstract class HashAggregateExecTransformer(
childNodes.add(expressionNode)
}
}
- exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc))
+ exprNodes.add(
+ getRowConstructNode(args, childNodes, newInputAttributes.toSeq, aggFunc))
case other =>
throw new GlutenNotSupportException(s"$other is not supported.")
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 88280ff2edde..0e01c9d5d82f 100644
--- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -44,7 +44,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.{mutable, Seq}
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class ColumnarArrowPythonRunner(
@@ -54,7 +54,7 @@ class ColumnarArrowPythonRunner(
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
- extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
+ extends BasePythonRunnerShim(funcs.toSeq, evalType, argOffsets) {
override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
@@ -239,7 +239,7 @@ case class ColumnarArrowEvalPythonExec(
val arrowSafeTypeCheck = Seq(
SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
conf.arrowSafeTypeConversion.toString)
- Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
+ Map(timeZoneConf.toSeq ++ pandasColsByName.toSeq ++ arrowSafeTypeCheck: _*)
}
private val pythonRunnerConf = getPythonRunnerConfMap(conf)
@@ -280,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
case children =>
// There should not be any other UDFs, or the children can't be evaluated directly.
assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
- (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+ (ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}
@@ -410,7 +410,7 @@ object PullOutArrowEvalPythonPreProjectHelper extends PullOutProjectHelper {
val (chained, children) = collectFunctions(u)
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
- (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+ (ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
index 99f9faf9914a..109ab84947cd 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala
@@ -116,12 +116,12 @@ case class UDFExpression(
object UDFResolver extends Logging {
private val UDFNames = mutable.HashSet[String]()
// (udf_name, arg1, arg2, ...) => return type
- private val UDFMap = mutable.HashMap[String, mutable.MutableList[UDFSignature]]()
+ private val UDFMap = mutable.HashMap[String, mutable.ListBuffer[UDFSignature]]()
private val UDAFNames = mutable.HashSet[String]()
// (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
private val UDAFMap =
- mutable.HashMap[String, mutable.MutableList[UDAFSignature]]()
+ mutable.HashMap[String, mutable.ListBuffer[UDAFSignature]]()
private val LIB_EXTENSION = ".so"
@@ -145,7 +145,7 @@ object UDFResolver extends Logging {
variableArity: Boolean): Unit = {
assert(argTypes.dataType.isInstanceOf[StructType])
val v =
- UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]())
+ UDFMap.getOrElseUpdate(name, mutable.ListBuffer[UDFSignature]())
v += UDFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
@@ -189,7 +189,7 @@ object UDFResolver extends Logging {
}
val v =
- UDAFMap.getOrElseUpdate(name, mutable.MutableList[UDAFSignature]())
+ UDAFMap.getOrElseUpdate(name, mutable.ListBuffer[UDAFSignature]())
v += UDAFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index cd9819c3e8e7..80fd72909f42 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -672,6 +672,16 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}
}
+ test("Test E function") {
+ runQueryAndCompare("""SELECT E() from lineitem limit 100""".stripMargin) {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("""SELECT E(), l_orderkey
+ | from lineitem limit 100""".stripMargin) {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ }
+
test("Test spark_partition_id function") {
runQueryAndCompare("""SELECT spark_partition_id(), l_orderkey
| from lineitem limit 100""".stripMargin) {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 230fc565d9eb..b5a8e74a8653 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1458,9 +1458,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
path =>
(0 to 3).toDF("x").write.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
- runQueryAndCompare(
- "SELECT x FROM view WHERE cast(x as timestamp) " +
- "IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')")(_)
+ runQueryAndCompare(s"""
+ |SELECT x FROM view
+ |WHERE cast(x as timestamp)
+ |IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')
+ |""".stripMargin)(_ => ())
}
}
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 1b28c1569390..6ba29cf79999 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -200,7 +200,6 @@ function process_setup_alinux3 {
sed -i 's/python39 python39-devel python39-pip //g' scripts/setup-centos8.sh
sed -i "s/.*pip.* install/#&/" scripts/setup-centos8.sh
sed -i 's/ADDITIONAL_FLAGS=""/ADDITIONAL_FLAGS="-Wno-stringop-overflow"/g' scripts/setup-helper-functions.sh
- sed -i "s/\${CMAKE_INSTALL_LIBDIR}/lib64/" third_party/CMakeLists.txt
}
function process_setup_tencentos32 {
diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 172a6e8cca69..04bb9d8cf400 100644
--- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -104,7 +104,7 @@ private NamedStruct buildNamedStruct() {
for (StructField field : fileSchema.fields()) {
structBuilder.addTypes(
ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf());
- namedStructBuilder.addNames(field.name());
+ namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
}
namedStructBuilder.setStruct(structBuilder.build());
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 805ff94900fe..6911d5ee8061 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -682,6 +682,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
t.children.map(replaceWithExpressionTransformerInternal(_, attributeSeq, expressionsMap)),
t
)
+ case e: EulerNumber =>
+ LiteralTransformer(Literal(Math.E))
case expr =>
GenericExpressionTransformer(
substraitExprName,
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 7063c3f67b80..20b00601531f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -163,6 +163,10 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
+ // These properties can be set by the same thread in last query submission.
+ session.sparkContext.setLocalProperty("isNativeApplicable", null)
+ session.sparkContext.setLocalProperty("nativeFormat", null)
+ session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
@@ -170,7 +174,7 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
// FIXME: We should only use context property if having no other approaches.
// Should see if there is another way to pass these options.
- session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
+ session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
if (format.isDefined) {
injectFakeRowAdaptor(rc, child)
@@ -178,12 +182,6 @@ object GlutenWriterColumnarRules {
rc.withNewChildren(rc.children.map(apply))
}
} else {
- session.sparkContext.setLocalProperty(
- "staticPartitionWriteOnly",
- BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
- session.sparkContext.setLocalProperty("isNativeAppliable", "false")
- session.sparkContext.setLocalProperty("nativeFormat", "")
-
rc.withNewChildren(rc.children.map(apply))
}
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index c6c4fcc5fa1f..ea3e50e81282 100644
--- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -39,6 +39,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM, "2")
.set(GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS, "2")
+ val scalaVersion = scala.util.Properties.versionNumberString
+
def generateNativePartition1(): Unit = {
val partition = FilePartition(
0,
@@ -97,7 +99,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
- assertResult(Set("host-1", "host-4", "host-5")) {
+ val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
+ Set("host-1", "host-4", "host-5")
+ } else if (scalaVersion.startsWith("2.13")) {
+ Set("host-5", "host-4", "host-2")
+ }
+
+ assertResult(affinityResultSet) {
nativePartition.preferredLocations().toSet
}
}
@@ -184,7 +192,13 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
- assertResult(Set("host-1", "host-5", "host-6")) {
+ val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
+ Set("host-1", "host-5", "host-6")
+ } else if (scalaVersion.startsWith("2.13")) {
+ Set("host-6", "host-5", "host-2")
+ }
+
+ assertResult(affinityResultSet) {
nativePartition.preferredLocations().toSet
}
}
diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
index 442ae74bac98..8c6161e0c44c 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
@@ -119,7 +119,7 @@ object ArrowAbiUtil {
}
}
- def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema) {
+ def exportField(allocator: BufferAllocator, field: Field, out: ArrowSchema): Unit = {
val dictProvider = new CDataDictionaryProvider
try {
Data.exportField(allocator, field, dictProvider, out)
@@ -128,7 +128,7 @@ object ArrowAbiUtil {
}
}
- def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema) {
+ def exportSchema(allocator: BufferAllocator, schema: Schema, out: ArrowSchema): Unit = {
val dictProvider = new CDataDictionaryProvider
try {
Data.exportSchema(allocator, schema, dictProvider, out)
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
index 014956d84e9c..ec6ac35af3e7 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
@@ -134,7 +134,7 @@ object SparkArrowUtil {
val dt = fromArrowField(child)
StructField(child.getName, dt, child.isNullable)
}
- StructType(fields)
+ StructType(fields.toSeq)
case arrowType => fromArrowType(arrowType)
}
}
@@ -147,7 +147,7 @@ object SparkArrowUtil {
}
def fromArrowSchema(schema: Schema): StructType = {
- StructType(schema.getFields.asScala.map {
+ StructType(schema.getFields.asScala.toSeq.map {
field =>
val dt = fromArrowField(field)
StructField(field.getName, dt, field.isNullable)
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index 76eb53dbd022..fed837d308be 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, N
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FileFormat
-import scala.collection._
+import scala.collection.mutable.ListBuffer
class DeltaRewriteTransformerRules extends RewriteTransformerRules {
override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
@@ -87,8 +87,8 @@ object DeltaRewriteTransformerRules {
)(SparkSession.active)
// transform output's name into physical name so Reader can read data correctly
// should keep the columns order the same as the origin output
- val originColumnNames = mutable.ListBuffer.empty[String]
- val transformedAttrs = mutable.ListBuffer.empty[Attribute]
+ val originColumnNames = ListBuffer.empty[String]
+ val transformedAttrs = ListBuffer.empty[Attribute]
def mapAttribute(attr: Attribute) = {
val newAttr = if (!plan.isMetadataColumn(attr)) {
DeltaColumnMapping
@@ -142,7 +142,7 @@ object DeltaRewriteTransformerRules {
val expr = (transformedAttrs, originColumnNames).zipped.map {
(attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
}
- val projectExecTransformer = ProjectExecTransformer(expr, scanExecTransformer)
+ val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer)
projectExecTransformer
case _ => plan
}
diff --git a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala
index b5ea3fc3cf6e..4b3a675cd843 100644
--- a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala
+++ b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/path/PathFinderSuite.scala
@@ -262,18 +262,18 @@ class PathFinderSuite extends AnyFunSuite {
assert(path.plan() == Binary(n1, Group(1), Group(2)))
assert(
- path.dive(state, 1).map(_.plan()) == List(
+ path.dive(state, 1).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Group(3)), Unary(n3, Group(4)))))
assert(
- path.dive(state, 2).map(_.plan()) == List(
+ path.dive(state, 2).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
assert(
- path.dive(state, 3).map(_.plan()) == List(
+ path.dive(state, 3).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
assert(
- path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List(
+ path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List(
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n5, 1))),
Binary(n1, Unary(n2, Leaf(n4, 1)), Unary(n3, Leaf(n6, 1)))))
}
@@ -338,13 +338,13 @@ class PathFinderSuite extends AnyFunSuite {
path.dive(state, 1).map(_.plan()).toSeq == List(
Binary(n1, Binary(n2, Group(3), Group(4)), Leaf(n3, 1))))
assert(
- path.dive(state, 2).map(_.plan()) == List(
+ path.dive(state, 2).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
assert(
- path.dive(state, 3).map(_.plan()) == List(
+ path.dive(state, 3).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
assert(
- path.dive(state, RasPath.INF_DEPTH).map(_.plan()) == List(
+ path.dive(state, RasPath.INF_DEPTH).map(_.plan()).toList == List(
Binary(n1, Binary(n2, Leaf(n4, 1), Leaf(n5, 1)), Leaf(n3, 1))))
}
}
diff --git a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala
index d27292fb5361..077921b697bf 100644
--- a/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala
+++ b/gluten-ras/common/src/test/scala/org/apache/gluten/ras/specific/CyclicSearchSpaceSuite.scala
@@ -65,11 +65,12 @@ abstract class CyclicSearchSpaceSuite extends AnyFunSuite {
PathFinder.builder(ras, mockState).depth(depth).build().find(can)
}
- assert(find(node1, 1).map(p => p.plan()) == List(Unary("node1", Group(0))))
- assert(find(node1, 2).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
- assert(find(node1, 3).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
+ assert(find(node1, 1).map(p => p.plan()).toList == List(Unary("node1", Group(0))))
+ assert(find(node1, 2).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1))))
+ assert(find(node1, 3).map(p => p.plan()).toList == List(Unary("node1", Leaf("node2", 1))))
assert(
- find(node1, RasPath.INF_DEPTH).map(p => p.plan()) == List(Unary("node1", Leaf("node2", 1))))
+ find(node1, RasPath.INF_DEPTH).map(p => p.plan()).toList == List(
+ Unary("node1", Leaf("node2", 1))))
}
test("Cyclic - find best, simple self cycle") {
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
index a4b530e637af..da22e60f932d 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
@@ -18,38 +18,32 @@ package org.apache.spark.sql
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions.{expr, input_file_name}
-import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}
class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait {
- testGluten("input_file_name with scan is fallback") {
- withTempPath {
- dir =>
- val rawData = Seq(
- Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))),
- Row(2, "Bob", Seq(Row(Seq(4, 5)))),
- Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9))))
- )
- val schema = StructType(
- Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField(
- "nested_column",
- ArrayType(
- StructType(Array(
- StructField("array_in_struct", ArrayType(IntegerType), nullable = true)
- ))),
- nullable = true)
- ))
- val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema)
- data.write.parquet(dir.getCanonicalPath)
+ import testImplicits._
+ testGluten(
+ "input_file_name, input_file_block_start and input_file_block_length " +
+ "should fall back if scan falls back") {
+ withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) {
+ withTempPath {
+ dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
- val q =
- spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column"))
- val firstRow = q.head()
- assert(firstRow.getString(0).contains(dir.toURI.getPath))
- val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
- assert(project.size == 1)
+ val q =
+ spark.read
+ .parquet(dir.getCanonicalPath)
+ .select(
+ input_file_name(),
+ expr("input_file_block_start()"),
+ expr("input_file_block_length()"))
+ val firstRow = q.head()
+ assert(firstRow.getString(0).contains(dir.toURI.getPath))
+ assert(firstRow.getLong(1) == 0)
+ assert(firstRow.getLong(2) > 0)
+ val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
+ assert(project.size == 1)
+ }
}
}
}
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index 4b75ce13c067..6703e7dc4363 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -69,20 +69,19 @@ import scala.util.Try
* The format for input files is simple:
* 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot
* effectively separate the SQL queries in the test file(e.g. bracketed comments), please use
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query,
- * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and
- * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as
- * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running
- * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or
- * you can use multiple
- * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test
- * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing
- * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1
- * belongs to dimension 1. One dimension can have multiple lines, each line representing one config
- * set (one or more configs, separated by comma). Spark will run this testing file many times, each
- * time picks one config set from each dimension, until all the combinations are tried. For example,
- * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
+ * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START
+ * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is
+ * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is
+ * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3.
+ * Lines starting with --SET are used to specify the configs when running this testing file. You can
+ * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET
+ * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5.
+ * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The
+ * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to
+ * dimension 1. One dimension can have multiple lines, each line representing one config set (one or
+ * more configs, separated by comma). Spark will run this testing file many times, each time picks
+ * one config set from each dimension, until all the combinations are tried. For example, if
+ * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
* (cartesian product).
*
* For example:
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
index a4b530e637af..da22e60f932d 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
@@ -18,38 +18,32 @@ package org.apache.spark.sql
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions.{expr, input_file_name}
-import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}
class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait {
- testGluten("input_file_name with scan is fallback") {
- withTempPath {
- dir =>
- val rawData = Seq(
- Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))),
- Row(2, "Bob", Seq(Row(Seq(4, 5)))),
- Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9))))
- )
- val schema = StructType(
- Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField(
- "nested_column",
- ArrayType(
- StructType(Array(
- StructField("array_in_struct", ArrayType(IntegerType), nullable = true)
- ))),
- nullable = true)
- ))
- val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema)
- data.write.parquet(dir.getCanonicalPath)
+ import testImplicits._
+ testGluten(
+ "input_file_name, input_file_block_start and input_file_block_length " +
+ "should fall back if scan falls back") {
+ withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) {
+ withTempPath {
+ dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
- val q =
- spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column"))
- val firstRow = q.head()
- assert(firstRow.getString(0).contains(dir.toURI.getPath))
- val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
- assert(project.size == 1)
+ val q =
+ spark.read
+ .parquet(dir.getCanonicalPath)
+ .select(
+ input_file_name(),
+ expr("input_file_block_start()"),
+ expr("input_file_block_length()"))
+ val firstRow = q.head()
+ assert(firstRow.getString(0).contains(dir.toURI.getPath))
+ assert(firstRow.getLong(1) == 0)
+ assert(firstRow.getLong(2) > 0)
+ val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
+ assert(project.size == 1)
+ }
}
}
}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index 4536aa54057c..b052528b80a0 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -69,20 +69,19 @@ import scala.util.Try
* The format for input files is simple:
* 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot
* effectively separate the SQL queries in the test file(e.g. bracketed comments), please use
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query,
- * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and
- * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as
- * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running
- * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or
- * you can use multiple
- * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test
- * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing
- * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1
- * belongs to dimension 1. One dimension can have multiple lines, each line representing one config
- * set (one or more configs, separated by comma). Spark will run this testing file many times, each
- * time picks one config set from each dimension, until all the combinations are tried. For example,
- * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
+ * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START
+ * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is
+ * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is
+ * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3.
+ * Lines starting with --SET are used to specify the configs when running this testing file. You can
+ * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET
+ * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5.
+ * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The
+ * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to
+ * dimension 1. One dimension can have multiple lines, each line representing one config set (one or
+ * more configs, separated by comma). Spark will run this testing file many times, each time picks
+ * one config set from each dimension, until all the combinations are tried. For example, if
+ * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
* (cartesian product).
*
* For example:
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
index a4b530e637af..da22e60f932d 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
@@ -18,38 +18,32 @@ package org.apache.spark.sql
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions.{expr, input_file_name}
-import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}
class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait {
- testGluten("input_file_name with scan is fallback") {
- withTempPath {
- dir =>
- val rawData = Seq(
- Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))),
- Row(2, "Bob", Seq(Row(Seq(4, 5)))),
- Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9))))
- )
- val schema = StructType(
- Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField(
- "nested_column",
- ArrayType(
- StructType(Array(
- StructField("array_in_struct", ArrayType(IntegerType), nullable = true)
- ))),
- nullable = true)
- ))
- val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema)
- data.write.parquet(dir.getCanonicalPath)
+ import testImplicits._
+ testGluten(
+ "input_file_name, input_file_block_start and input_file_block_length " +
+ "should fall back if scan falls back") {
+ withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) {
+ withTempPath {
+ dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
- val q =
- spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column"))
- val firstRow = q.head()
- assert(firstRow.getString(0).contains(dir.toURI.getPath))
- val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
- assert(project.size == 1)
+ val q =
+ spark.read
+ .parquet(dir.getCanonicalPath)
+ .select(
+ input_file_name(),
+ expr("input_file_block_start()"),
+ expr("input_file_block_length()"))
+ val firstRow = q.head()
+ assert(firstRow.getString(0).contains(dir.toURI.getPath))
+ assert(firstRow.getLong(1) == 0)
+ assert(firstRow.getLong(2) > 0)
+ val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
+ assert(project.size == 1)
+ }
}
}
}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index 0ea1f13ec2ef..19d7ac1eb725 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -70,20 +70,19 @@ import scala.util.Try
* The format for input files is simple:
* 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot
* effectively separate the SQL queries in the test file(e.g. bracketed comments), please use
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query,
- * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and
- * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as
- * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running
- * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or
- * you can use multiple
- * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test
- * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing
- * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1
- * belongs to dimension 1. One dimension can have multiple lines, each line representing one config
- * set (one or more configs, separated by comma). Spark will run this testing file many times, each
- * time picks one config set from each dimension, until all the combinations are tried. For example,
- * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
+ * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START
+ * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is
+ * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is
+ * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3.
+ * Lines starting with --SET are used to specify the configs when running this testing file. You can
+ * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET
+ * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5.
+ * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The
+ * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to
+ * dimension 1. One dimension can have multiple lines, each line representing one config set (one or
+ * more configs, separated by comma). Spark will run this testing file many times, each time picks
+ * one config set from each dimension, until all the combinations are tried. For example, if
+ * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
* (cartesian product).
*
* For example:
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
index 3d277b94cc3e..38024f62c411 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
@@ -151,7 +151,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case SortExecTransformer(
Seq(
@@ -168,7 +169,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case _ => false
},
@@ -233,7 +235,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case SortExecTransformer(
Seq(
@@ -250,7 +253,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case _ => false
},
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
index 8a28c4e98a26..da22e60f932d 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala
@@ -18,38 +18,32 @@ package org.apache.spark.sql
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.functions.{expr, input_file_name}
-import org.apache.spark.sql.types._
class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait {
- testGluten("input_file_name with scan is fallback") {
- withTempPath {
- dir =>
- val rawData = Seq(
- Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))),
- Row(2, "Bob", Seq(Row(Seq(4, 5)))),
- Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9))))
- )
- val schema = StructType(
- Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField(
- "nested_column",
- ArrayType(
- StructType(Array(
- StructField("array_in_struct", ArrayType(IntegerType), nullable = true)
- ))),
- nullable = true)
- ))
- val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema)
- data.write.parquet(dir.getCanonicalPath)
+ import testImplicits._
+ testGluten(
+ "input_file_name, input_file_block_start and input_file_block_length " +
+ "should fall back if scan falls back") {
+ withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) {
+ withTempPath {
+ dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
- val q =
- spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column"))
- val firstRow = q.head()
- assert(firstRow.getString(0).contains(dir.toURI.getPath))
- val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
- assert(project.size == 1)
+ val q =
+ spark.read
+ .parquet(dir.getCanonicalPath)
+ .select(
+ input_file_name(),
+ expr("input_file_block_start()"),
+ expr("input_file_block_length()"))
+ val firstRow = q.head()
+ assert(firstRow.getString(0).contains(dir.toURI.getPath))
+ assert(firstRow.getLong(1) == 0)
+ assert(firstRow.getLong(2) > 0)
+ val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p }
+ assert(project.size == 1)
+ }
}
}
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index b1f3945bf192..2f21b38f299e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -70,20 +70,19 @@ import scala.util.Try
* The format for input files is simple:
* 1. A list of SQL queries separated by semicolons by default. If the semicolon cannot
* effectively separate the SQL queries in the test file(e.g. bracketed comments), please use
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with
- * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END represent the beginning and end of a query,
- * respectively. Code that is not surrounded by lines that begin with --QUERY-DELIMITER-START and
- * --QUERY-DELIMITER-END is still separated by semicolons. 2. Lines starting with -- are treated as
- * comments and ignored. 3. Lines starting with --SET are used to specify the configs when running
- * this testing file. You can set multiple configs in one --SET, using comma to separate them. Or
- * you can use multiple
- * --SET statements. 4. Lines starting with --IMPORT are used to load queries from another test
- * file. 5. Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing
- * file. The dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1
- * belongs to dimension 1. One dimension can have multiple lines, each line representing one config
- * set (one or more configs, separated by comma). Spark will run this testing file many times, each
- * time picks one config set from each dimension, until all the combinations are tried. For example,
- * if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
+ * --QUERY-DELIMITER-START and --QUERY-DELIMITER-END. Lines starting with --QUERY-DELIMITER-START
+ * and --QUERY-DELIMITER-END represent the beginning and end of a query, respectively. Code that is
+ * not surrounded by lines that begin with --QUERY-DELIMITER-START and --QUERY-DELIMITER-END is
+ * still separated by semicolons. 2. Lines starting with -- are treated as comments and ignored. 3.
+ * Lines starting with --SET are used to specify the configs when running this testing file. You can
+ * set multiple configs in one --SET, using comma to separate them. Or you can use multiple --SET
+ * statements. 4. Lines starting with --IMPORT are used to load queries from another test file. 5.
+ * Lines starting with --CONFIG_DIM are used to specify config dimensions of this testing file. The
+ * dimension name is decided by the string after --CONFIG_DIM. For example, --CONFIG_DIM1 belongs to
+ * dimension 1. One dimension can have multiple lines, each line representing one config set (one or
+ * more configs, separated by comma). Spark will run this testing file many times, each time picks
+ * one config set from each dimension, until all the combinations are tried. For example, if
+ * dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file will be run 6 times
* (cartesian product).
*
* For example:
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
index fcaf75a4d5c1..5fc887d8d410 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
@@ -152,7 +152,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case SortExecTransformer(
Seq(
@@ -169,7 +170,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case _ => false
},
@@ -233,7 +235,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case SortExecTransformer(
Seq(
@@ -250,7 +253,8 @@ class GlutenV1WriteCommandSuite
),
false,
_,
- _) =>
+ _
+ ) =>
true
case _ => false
},
diff --git a/pom.xml b/pom.xml
index fcbd20175e16..160abea5c3d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
32.0.1-jre
2.27.2
- 3.5.9
+ 3.8.3
package
/*
@@ -146,6 +146,7 @@
2.13.8
2.13
+ 3.8.3
diff --git a/shims/spark32/pom.xml b/shims/spark32/pom.xml
index 07eb5a52ec44..68dbeaf22f70 100644
--- a/shims/spark32/pom.xml
+++ b/shims/spark32/pom.xml
@@ -43,13 +43,13 @@
org.apache.spark
- spark-catalyst_2.12
+ spark-catalyst_${scala.binary.version}
provided
true
org.apache.spark
- spark-core_2.12
+ spark-core_${scala.binary.version}
provided
true
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a5c857103910..96a044c0cbbe 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -148,9 +148,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {
val nativeEnabled =
- "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+ "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
- "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+ "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -257,7 +257,7 @@ object FileFormatWriter extends Logging {
}
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- if ("parquet".equals(nativeFormat)) {
+ if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 34873c46b09e..619fa64ace6d 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenOrcWriterInjects
.getInstance()
.inferSchema(sparkSession, Map.empty[String, String], files)
@@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6b383136590..42a63c7ebcd1 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
- true
- } else {
- val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
- schema.length <= conf.wholeStageMaxNumFields &&
- schema.forall(_.dataType.isInstanceOf[AtomicType])
- }
+ val conf = sparkSession.sessionState.conf
+ conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
}
override def vectorTypes(
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 162dd342bcf0..eb0f6a5d97df 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -100,9 +100,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- val isParquetFormat = nativeFormat.equals("parquet")
+ val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
diff --git a/shims/spark33/pom.xml b/shims/spark33/pom.xml
index cbf732fc48e1..13554ee27e13 100644
--- a/shims/spark33/pom.xml
+++ b/shims/spark33/pom.xml
@@ -43,13 +43,13 @@
org.apache.spark
- spark-catalyst_2.12
+ spark-catalyst_${scala.binary.version}
provided
true
org.apache.spark
- spark-core_2.12
+ spark-core_${scala.binary.version}
provided
true
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index ebf45e76e74e..f5e932337c02 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,9 +140,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {
val nativeEnabled =
- "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+ "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
- "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+ "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -277,7 +277,7 @@ object FileFormatWriter extends Logging {
}
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- if ("parquet".equals(nativeFormat)) {
+ if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 49ac28d73322..9891f6851d00 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files)
} else { // the vanilla spark case
OrcUtils.inferSchema(sparkSession, files, options)
@@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b0573f68e46d..403e31c1cb30 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
- true
- } else {
- val conf = sparkSession.sessionState.conf
- ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
- !WholeStageCodegenExec.isTooManyFields(conf, schema)
- }
+ val conf = sparkSession.sessionState.conf
+ ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
+ !WholeStageCodegenExec.isTooManyFields(conf, schema)
}
override def vectorTypes(
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7a824c43670d..b9c1622cbee5 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -97,9 +97,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- val isParquetFormat = nativeFormat.equals("parquet")
+ val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
diff --git a/shims/spark34/pom.xml b/shims/spark34/pom.xml
index 9bcd7a840674..9a9ee55a1f45 100644
--- a/shims/spark34/pom.xml
+++ b/shims/spark34/pom.xml
@@ -43,13 +43,13 @@
org.apache.spark
- spark-catalyst_2.12
+ spark-catalyst_${scala.binary.version}
provided
true
org.apache.spark
- spark-core_2.12
+ spark-core_${scala.binary.version}
provided
true