[SPARK-49249][SPARK-49122] Artifact isolation in Spark Classic #48120

Open · wants to merge 43 commits into base: master

Commits (43):
fe143d6 - make it work for sql (xupefei, Sep 12, 2024)
e2597c1 - REPL (xupefei, Sep 16, 2024)
827e01e - . (xupefei, Sep 16, 2024)
73d13f9 - . (xupefei, Sep 19, 2024)
caa4251 - revert fmt change (xupefei, Sep 23, 2024)
5043ce3 - try fix (xupefei, Sep 25, 2024)
3c8fef5 - . (xupefei, Sep 25, 2024)
255e85d - the other way around (xupefei, Sep 25, 2024)
7e3ecfe - mima (xupefei, Sep 25, 2024)
a9c20e0 - OOPS (xupefei, Sep 25, 2024)
355bea8 - fmt (xupefei, Sep 25, 2024)
6564ccf - Merge remote-tracking branch 'databricks/master' into session-artifac… (xupefei, Sep 25, 2024)
06a658c - handle hive (xupefei, Sep 25, 2024)
225ec6f - fix addjar, break connect repl (xupefei, Sep 26, 2024)
e4f5a5c - ugly fix for streaming (xupefei, Sep 26, 2024)
7630e2f - clone (xupefei, Oct 2, 2024)
7b8f1da - . (xupefei, Oct 2, 2024)
786d48f - . (xupefei, Oct 2, 2024)
1849ac5 - Merge branch 'clone-artifact-manager' into session-artifact-apply (xupefei, Oct 2, 2024)
a0ae922 - undo (xupefei, Oct 2, 2024)
bfa6d85 - address comments (xupefei, Oct 3, 2024)
fe7947f - address comments (xupefei, Oct 4, 2024)
04a5bb2 - Merge branch 'clone-artifact-manager' into session-artifact-apply (xupefei, Oct 4, 2024)
24f99a5 - . (xupefei, Oct 4, 2024)
39a8086 - wip (xupefei, Oct 7, 2024)
7cce314 - address comment (xupefei, Oct 7, 2024)
80289b8 - . (xupefei, Oct 8, 2024)
4542a21 - rvt (xupefei, Oct 8, 2024)
0b021d9 - fix (hopefully) all tests (xupefei, Oct 9, 2024)
97c7d6c - remove reuse code (xupefei, Oct 9, 2024)
aa9c21d - . (xupefei, Oct 9, 2024)
a2849f8 - fix pyspark (xupefei, Oct 9, 2024)
508ee7b - disable hive (xupefei, Oct 9, 2024)
fdcb05b - omg (xupefei, Oct 9, 2024)
c9cf1a2 - why so slow (xupefei, Oct 10, 2024)
be49405 - why so slow try 2 (xupefei, Oct 10, 2024)
5c15612 - Merge remote-tracking branch 'origin/clone-artifact-manager' into ses… (xupefei, Oct 10, 2024)
3899b22 - make streaming great again (xupefei, Oct 10, 2024)
d8ec1d3 - . (xupefei, Oct 10, 2024)
3bcda6d - optimizzzzze (xupefei, Oct 11, 2024)
216b467 - lemme try if this can make things faster (xupefei, Oct 11, 2024)
4de3ce8 - cache AppClassLoader (xupefei, Oct 12, 2024)
7a0910b - . (xupefei, Oct 12, 2024)
Binary file added repl/src/test/resources/IntSumUdf.class
22 changes: 22 additions & 0 deletions repl/src/test/resources/IntSumUdf.scala
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.spark.sql.api.java.UDF2

class IntSumUdf extends UDF2[Long, Long, Long] {
override def call(t1: Long, t2: Long): Long = t1 + t2
}
37 changes: 37 additions & 0 deletions repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -396,4 +396,41 @@ class ReplSuite extends SparkFunSuite {
Main.sparkContext.stop()
System.clearProperty("spark.driver.port")
}

test("register artifacts via SparkSession.addArtifact") {
Contributor: Can you use a UDF defined in the REPL? If so, how does this work with a JobArtifactSet? Do we layer the globally defined classpath over the session-specific classpath? (It would be nice to document this somewhere.)

Author (xupefei): I added one more test, which defines a UDF that initialises an external class added as an artifact.

> how does this work with a JobArtifactSet?

Can you elaborate? AFAIK JobArtifactSet is not involved here, since it's the session's artifact path that is applied whenever that SparkSession is active.

Classpath - It's the other way around: the session classpath is laid over the global one.

val artifactPath = new File("src/test/resources").toPath
val intSumUdfPath = artifactPath.resolve("IntSumUdf.class")
val output = runInterpreterInPasteMode("local",
s"""
|import org.apache.spark.sql.api.java.UDF2
|import org.apache.spark.sql.types.DataTypes
|
|spark.addArtifact("${intSumUdfPath.toString}")
|
|spark.udf.registerJava("intSum", "IntSumUdf", DataTypes.LongType)
|
|val r = spark.range(5)
| .withColumn("id2", col("id") + 1)
| .selectExpr("intSum(id, id2)")
| .collect()
|assert(r.map(_.getLong(0)).toSeq == Seq(1, 3, 5, 7, 9))
|
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertDoesNotContain("assertion failed", output)

// The UDF should not work in a new REPL session.
val anotherOutput = runInterpreterInPasteMode("local",
s"""
|val r = spark.range(5)
| .withColumn("id2", col("id") + 1)
| .selectExpr("intSum(id, id2)")
| .collect()
|
""".stripMargin)
assertContains(
"[UNRESOLVED_ROUTINE] Cannot resolve routine `intSum` on search path",
anotherOutput)
}
}
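To illustrate the classpath layering discussed in the review thread above ("the session classpath is laid over the global one"): conceptually, each session gets a class loader whose parent is the global application loader, so globally available classes stay visible while classes added as session artifacts are visible only inside that session. A minimal sketch of that idea, using a hypothetical helper and made-up paths rather than the actual ArtifactManager internals:

import java.net.URLClassLoader
import java.nio.file.{Path, Paths}

// Hypothetical helper: a session-scoped loader layered over the global loader.
// Classes on the global classpath remain resolvable; classes shipped via
// spark.addArtifact resolve only through this session's loader.
def sessionClassLoader(sessionClassDirs: Seq[Path], globalLoader: ClassLoader): ClassLoader =
  new URLClassLoader(sessionClassDirs.map(_.toUri.toURL).toArray, globalLoader)

// Illustrative usage with an invented session directory:
val loader = sessionClassLoader(Seq(Paths.get("/tmp/spark-session-1234/classes")), getClass.getClassLoader)
val udfClass = Class.forName("IntSumUdf", true, loader)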
UDFRegistration.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils
* @since 1.3.0
*/
@Stable
class UDFRegistration private[sql] (functionRegistry: FunctionRegistry)
class UDFRegistration private[sql] (session: SparkSession, functionRegistry: FunctionRegistry)
extends api.UDFRegistration
with Logging {
protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
@@ -121,7 +121,9 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry)
*/
private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
try {
val clazz = Utils.classForName[AnyRef](className)
val clazz = session.artifactManager.withResources {
Utils.classForName[AnyRef](className, noSparkClassLoader = true)
}
if (!classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
throw QueryCompilationErrors
.classDoesNotImplementUserDefinedAggregateFunctionError(className)
@@ -145,9 +147,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry)
* @param returnDataType return type of udf. If it is null, spark would try to infer
* via reflection.
*/
private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
Comment on lines -148 to +151
Author (xupefei): I have to make this method public so I can call it from the REPL.

Contributor: I am not against this. I am trying to understand the user-facing consequences, though. I'd probably prefer that we also add support for Scala UDFs; that can be done in a follow-up.

Contributor: Can you file a follow-up?

Author (xupefei): Will do.

try {
val clazz = Utils.classForName[AnyRef](className)
val clazz = session.artifactManager.withResources {
Utils.classForName[AnyRef](className)
}
val udfInterfaces = clazz.getGenericInterfaces
.filter(_.isInstanceOf[ParameterizedType])
.map(_.asInstanceOf[ParameterizedType])
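With registerJava now public, the user-facing flow the reviewers ask about mirrors the new ReplSuite test: add the compiled class as a session artifact, then register it by class name so it is resolved through the session's artifact class loader. A short usage sketch (the artifact path is illustrative):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DataTypes

// Ship the compiled UDF class to the current session, then register it by class name.
spark.addArtifact("src/test/resources/IntSumUdf.class")
spark.udf.registerJava("intSum", "IntSumUdf", DataTypes.LongType)

// The registered UDF is usable in SQL expressions of this session only.
spark.range(5)
  .withColumn("id2", col("id") + 1)
  .selectExpr("intSum(id, id2)")
  .show()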
ArtifactManager.scala
@@ -71,7 +71,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
(ArtifactUtils.concatenatePaths(artifactPath, "classes"),
s"$artifactURI${File.separator}classes${File.separator}")

protected[artifact] val state: JobArtifactState =
protected[sql] val state: JobArtifactState =
JobArtifactState(session.sessionUUID, Option(classURI))

def withResources[T](f: => T): T = {
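For context on withResources as used in UDFRegistration above: conceptually it runs a block with the session's artifact class loader installed as the thread's context class loader, so class-name lookups inside the block can see session artifacts. A generic sketch of that set-and-restore pattern, assuming nothing about the real ArtifactManager beyond what the diff shows:

// Simplified stand-in for "run this block against the session's class loader".
def withSessionClassLoader[T](sessionLoader: ClassLoader)(body: => T): T = {
  val thread = Thread.currentThread()
  val previous = thread.getContextClassLoader
  thread.setContextClassLoader(sessionLoader)
  try body
  finally thread.setContextClassLoader(previous) // always restore the previous loader
}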
SQLExecution.scala
@@ -115,93 +115,95 @@ object SQLExecution extends Logging {
}
val redactedConfigs = sparkSession.sessionState.conf.redactOptions(modifiedConfigs)

withSQLConfPropagated(sparkSession) {
var ex: Option[Throwable] = None
var isExecutedPlanAvailable = false
val startTime = System.nanoTime()
val startEvent = SparkListenerSQLExecutionStart(
executionId = executionId,
rootExecutionId = Some(rootExecutionId),
description = desc,
details = callSite.longForm,
physicalPlanDescription = "",
sparkPlanInfo = SparkPlanInfo.EMPTY,
time = System.currentTimeMillis(),
modifiedConfigs = redactedConfigs,
jobTags = sc.getJobTags(),
jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
)
try {
body match {
case Left(e) =>
sc.listenerBus.post(startEvent)
JobArtifactSet.withActiveJobArtifactState(sparkSession.artifactManager.state) {
Contributor: Can you check how this interacts with all the stuff we do in Connect to make this work? I feel that we are duplicating code now. cc @vicennial

Contributor: An FYI to other reviewers: review this file with whitespace changes hidden.

Contributor: Hmm, with this in the execution code path, we may not need SessionHolder#withSession in a few places; those could be cleaned up.

Author (xupefei): @vicennial Is there an end-to-end test for this? I made some modifications and want to make sure they don't break anything.

Contributor: @xupefei The ReplE2ESuite has some tests covering the overall client->artifact->execution flow. The Python client package may have some E2E tests as well, but I am not familiar with their current status.

withSQLConfPropagated(sparkSession) {
var ex: Option[Throwable] = None
var isExecutedPlanAvailable = false
val startTime = System.nanoTime()
val startEvent = SparkListenerSQLExecutionStart(
executionId = executionId,
rootExecutionId = Some(rootExecutionId),
description = desc,
details = callSite.longForm,
physicalPlanDescription = "",
sparkPlanInfo = SparkPlanInfo.EMPTY,
time = System.currentTimeMillis(),
modifiedConfigs = redactedConfigs,
jobTags = sc.getJobTags(),
jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
)
try {
body match {
case Left(e) =>
sc.listenerBus.post(startEvent)
throw e
case Right(f) =>
val planDescriptionMode =
ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
val planDesc = queryExecution.explainString(planDescriptionMode)
val planInfo = try {
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
} catch {
case NonFatal(e) =>
logDebug("Failed to generate SparkPlanInfo", e)
// If the queryExecution already failed before this, we are not able to generate
// the the plan info, so we use and empty graphviz node to make the UI happy
SparkPlanInfo.EMPTY
}
sc.listenerBus.post(
startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
isExecutedPlanAvailable = true
f()
}
} catch {
case e: Throwable =>
ex = Some(e)
throw e
case Right(f) =>
val planDescriptionMode =
ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
val planDesc = queryExecution.explainString(planDescriptionMode)
val planInfo = try {
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
} catch {
case NonFatal(e) =>
logDebug("Failed to generate SparkPlanInfo", e)
// If the queryExecution already failed before this, we are not able to generate
// the the plan info, so we use and empty graphviz node to make the UI happy
SparkPlanInfo.EMPTY
}
sc.listenerBus.post(
startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
isExecutedPlanAvailable = true
f()
}
} catch {
case e: Throwable =>
ex = Some(e)
throw e
} finally {
val endTime = System.nanoTime()
val errorMessage = ex.map {
case e: SparkThrowable =>
SparkThrowableHelper.getMessage(e, ErrorMessageFormat.PRETTY)
case e =>
Utils.exceptionString(e)
}
if (queryExecution.shuffleCleanupMode != DoNotCleanup
&& isExecutedPlanAvailable) {
val shuffleIds = queryExecution.executedPlan match {
case ae: AdaptiveSparkPlanExec =>
ae.context.shuffleIds.asScala.keys
case _ =>
Iterable.empty
} finally {
val endTime = System.nanoTime()
val errorMessage = ex.map {
case e: SparkThrowable =>
SparkThrowableHelper.getMessage(e, ErrorMessageFormat.PRETTY)
case e =>
Utils.exceptionString(e)
}
shuffleIds.foreach { shuffleId =>
queryExecution.shuffleCleanupMode match {
case RemoveShuffleFiles =>
// Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister
// the shuffle on MapOutputTracker, so that stage retries would be triggered.
// Set blocking to Utils.isTesting to deflake unit tests.
sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)
case SkipMigration =>
SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId)
case _ => // this should not happen
if (queryExecution.shuffleCleanupMode != DoNotCleanup
&& isExecutedPlanAvailable) {
val shuffleIds = queryExecution.executedPlan match {
case ae: AdaptiveSparkPlanExec =>
ae.context.shuffleIds.asScala.keys
case _ =>
Iterable.empty
}
shuffleIds.foreach { shuffleId =>
queryExecution.shuffleCleanupMode match {
case RemoveShuffleFiles =>
// Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister
// the shuffle on MapOutputTracker, so that stage retries would be triggered.
// Set blocking to Utils.isTesting to deflake unit tests.
sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)
case SkipMigration =>
SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId)
case _ => // this should not happen
}
}
}
val event = SparkListenerSQLExecutionEnd(
executionId,
System.currentTimeMillis(),
// Use empty string to indicate no error, as None may mean events generated by old
// versions of Spark.
errorMessage.orElse(Some("")))
// Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the
// `name` parameter. The `ExecutionListenerManager` only watches SQL executions with
// name. We can specify the execution name in more places in the future, so that
// `QueryExecutionListener` can track more cases.
event.executionName = name
event.duration = endTime - startTime
event.qe = queryExecution
event.executionFailure = ex
sc.listenerBus.post(event)
}
val event = SparkListenerSQLExecutionEnd(
executionId,
System.currentTimeMillis(),
// Use empty string to indicate no error, as None may mean events generated by old
// versions of Spark.
errorMessage.orElse(Some("")))
// Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
// parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
// can specify the execution name in more places in the future, so that
// `QueryExecutionListener` can track more cases.
event.executionName = name
event.duration = endTime - startTime
event.qe = queryExecution
event.executionFailure = ex
sc.listenerBus.post(event)
}
}
} finally {
@@ -281,7 +283,10 @@
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
// `getCurrentJobArtifactState` will return a state only in Spark Connect mode. In non-Connect
Contributor: I think it should be safe to use the SparkSession's jobArtifactState. They should be the same. cc @vicennial.

// mode, we default back to the resources of the current Spark session.
val artifactState = JobArtifactSet.getCurrentJobArtifactState.getOrElse(
activeSession.artifactManager.state)
exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) {
val originalSession = SparkSession.getActiveSession
val originalLocalProps = sc.getLocalProperties
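The core change in the SQLExecution diff above is that the execution body is now wrapped in JobArtifactSet.withActiveJobArtifactState(sparkSession.artifactManager.state), so the session's artifacts are active for the duration of the query in both Connect and non-Connect mode. The scoping itself is a set-and-restore over a per-thread "active state"; a simplified sketch of that pattern (not Spark's actual JobArtifactSet implementation):

object ActiveArtifactState {
  // Simplified stand-in for Spark's JobArtifactState: a session id plus an optional class URI.
  final case class State(sessionUUID: String, replClassDirUri: Option[String])

  private val active = new ThreadLocal[Option[State]] {
    override def initialValue(): Option[State] = None
  }

  def current: Option[State] = active.get()

  // Install `state` while `body` runs, then restore whatever was active before.
  def withActive[T](state: State)(body: => T): T = {
    val previous = active.get()
    active.set(Some(state))
    try body
    finally active.set(previous)
  }
}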
BaseSessionStateBuilder.scala
@@ -181,7 +181,7 @@ abstract class BaseSessionStateBuilder(
* Note 1: The user-defined functions must be deterministic.
* Note 2: This depends on the `functionRegistry` field.
*/
protected def udfRegistration: UDFRegistration = new UDFRegistration(functionRegistry)
protected def udfRegistration: UDFRegistration = new UDFRegistration(session, functionRegistry)

protected def udtfRegistration: UDTFRegistration = new UDTFRegistration(tableFunctionRegistry)

ArtifactManagerSuite.scala
@@ -342,13 +342,11 @@ class ArtifactManagerSuite extends SharedSparkSession {
.asInstanceOf[UDF2[Long, Long, Long]]
spark.udf.register("intSum", instance, DataTypes.LongType)

artifactManager.withResources {
val r = spark.range(5)
.withColumn("id2", col("id") + 1)
.selectExpr("intSum(id, id2)")
.collect()
assert(r.map(_.getLong(0)).toSeq == Seq(1, 3, 5, 7, 9))
}
val r = spark.range(5)
.withColumn("id2", col("id") + 1)
.selectExpr("intSum(id, id2)")
.collect()
assert(r.map(_.getLong(0)).toSeq == Seq(1, 3, 5, 7, 9))
}

private def testAddArtifactToLocalSession(