Skip to content

Commit

Permalink
Work a bit on getting 3.4 working again.
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Nov 17, 2023
1 parent 88d944f commit 1a97ffc
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExp
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
Expand Down Expand Up @@ -64,7 +65,8 @@ trait SparkShims {

def splitFiles(
sparkSession: SparkSession,
file: FileStatusWithMetadata,
// Hack -- the type changes in a non-backwards compatible way
file: Object,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs._

class Spark34Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
Expand All @@ -51,9 +51,8 @@ class Spark34Shims extends SparkShims {
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
}

/**
 * Builds a [[StructType]] schema from a sequence of logical-plan attributes.
 *
 * The schema is constructed field-by-field rather than via
 * `StructType.fromAttributes`, which this commit's diff shows being removed
 * — presumably because that helper is not available/stable across the Spark
 * versions this shim layer targets (TODO confirm against the Spark 3.4 API).
 *
 * @param attrs attributes whose name, dataType, nullability and metadata
 *              become the corresponding [[StructField]]s, in order
 * @return the equivalent [[StructType]]
 */
override def structFromAttributes(attrs: Seq[Attribute]): StructType = {
  StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
}

override def expressionMappings: Seq[Sig] = {
Expand Down Expand Up @@ -142,15 +141,15 @@ class Spark34Shims extends SparkShims {
cause = null)
}
override def splitFiles(
sparkSession: SparkSession,
file: FileStatusWithMetadata,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
sparkSession: SparkSession,
file: Object,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
PartitionedFileUtil.splitFiles(
sparkSession,
file,
file.asInstanceOf[FileStatus],
filePath,
isSplitable,
maxSplitBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,16 @@ class Spark35Shims extends SparkShims {
}
/**
 * Spark 3.5 implementation of the shim's `splitFiles`: splits one input file
 * into [[PartitionedFile]] chunks via Spark's `PartitionedFileUtil`.
 *
 * `file` is typed as `Object` in the shim interface because the concrete
 * parameter type changed incompatibly between Spark versions (the 3.4 shim in
 * this same commit casts it to `FileStatus`); here it must actually be a
 * `FileStatusWithMetadata`, so we cast it back.
 *
 * @param sparkSession    active session, forwarded to Spark's splitter
 * @param file            the file to split; must be a FileStatusWithMetadata
 *                        (ClassCastException otherwise)
 * @param filePath        unused in the 3.5 code path — Spark 3.5's
 *                        `PartitionedFileUtil.splitFiles` no longer takes a
 *                        path argument; kept only for interface compatibility
 * @param isSplitable     whether the file format permits splitting
 * @param maxSplitBytes   maximum size of each produced split
 * @param partitionValues partition-column values attached to each split
 * @return the partitioned file splits
 */
override def splitFiles(
    sparkSession: SparkSession,
    file: Object,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  PartitionedFileUtil.splitFiles(
    sparkSession,
    file.asInstanceOf[FileStatusWithMetadata],
    isSplitable,
    maxSplitBytes,
    partitionValues)
}
}

0 comments on commit 1a97ffc

Please sign in to comment.