Commit af544ab: use service loader

liujiayi771 committed Nov 29, 2023 (1 parent: 4a597b9)

Showing 8 changed files with 238 additions and 56 deletions.
@@ -23,14 +23,15 @@ import io.glutenproject.substrait.`type`.ColumnTypeNode
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.plan.PlanBuilder
import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo}
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.collect.Lists
import scala.collection.JavaConverters._

import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import scala.collection.JavaConverters._

trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource {

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

/**
 * Data source v2 transformers should implement this trait so that they can be registered and
 * discovered through the JVM service loader. This lets the factory resolve a transformer by the
 * scan class it adapts, rather than hard-coding fully qualified class names.
 */
trait DataSourceV2TransformerRegister {

  /**
   * The fully qualified class name of the scan that this data source v2 transformer adapts. This
   * is overridden by children to declare which scan implementation they handle. For example:
*
* {{{
* override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
* }}}
*/
def scanClassName(): String

def createDataSourceV2Transformer(
batchScan: BatchScanExec,
partitionFilters: Seq[Expression]): BatchScanExecTransformer
}
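For reference, a provider for some other data source would implement this trait and list its implementation class in a java.util.ServiceLoader provider-configuration file on the classpath, as the Iceberg module does further down in this commit. A minimal sketch; ExampleTransformerProvider, ExampleScanTransformer, and the scan class name below are hypothetical, not part of this commit:

package com.example.gluten

import io.glutenproject.execution.{BatchScanExecTransformer, DataSourceV2TransformerRegister}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

class ExampleTransformerProvider extends DataSourceV2TransformerRegister {

  // The scan class this provider adapts (hypothetical).
  override def scanClassName(): String = "com.example.spark.source.ExampleBatchScan"

  override def createDataSourceV2Transformer(
      batchScan: BatchScanExec,
      partitionFilters: Seq[Expression]): BatchScanExecTransformer = {
    // Build the source-specific transformer; ExampleScanTransformer is hypothetical.
    ExampleScanTransformer(batchScan, partitionFilters)
  }
}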
@@ -24,12 +24,14 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}

import scala.reflect.runtime.{universe => ru}
import java.util.ServiceLoader
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object ScanTransformerFactory {

private val IcebergScanClassName = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
private val IcebergTransformerClassName = "io.glutenproject.execution.IcebergScanTransformer"
private val dataSourceV2TransformerMap = new ConcurrentHashMap[String, Class[_]]()

def createFileSourceScanTransformer(
scanExec: FileSourceScanExec,
@@ -87,8 +89,12 @@
}
val scan = batchScanExec.scan
scan match {
case _ if scan.getClass.getName == IcebergScanClassName =>
createBatchScanTransformer(IcebergTransformerClassName, batchScanExec, newPartitionFilters)
case _ if dataSourceV2TransformerExists(scan.getClass.getName) =>
val cls = lookupDataSourceV2Transformer(scan.getClass.getName)
cls
.newInstance()
.asInstanceOf[DataSourceV2TransformerRegister]
.createDataSourceV2Transformer(batchScanExec, newPartitionFilters)
case _ =>
new BatchScanExecTransformer(
batchScanExec.output,
@@ -100,18 +106,30 @@

def supportedBatchScan(scan: Scan): Boolean = scan match {
case _: FileScan => true
case _ if scan.getClass.getName == IcebergScanClassName => true
case _ if dataSourceV2TransformerExists(scan.getClass.getName) => true
case _ => false
}

private def createBatchScanTransformer(
className: String,
params: Any*): BatchScanExecTransformer = {
val classMirror = ru.runtimeMirror(getClass.getClassLoader)
val classModule = classMirror.staticModule(className)
val mirror = classMirror.reflectModule(classModule)
val apply = mirror.symbol.typeSignature.member(ru.TermName("apply")).asMethod
val objMirror = classMirror.reflect(mirror.instance)
objMirror.reflectMethod(apply)(params: _*).asInstanceOf[BatchScanExecTransformer]
private def lookupDataSourceV2Transformer(scanClassName: String): Class[_] = {
dataSourceV2TransformerMap.computeIfAbsent(
scanClassName,
_ => {
val loader = Option(Thread.currentThread().getContextClassLoader)
.getOrElse(getClass.getClassLoader)
val serviceLoader = ServiceLoader.load(classOf[DataSourceV2TransformerRegister], loader)
serviceLoader.asScala
.filter(_.scanClassName().equalsIgnoreCase(scanClassName))
.toList match {
          case head :: Nil =>
            // Exactly one provider is registered for this scan class.
            head.getClass
          case _ =>
            // Zero or multiple matching providers: fall back to the default transformer.
            null
}
}
)
}

private def dataSourceV2TransformerExists(scanClassName: String): Boolean = {
lookupDataSourceV2Transformer(scanClassName) != null
}
}
@@ -0,0 +1 @@
io.glutenproject.execution.IcebergTransformerProvider
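This one-line file is the java.util.ServiceLoader provider-configuration entry for the Iceberg module. By the ServiceLoader convention, such a file is named after the fully qualified service interface (here the DataSourceV2TransformerRegister trait) and placed under META-INF/services/ on the classpath; this rendering omits the file path. A minimal sketch of the discovery pattern the factory builds on, using names from this commit:

import java.util.ServiceLoader

import scala.collection.JavaConverters._

object ServiceLoaderSketch {

  // Find the registered provider whose scanClassName matches, if any.
  def findProvider(scanClassName: String): Option[DataSourceV2TransformerRegister] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    ServiceLoader
      .load(classOf[DataSourceV2TransformerRegister], loader)
      .asScala
      .find(_.scanClassName().equalsIgnoreCase(scanClassName))
  }
}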
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

class IcebergTransformerProvider extends DataSourceV2TransformerRegister {

override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"

override def createDataSourceV2Transformer(
batchScan: BatchScanExec,
partitionFilters: Seq[Expression]): BatchScanExecTransformer = {
IcebergScanTransformer.apply(batchScan, partitionFilters)
}
}
@@ -22,14 +22,15 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.softaffinity.SoftAffinityUtil
import org.apache.spark.sql.connector.read.{InputPartition, Scan}

import org.apache.iceberg.{FileFormat, FileScanTask, ScanTask}
import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}

import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}

import scala.collection.JavaConverters._

object GlutenIcebergSourceUtil {

def genSplitInfo(inputPartition: InputPartition, index: Int): SplitInfo = inputPartition match {
case partition: SparkInputPartition =>
val paths = new JArrayList[String]()
@@ -39,54 +40,49 @@
var fileFormat = ReadFileFormat.UnknownFormat

val tasks = partition.taskGroup[ScanTask]().tasks().asScala
      if (tasks.forall(_.isInstanceOf[FileScanTask])) {
        tasks.map(_.asInstanceOf[FileScanTask]).foreach {
          task =>
            paths.add(task.file().path().toString)
            starts.add(task.start())
            lengths.add(task.length())
            partitionColumns.add(new JHashMap[String, String]())
            val currentFileFormat = task.file().format() match {
              case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
              case FileFormat.ORC => ReadFileFormat.OrcReadFormat
              case _ =>
                throw new UnsupportedOperationException(
                  "Iceberg only supports Parquet and ORC file formats.")
            }
            if (fileFormat == ReadFileFormat.UnknownFormat) {
              fileFormat = currentFileFormat
            } else if (fileFormat != currentFileFormat) {
              throw new UnsupportedOperationException(
                s"Only one file format is supported, " +
                  s"found different file formats $fileFormat and $currentFileFormat")
            }
        }
        val preferredLoc = SoftAffinityUtil.getFilePartitionLocations(
          paths.asScala.toArray,
          inputPartition.preferredLocations())
        IcebergLocalFilesBuilder.makeIcebergLocalFiles(
          index,
          paths,
          starts,
          lengths,
          partitionColumns,
          fileFormat,
          preferredLoc.toList.asJava
        )
      } else {
        throw new UnsupportedOperationException("Only Iceberg FileScanTask is supported.")
      }
      asFileScanTask(tasks.toList).foreach {
        task =>
          paths.add(task.file().path().toString)
          starts.add(task.start())
          lengths.add(task.length())
          partitionColumns.add(new JHashMap[String, String]())
          val currentFileFormat = task.file().format() match {
            case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
            case FileFormat.ORC => ReadFileFormat.OrcReadFormat
            case _ =>
              throw new UnsupportedOperationException(
                "Iceberg only supports Parquet and ORC file formats.")
          }
          if (fileFormat == ReadFileFormat.UnknownFormat) {
            fileFormat = currentFileFormat
          } else if (fileFormat != currentFileFormat) {
            throw new UnsupportedOperationException(
              s"Only one file format is supported, " +
                s"found different file formats $fileFormat and $currentFileFormat")
          }
      }
      val preferredLoc = SoftAffinityUtil.getFilePartitionLocations(
        paths.asScala.toArray,
        inputPartition.preferredLocations())
      IcebergLocalFilesBuilder.makeIcebergLocalFiles(
        index,
        paths,
        starts,
        lengths,
        partitionColumns,
        fileFormat,
        preferredLoc.toList.asJava
      )
    case _ =>
      throw new UnsupportedOperationException("Only Iceberg SparkInputPartition is supported.")

def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match {
case scan: SparkBatchQueryScan =>
val tasks = scan.tasks().asScala
tasks.map(_.asCombinedScanTask()).foreach {
asFileScanTask(tasks.toList).foreach {
task =>
val file = task.files().asScala.head.file()
file.format() match {
task.file().format() match {
case FileFormat.PARQUET => return ReadFileFormat.ParquetReadFormat
case FileFormat.ORC => return ReadFileFormat.OrcReadFormat
case _ =>
@@ -97,4 +93,14 @@
      throw new UnsupportedOperationException("Only Iceberg SparkBatchQueryScan is supported.")
}

private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = {
if (tasks.forall(_.isFileScanTask)) {
tasks.map(_.asFileScanTask())
} else if (tasks.forall(_.isInstanceOf[CombinedScanTask])) {
tasks.flatMap(_.asCombinedScanTask().tasks().asScala)
} else {
      throw new UnsupportedOperationException(
        "Only Iceberg CombinedScanTask and FileScanTask are supported.")
}
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.SparkConf

class VeloxIcebergSuite extends WholeStageTransformerSuite {

protected val rootPath: String = getClass.getResource("/").getPath
override protected val backend: String = "velox"
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox")
}

test("iceberg transformer exists") {
spark.sql("""
|create table iceberg_tb using iceberg as
|(select 1 as col1, 2 as col2, 3 as col3)
|""".stripMargin)

runQueryAndCompare("""
|select * from iceberg_tb;
|""".stripMargin) {
checkOperatorMatch[IcebergScanTransformer]
}
}
}
@@ -53,4 +53,31 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
(table, tableDF)
}.toMap
}

test("iceberg transformer exists") {
runQueryAndCompare("""
|SELECT
| l_orderkey,
| o_orderdate
|FROM
| orders,
| lineitem
|WHERE
| l_orderkey = o_orderkey
|ORDER BY
| l_orderkey,
| o_orderdate
|LIMIT
| 10;
|""".stripMargin) {
df =>
{
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[IcebergScanTransformer]
}) == 2)
}
}
}
}
