Compiles with Spark 3.5 and Scala 2.12
holdenk committed Nov 21, 2023
1 parent 291e460 commit 9d9fbd2
Showing 6 changed files with 25 additions and 14 deletions.
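
The change is the same across all six files: the code previously wrapped arrays for Seq-typed Spark APIs with scala.collection.immutable.ArraySeq, which exists only on Scala 2.13, so the build broke on 2.12. Switching to scala.collection.compat.immutable.ArraySeq (from the scala-collection-compat module) keeps the same unsafeWrapArray call on both versions: on 2.13 the compat name aliases the standard-library ArraySeq, on 2.12 it is a backported implementation. A minimal sketch of the pattern, with illustrative names that are not from this repository:

    // Requires scala-collection-compat on the classpath when building for Scala 2.12.
    import scala.collection.compat.immutable.ArraySeq

    object ArraySeqCompatSketch {
      // Wraps the array without copying; the compat alias resolves to
      // scala.collection.immutable.ArraySeq on 2.13 and to the backport on 2.12.
      def preferredLocations(hosts: Array[String]): Seq[String] =
        ArraySeq.unsafeWrapArray(hosts)

      def main(args: Array[String]): Unit =
        println(preferredLocations(Array("host-a", "host-b")))
    }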
File 1 of 6:
@@ -32,7 +32,10 @@ import org.apache.spark.util.ExecutorManager

import java.io.Serializable

-import scala.collection.{immutable, mutable}
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.collection.mutable
+import scala.language.implicitConversions

trait BaseGlutenPartition extends Partition with InputPartition {
def plan: Array[Byte]
@@ -122,7 +125,7 @@ class GlutenWholeStageColumnarRDD(
}

override def getPreferredLocations(split: Partition): Seq[String] = {
-immutable.ArraySeq.unsafeWrapArray(castNativePartition(split)._1.preferredLocations())
+ArraySeq.unsafeWrapArray(castNativePartition(split)._1.preferredLocations())
}

override protected def getPartitions: Array[Partition] = {
File 2 of 6:
@@ -25,7 +25,9 @@ import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.util.collection.BitSet

-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.language.implicitConversions

case class InputPartitionsUtil(
relation: HadoopFsRelation,
@@ -49,7 +51,7 @@
val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
val maxSplitBytes = FilePartition.maxSplitBytes(
relation.sparkSession,
-immutable.ArraySeq.unsafeWrapArray(selectedPartitions))
+ArraySeq.unsafeWrapArray(selectedPartitions))
logInfo(
s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
@@ -76,7 +78,7 @@

FilePartition.getFilePartitions(
relation.sparkSession,
-immutable.ArraySeq.unsafeWrapArray(splitFiles),
+ArraySeq.unsafeWrapArray(splitFiles),
maxSplitBytes)
}

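Besides ArraySeq, the touched files also add import scala.collection.compat._ and import scala.language.implicitConversions. On 2.12 the compat wildcard import supplies backported collection methods such as the 2.13-style .to(Target) builder syntax; on 2.13 it adds essentially nothing. A small illustration of that import, independent of this commit:

    // Illustrative only; not code from this commit.
    import scala.collection.compat._

    object CompatImportSketch {
      def main(args: Array[String]): Unit = {
        // Passing a companion object to .to(...) is native on 2.13 and comes
        // from the compat enrichment on 2.12.
        val v: Vector[Int] = List(1, 2, 3).to(Vector)
        println(v)
      }
    }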
File 3 of 6:
@@ -35,7 +35,9 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.spark.sql.vectorized.ColumnarBatch

-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
+import scala.language.implicitConversions

private case class FakeRowLogicAdaptor(child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
@@ -159,15 +161,15 @@ object GlutenWriterColumnarRules {
// if the child is columnar, we can just wrap&transfer the columnar data
case c2r: ColumnarToRowExecBase =>
rc.withNewChildren(
-immutable.ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(c2r.child)))
+ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(c2r.child)))
)
// If the child is aqe, we make aqe "support columnar",
// then aqe itself will guarantee to generate columnar outputs.
// So FakeRowAdaptor will always consume columnar data,
// thus avoiding the case of c2r->aqe->r2c->writer
case aqe: AdaptiveSparkPlanExec =>
rc.withNewChildren(
-immutable.ArraySeq.unsafeWrapArray(
+ArraySeq.unsafeWrapArray(
Array(
FakeRowAdaptor(
AdaptiveSparkPlanExec(
@@ -178,7 +180,7 @@
supportsColumnar = true
)))))
case other =>
-rc.withNewChildren(immutable.ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(other))))
+rc.withNewChildren(ArraySeq.unsafeWrapArray(Array(FakeRowAdaptor(other))))
}
} else {
rc.withNewChildren(rc.children.map(apply))
File 4 of 6:
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.StringUtils
import java.io.{File, IOException}
import java.nio.file.Paths

+import scala.collection.compat.immutable.LazyList

/**
* Manages Gluten's local directories, for storing jars, libs, spill files, or other temporary
* files.
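This file only gains an import of scala.collection.compat.immutable.LazyList; the code that uses it is collapsed in the diff above. On 2.13 that name should resolve to the standard scala.collection.immutable.LazyList, while on 2.12 the compat module ships a backport with the same lazily evaluated API. A hedged sketch of how such a lazy sequence might be used for probing directories (the names are made up, not Gluten's):

    import scala.collection.compat.immutable.LazyList

    object LazyListSketch {
      // An endless, lazily built stream of candidate directory names;
      // each name is computed only when the traversal reaches it.
      def candidates(i: Int): LazyList[String] =
        s"/tmp/gluten-$i" #:: candidates(i + 1)

      def main(args: Array[String]): Unit =
        // find only forces elements up to the first match.
        println(candidates(0).find(_.endsWith("-3"))) // Some(/tmp/gluten-3)
    }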
File 5 of 6:
@@ -29,7 +29,8 @@ import org.apache.spark.sql.types.{DoubleType, StructType}
import java.io.File
import java.util.concurrent.atomic.AtomicBoolean

-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
import scala.io.Source
import scala.reflect.ClassTag

@@ -172,7 +173,7 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
WholeStageTransformerSuite.checkFallBack(df, noFallBack)
}
customCheck(df)
-immutable.ArraySeq.unsafeWrapArray(result)
+ArraySeq.unsafeWrapArray(result)
}

def checkLengthAndPlan(df: DataFrame, len: Int = 100): Unit = {
@@ -252,7 +253,7 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
var expected: Seq[Row] = null
withSQLConf(vanillaSparkConfs(): _*) {
val df = spark.sql(sqlStr)
-expected = immutable.ArraySeq.unsafeWrapArray(df.collect())
+expected = ArraySeq.unsafeWrapArray(df.collect())
}
val df = spark.sql(sqlStr)
if (cache) {
File 6 of 6:
@@ -34,7 +34,8 @@ import org.scalatest.Assertions

import java.util.TimeZone

-import scala.collection.immutable
+import scala.collection.compat._
+import scala.collection.compat.immutable.ArraySeq
import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe

@@ -132,7 +133,7 @@ abstract class GlutenQueryTest extends PlanTest {
}

protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
-checkAnswer(df, immutable.ArraySeq.unsafeWrapArray(expectedAnswer.collect()))
+checkAnswer(df, ArraySeq.unsafeWrapArray(expectedAnswer.collect()))
}

/**

