[SPARK-49249][SPARK-49122] Artifact isolation in Spark Classic #48120
base: master
Conversation
-private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
+def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
I have to make this method public so I can call it from REPL.
I am not against this. I am trying to understand the user facing consequences though. I'd probably prefer that we add support for Scala UDFs as well. That can be done in a follow-up though.
Can you file a follow-up?
Will do.
 body match {
   case Left(e) =>
     sc.listenerBus.post(startEvent)
+    JobArtifactSet.withActiveJobArtifactState(sparkSession.artifactManager.state) {
Can you check how this interacts with all the stuff we do in Connect to make this work? I feel that we are duplicating code now. cc @vicennial
An FYI to other reviewers: look at this file with hidden whitespace.
Hmm, with this in the execution code path, we may not need SessionHolder#withSession in a few places and can be cleaned up.
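For readers unfamiliar with the pattern under discussion: `withActiveJobArtifactState` scopes the session's artifact state to the current thread for the duration of a block. A minimal sketch of that scoping shape, with illustrative names (this is not Spark's actual implementation):

```scala
// Illustrative sketch only: a thread-local "active state" that is set for
// the duration of a block and restored afterwards, mirroring the shape of
// JobArtifactSet.withActiveJobArtifactState.
object ActiveState {
  private val current = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def get: Option[String] = current.get()

  // Run `body` with `state` active on this thread, restoring the
  // previous value even if `body` throws.
  def withActive[T](state: String)(body: => T): T = {
    val prev = current.get()
    current.set(Some(state))
    try body finally current.set(prev)
  }
}
```

Because the previous value is restored in a `finally`, nested scopes and exception paths both leave the thread in its prior state, which is what makes the pattern safe to place in a shared execution code path.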
@vicennial Is there an end-to-end test for this? I made some modifications and want to make sure they don't break anything.
@xupefei The ReplE2ESuite has some tests for the overall client->artifact->execution flow.
The Python client package may have some E2E tests as well, but I am not familiar with its current status.
sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -396,4 +396,41 @@ class ReplSuite extends SparkFunSuite {
     Main.sparkContext.stop()
     System.clearProperty("spark.driver.port")
   }

+  test("register artifacts via SparkSession.addArtifact") {
Can you use a UDF defined in the REPL? If so, how does this work with a JobArtifactSet? Do we layer the globally defined classpath over the session-specific classpath? (It'd be nice to document this somewhere.)
I added one more test, which defines a UDF that initialises an external class added as an artifact.
"how does this work with a JobArtifactSet?"
Can you elaborate? AFAIK JobArtifactSet is not involved here, since it's the artifact state that is applied when there is an active SparkSession.
Classpath - it's the other way around: the session classpath is laid over the global one.
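The layering described here can be sketched with a plain `URLClassLoader` whose parent is the global loader: session jars are searched in addition to the global classpath, and anything the session jars do not provide falls back to the parent. This is a sketch under assumed delegation semantics; Spark's actual artifact classloader is more involved (and its delegation order is configurable).

```scala
import java.net.{URL, URLClassLoader}

// Sketch: a per-session classloader layered over the global one.
// Session-specific jar URLs would go in `sessionJars` (empty here);
// classes not provided by the session jars resolve via the global loader.
object SessionClasspathDemo {
  val globalLoader: ClassLoader = getClass.getClassLoader
  val sessionJars: Array[URL] = Array.empty
  val sessionLoader = new URLClassLoader(sessionJars, globalLoader)
}
```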
Oops, I meant to leave a comment (ignore the approval, I haven't gone through the whole PR)
@@ -121,7 +120,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry)
    */
   private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
     try {
-      val clazz = Utils.classForName[AnyRef](className)
+      val clazz = session.artifactManager.classloader.loadClass(className)
One follow-up here would be to cache the ArtifactManager classloader. I think we create that thing over and over.
Agree. We can invalidate the cache when a new JAR is added.
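The cache-plus-invalidation idea discussed here could look roughly like the following. This is a hypothetical sketch with made-up names, not the ArtifactManager's actual code:

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical sketch of the follow-up discussed above: cache the
// classloader and rebuild it only after a new JAR is added.
class CachedClassloader(parent: ClassLoader) {
  private var jars: Vector[URL] = Vector.empty
  private var cached: Option[URLClassLoader] = None

  def addJar(url: URL): Unit = synchronized {
    jars :+= url
    cached = None // invalidate; the next access rebuilds the loader
  }

  def classloader: URLClassLoader = synchronized {
    cached.getOrElse {
      val cl = new URLClassLoader(jars.toArray, parent)
      cached = Some(cl)
      cl
    }
  }
}
```

Repeated accesses return the same loader instance until `addJar` invalidates the cache, avoiding the "create that thing over and over" cost while still picking up newly added jars.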
@@ -281,7 +281,10 @@ object SQLExecution extends Logging {
     val activeSession = sparkSession
     val sc = sparkSession.sparkContext
     val localProps = Utils.cloneProperties(sc.getLocalProperties)
     val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
+    // `getCurrentJobArtifactState` will return a state only in Spark Connect mode. In non-Connect
I think it should be safe to use the SparkSession's jobArtifactState. They should be the same. cc @vicennial.
LGTM - Pending @vicennial's sign-off.
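The suggestion amounts to falling back from the thread-local (Connect) artifact state to the session's own state when the former is absent. The shape of that fallback, with illustrative names rather than the actual diff:

```scala
// Illustrative: prefer the thread-local (Connect) artifact state when
// present, otherwise fall back to the session's own state.
def resolveArtifactState(threadLocalState: Option[String],
                         sessionState: String): String =
  threadLocalState.getOrElse(sessionState)
```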
# Conflicts:
#   sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
#   sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
Force-pushed ("SparkSession.addArtifact work with REPL") from ccb1554 to be49405.
What changes were proposed in this pull request?
This PR makes the isolation feature introduced by the SparkSession.addArtifact API (added in #47631) work with Spark SQL.
Note that this PR does not enable isolation for the following two use cases:
- ... SparkSession, whose resources escape our session scope.
Why are the changes needed?
Because it didn't work before :)
Does this PR introduce any user-facing change?
Yes, the user can add a new artifact in the REPL and use it in the current REPL session.
How was this patch tested?
Added a new test.
Was this patch authored or co-authored using generative AI tooling?
No.