[GLUTEN-3361] Support spark 3.4 in Gluten #3360

Merged · 2 commits · Oct 20, 2023
13 changes: 13 additions & 0 deletions .github/workflows/velox_be.yml
@@ -254,6 +254,19 @@ jobs:
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=30g -s=10.0 --threads=32 --iterations=1'
- name: Build for Spark 3.4.1
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.4 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=30g -s=10.0 --threads=32 --iterations=1'
- name: Exit docker container
if: ${{ always() }}
run: |
@@ -71,7 +71,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
val partitionColumns = mutable.ArrayBuffer.empty[Map[String, String]]
files.foreach {
file =>
paths.append(URLDecoder.decode(file.filePath, StandardCharsets.UTF_8.name()))
paths.append(URLDecoder.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.append(java.lang.Long.valueOf(file.start))
lengths.append(java.lang.Long.valueOf(file.length))

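Context for the `.toString` call above: in Spark 3.4, `PartitionedFile.filePath` is a `SparkPath` wrapper rather than a plain `String`, so call sites that URL-decode the path stringify it first. A minimal sketch of the pattern follows; the helper name is hypothetical and not part of this PR:

```scala
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

object FilePathDecodeSketch {
  // Accepting AnyRef keeps the sketch version-independent: String (Spark <= 3.3)
  // and SparkPath (Spark 3.4) both stringify to the underlying path, so the
  // URL-decoding logic itself does not change.
  def decodePartitionedFilePath(filePath: AnyRef): String =
    URLDecoder.decode(filePath.toString, StandardCharsets.UTF_8.name())
}
```

The same stringify-first pattern appears in the SoftAffinityUtil change further down, where the file path is converted before sorting and executor lookup.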
1 change: 1 addition & 0 deletions dev/buildbundle-veloxbe.sh
@@ -6,3 +6,4 @@ source "$BASEDIR/builddeps-veloxbe.sh"
cd $GLUTEN_DIR
mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests
@@ -21,6 +21,7 @@ import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, Express
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.expression.ExpressionNode
@@ -45,7 +46,6 @@ import scala.collection.JavaConverters._
abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkPlan)
extends UnaryTransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -104,8 +104,6 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
}
}

override protected def outputExpressions: Seq[NamedExpression] = output

override def output: Seq[Attribute] = {
child.output.map {
a =>
@@ -188,7 +186,6 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
case class ProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryTransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -304,8 +301,6 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
}

override protected def outputExpressions: Seq[NamedExpression] = projectList

override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
copy(child = newChild)
}
@@ -505,7 +500,11 @@ object FilterHandler {
getLeftFilters(scan.dataFilters, flattenCondition(plan.condition))
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery)
new BatchScanExecTransformer(batchScan.output, scan, leftFilters ++ newPartitionFilters)
new BatchScanExecTransformer(
batchScan.output,
scan,
leftFilters ++ newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan))
case _ =>
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
@@ -19,10 +19,13 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.extension.ValidationResult
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -41,8 +44,13 @@ class BatchScanExecTransformer(
output: Seq[AttributeReference],
@transient scan: Scan,
runtimeFilters: Seq[Expression],
keyGroupedPartitioning: Option[Seq[Expression]] = None)
extends BatchScanExecShim(output, scan, runtimeFilters)
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None,
@transient table: Table,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false)
extends BatchScanExecShim(output, scan, runtimeFilters, table)
with BasicScanExecTransformer {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -117,7 +125,8 @@ class BatchScanExecTransformer(
new BatchScanExecTransformer(
canonicalized.output,
canonicalized.scan,
canonicalized.runtimeFilters
canonicalized.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(canonicalized)
)
}
}
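The new `table` constructor argument is obtained through the shim layer rather than read off the plan directly, because only Spark 3.4's `BatchScanExec` carries a catalog `Table`. The shim implementations are not part of this excerpt; the following is a rough sketch of what they could look like, with the class names and the `null` fallback being assumptions:

```scala
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Hypothetical Spark 3.4 shim: BatchScanExec exposes its catalog Table, so pass it through.
class Spark34ShimsSketch {
  def getBatchScanExecTable(batchScan: BatchScanExec): Table = batchScan.table
}

// Hypothetical Spark 3.2/3.3 shim: BatchScanExec has no table field there, so return
// null and let the version-specific BatchScanExecShim ignore the extra argument.
class Spark32ShimsSketch {
  def getBatchScanExecTable(batchScan: BatchScanExec): Table = null
}
```

Whatever the older shims actually return is not visible in this diff; the point is that call sites such as FilterHandler and the columnar rules stay version-agnostic by going through `SparkShimLoader.getSparkShims`.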
@@ -19,25 +19,23 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.ConverterUtils
import io.glutenproject.extension.ValidationResult
import io.glutenproject.metrics.{GlutenTimeMetric, MetricsUpdater}
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.substrait.rel.ReadRelNode

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, PlanExpression, Predicate}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.{FileSourceScanExecShim, InSubqueryExec, ScalarSubquery, SQLExecution}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.collection.BitSet

import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.{mutable, JavaConverters}
import scala.collection.JavaConverters

class FileSourceScanExecTransformer(
@transient relation: HadoopFsRelation,
@@ -64,10 +62,10 @@ class FileSourceScanExecTransformer(
// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics: Map[String, SQLMetric] =
BackendsApiManager.getMetricsApiInstance
.genFileSourceScanTransformerMetrics(sparkContext) ++ staticMetrics
.genFileSourceScanTransformerMetrics(sparkContext) ++ staticMetricsAlias

/** SQL metrics generated only for scans using dynamic partition pruning. */
private lazy val staticMetrics =
private lazy val staticMetricsAlias =
if (partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
Map(
"staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
@@ -135,96 +133,6 @@ class FileSourceScanExecTransformer(
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics)

// The codes below are copied from FileSourceScanExec in Spark,
// all of them are private.
protected lazy val driverMetrics: mutable.HashMap[String, Long] = mutable.HashMap.empty

/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has been
* initialized. See SPARK-26327 for more details.
*/
protected def sendDriverMetrics(): Unit = {
driverMetrics.foreach(e => metrics(e._1).add(e._2))
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
executionId,
metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
}

protected def setFilesNumAndSizeMetric(
partitions: Seq[PartitionDirectory],
static: Boolean): Unit = {
val filesNum = partitions.map(_.files.size.toLong).sum
val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
if (!static || !partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
driverMetrics("numFiles") = filesNum
driverMetrics("filesSize") = filesSize
} else {
driverMetrics("staticFilesNum") = filesNum
driverMetrics("staticFilesSize") = filesSize
}
if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
}
}

@transient override lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
GlutenTimeMetric.withNanoTime {
val ret =
relation.location.listFiles(
partitionFilters.filterNot(FileSourceScanExecTransformer.isDynamicPruningFilter),
dataFilters)
setFilesNumAndSizeMetric(ret, static = true)
ret
}(t => driverMetrics("metadataTime") = NANOSECONDS.toMillis(t + optimizerMetadataTimeNs))
}.toArray

// We can only determine the actual partitions at runtime when a dynamic partition filter is
// present. This is because such a filter relies on information that is only available at run
// time (for instance the keys used in the other side of a join).
@transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
val dynamicPartitionFilters =
partitionFilters.filter(FileSourceScanExecTransformer.isDynamicPruningFilter)
val selected = if (dynamicPartitionFilters.nonEmpty) {
// When it includes some DynamicPruningExpression,
// it needs to execute InSubqueryExec first,
// because doTransform path can't execute 'doExecuteColumnar' which will
// execute prepare subquery first.
dynamicPartitionFilters.foreach {
case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
executeInSubqueryForDynamicPruningExpression(inSubquery)
case e: Expression =>
e.foreach {
case s: ScalarSubquery => s.updateResult()
case _ =>
}
case _ =>
}
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except dynamic partition filters
val predicate = dynamicPartitionFilters.reduce(And)
val partitionColumns = relation.partitionSchema
val boundPredicate = Predicate.create(
predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
},
Nil
)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
}(t => driverMetrics("pruningTime") = t)
} else {
selectedPartitions
}
sendDriverMetrics()
selected
}

override val nodeNamePrefix: String = "NativeFile"

override val nodeName: String = {
@@ -22,6 +22,7 @@ import io.glutenproject.execution._
import io.glutenproject.expression.ExpressionConverter
import io.glutenproject.extension.columnar._
import io.glutenproject.metrics.GlutenTimeMetric
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.{ColumnarShuffleUtil, LogLevelUtil, PhysicalPlanSelector}

import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -578,7 +579,12 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
case _ =>
ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)
}
val transformer = new BatchScanExecTransformer(plan.output, plan.scan, newPartitionFilters)
val transformer = new BatchScanExecTransformer(
plan.output,
plan.scan,
newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan))

val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
@@ -20,6 +20,7 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution._
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.PhysicalPlanSelector

import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -333,8 +334,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
if (plan.runtimeFilters.nonEmpty) {
TransformHints.tagTransformable(plan)
} else {
val transformer =
new BatchScanExecTransformer(plan.output, plan.scan, plan.runtimeFilters)
val transformer = new BatchScanExecTransformer(
plan.output,
plan.scan,
plan.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan))
TransformHints.tag(plan, transformer.doValidate().toTransformHint)
}
}
@@ -40,6 +40,8 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
private val driverEndpoint: RpcEndpointRef =
rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_DRIVER_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
Contributor: @lwz9103 @zzcclp will this change to threadCount() impact the CK backend?

Contributor: The default value is 1, right?

Contributor: It seems ok to me.

override def receive: PartialFunction[Any, Unit] = {
case GlutenOnExecutionStart(executionId) =>
if (executionId == null) {
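On the review question above: returning 1 from `threadCount()` keeps the endpoint single-threaded, which matches the behavior the reviewers expect from the previous default (an assumption confirmed only by the thread above, not by this diff). A rough sketch of the threading model implied by a thread count of 1 — illustrative only, not Spark's actual dispatcher code:

```scala
import java.util.concurrent.Executors

// Illustrative only: with a single inbox thread, messages posted to the endpoint are
// handled strictly one at a time, so handlers such as GlutenDriverEndpoint.receive
// need no additional synchronization around endpoint-local state.
object SingleThreadedInboxSketch {
  private val inbox = Executors.newFixedThreadPool(1)

  def post(message: Any)(handler: Any => Unit): Unit =
    inbox.execute(() => handler(message))
}
```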
@@ -40,7 +40,8 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
@volatile var driverEndpointRef: RpcEndpointRef = null

rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_EXECUTOR_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
override def onStart(): Unit = {
rpcEnv
.asyncSetupEndpointRefByURI(driverUrl)
Expand Down
@@ -42,8 +42,8 @@ object SoftAffinityUtil extends LogLevelUtil with Logging {
// using SoftAffinityManager to generate target executors.
// Only using the first file to calculate the target executors
// Only get one file to calculate the target host
val file = filePartition.files.sortBy(_.filePath).head
val locations = SoftAffinityManager.askExecutors(file.filePath)
val file = filePartition.files.sortBy(_.filePath.toString).head
val locations = SoftAffinityManager.askExecutors(file.filePath.toString)
if (!locations.isEmpty) {
logOnLevel(
softAffinityLogLevel,