From 4d1f33534533107501fdb8ac05dc32d02a40a1af Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Fri, 17 Nov 2023 22:49:02 -0800
Subject: [PATCH] Compiles with Spark 3.5 and Scala 2.12

---
 .../execution/GlutenWholeStageColumnarRDD.scala       |  7 +++++--
 .../io/glutenproject/utils/InputPartitionsUtil.scala  |  8 +++++---
 .../datasources/GlutenWriterColumnarRules.scala       | 10 ++++++----
 .../org/apache/spark/util/SparkDirectoryUtil.scala    |  2 ++
 .../execution/WholeStageTransformerSuite.scala        |  7 ++++---
 .../scala/org/apache/spark/sql/GlutenQueryTest.scala  |  5 +++--
 6 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
index a758e1d95c176..9c07eba186654 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
@@ -32,7 +32,10 @@ import org.apache.spark.util.ExecutorManager
 
 import java.io.Serializable
 
-import scala.collection.{immutable, mutable}
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.collection.mutable
+import scala.language.implicitConversions
 
 trait BaseGlutenPartition extends Partition with InputPartition {
   def plan: Array[Byte]
@@ -122,7 +125,7 @@ class GlutenWholeStageColumnarRDD(
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    immutable.ArraySeq.unsafeWrapArray(castNativePartition(split)._1.preferredLocations())
+    ArraySeq.unsafeWrapArray(castNativePartition(split)._1.preferredLocations())
   }
 
   override protected def getPartitions: Array[Partition] = {
diff --git a/gluten-core/src/main/scala/io/glutenproject/utils/InputPartitionsUtil.scala b/gluten-core/src/main/scala/io/glutenproject/utils/InputPartitionsUtil.scala
index 73458f91d30f0..e9d309eb568b0 100644
--- a/gluten-core/src/main/scala/io/glutenproject/utils/InputPartitionsUtil.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/utils/InputPartitionsUtil.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.util.collection.BitSet
 
-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.language.implicitConversions
 
 case class InputPartitionsUtil(
     relation: HadoopFsRelation,
@@ -49,7 +51,7 @@ case class InputPartitionsUtil(
     val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val maxSplitBytes = FilePartition.maxSplitBytes(
       relation.sparkSession,
-      immutable.ArraySeq.unsafeWrapArray(selectedPartitions))
+      ArraySeq.unsafeWrapArray(selectedPartitions))
     logInfo(
       s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
         s"open cost is considered as scanning $openCostInBytes bytes.")
@@ -76,7 +78,7 @@ case class InputPartitionsUtil(
 
     FilePartition.getFilePartitions(
       relation.sparkSession,
-      immutable.ArraySeq.unsafeWrapArray(splitFiles),
+      ArraySeq.unsafeWrapArray(splitFiles),
       maxSplitBytes)
   }
 
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 264a69883a2b8..a81e3a8a21654 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.language.implicitConversions
 
 private case class FakeRowLogicAdaptor(child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
@@ -159,7 +161,7 @@ object GlutenWriterColumnarRules {
         // if the child is columnar, we can just wrap&transfer the columnar data
         case c2r: ColumnarToRowExecBase =>
           rc.withNewChildren(
-            immutable.ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(c2r.child)))
+            ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(c2r.child)))
           )
         // If the child is aqe, we make aqe "support columnar",
         // then aqe itself will guarantee to generate columnar outputs.
@@ -167,7 +169,7 @@
         // thus avoiding the case of c2r->aqe->r2c->writer
         case aqe: AdaptiveSparkPlanExec =>
           rc.withNewChildren(
-            immutable.ArraySeq.unsafeWrapArray(
+            ArraySeq.unsafeWrapArray(
               Array(
                 FakeRowAdaptor(
                   AdaptiveSparkPlanExec(
@@ -178,7 +180,7 @@
                     supportsColumnar = true
                   )))))
         case other =>
-          rc.withNewChildren(immutable.ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(other))))
+          rc.withNewChildren(ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(other))))
       }
     } else {
       rc.withNewChildren(rc.children.map(apply))
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
index 2cfe82c41665c..b70587360280c 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.StringUtils
 import java.io.{File, IOException}
 import java.nio.file.Paths
 
+import scala.collection.compat.immutable.LazyList
+
 /**
  * Manages Gluten's local directories, for storing jars, libs, spill files, or other temporary
  * stuffs.
diff --git a/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
index 6644ebd5d0a50..34f9ae51a6263 100644
--- a/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
+++ b/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.types.{DoubleType, StructType}
 import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -172,7 +173,7 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
       WholeStageTransformerSuite.checkFallBack(df, noFallBack)
     }
     customCheck(df)
-    immutable.ArraySeq.unsafeWrapArray(result)
+    ArraySeq.unsafeWrapArray(result)
   }
 
   def checkLengthAndPlan(df: DataFrame, len: Int = 100): Unit = {
@@ -252,7 +253,7 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
     var expected: Seq[Row] = null
     withSQLConf(vanillaSparkConfs(): _*) {
       val df = spark.sql(sqlStr)
-      expected = immutable.ArraySeq.unsafeWrapArray(df.collect())
+      expected = ArraySeq.unsafeWrapArray(df.collect())
     }
     val df = spark.sql(sqlStr)
     if (cache) {
diff --git a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index 66cdf372bcd22..bd85da7b78d6f 100644
--- a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -34,7 +34,8 @@ import org.scalatest.Assertions
 
 import java.util.TimeZone
 
-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
 import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe
 
@@ -132,7 +133,7 @@ abstract class GlutenQueryTest extends PlanTest {
   }
 
   protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
-    checkAnswer(df, immutable.ArraySeq.unsafeWrapArray(expectedAnswer.collect()))
+    checkAnswer(df, ArraySeq.unsafeWrapArray(expectedAnswer.collect()))
   }
 
   /**
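The recurring change in this patch swaps scala.collection.immutable.ArraySeq, which only exists from Scala 2.13 on, for scala.collection.compat.immutable.ArraySeq from the scala-collection-compat library, so the same sources compile under Scala 2.12 as well. The short sketch below is not part of the patch; it only illustrates the cross-version pattern in isolation, assuming scala-collection-compat is on the classpath (the object and method names here are invented for the example).

// Illustration of the compat pattern used throughout the patch.
// Assumes a dependency on "org.scala-lang.modules" %% "scala-collection-compat".
import scala.collection.compat.immutable.ArraySeq

object ArraySeqCompatSketch {
  // On Scala 2.13 this ArraySeq resolves to scala.collection.immutable.ArraySeq;
  // on Scala 2.12 the compat library supplies an equivalent immutable wrapper,
  // so the same call compiles for both versions.
  def preferredLocations(hosts: Array[String]): Seq[String] =
    ArraySeq.unsafeWrapArray(hosts) // wraps the array without copying it

  def main(args: Array[String]): Unit =
    println(preferredLocations(Array("host-1", "host-2")))
}

unsafeWrapArray is "unsafe" only in the sense that later mutation of the backing array would be visible through the returned sequence, which is the same trade-off the patched call sites accept to avoid a copy.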