[SPARK-49808][SQL] Fix a deadlock in subquery execution due to lazy vals #48391

Open · wants to merge 6 commits into base: master
Conversation

zhengruifeng (Contributor) commented Oct 9, 2024

What changes were proposed in this pull request?

1. Introduce a helper class Lazy to replace the lazy vals
2. Fix a deadlock in subquery execution

Why are the changes needed?

We observed a deadlock between QueryPlan.canonicalized and QueryPlan.references:

The main thread, in TakeOrderedAndProject.doExecute, tries to compute outputOrdering; it traverses the tree top-down and acquires the lock guarding QueryPlan.canonicalized along the path.
In this deadlock, it has acquired the lock on WholeStageCodegenExec and is waiting for the lock on HashAggregateExec.

Concurrently, a subquery execution thread performs code generation and traverses the tree bottom-up via def consume, which checks WholeStageCodegenExec.usedInputs and dereferences the lazy val QueryPlan.references, acquiring the lock guarding QueryPlan.references along the path.
In this deadlock, it has acquired the lock on HashAggregateExec and is waiting for the lock on WholeStageCodegenExec.

This happens because Scala's lazy val initialization internally calls this.synchronized on the instance that contains the val, so two threads that traverse the plan tree in opposite directions can acquire the nodes' monitors in opposite orders, creating the potential for a lock-ordering deadlock.
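The fix pattern can be sketched in plain Java (class and method names here are illustrative, not the PR's actual code): a small holder object that synchronizes on itself, so contention for a lazy field takes the holder's monitor rather than the monitor of the plan node that owns it, breaking the lock-ordering cycle described above.

```java
import java.util.function.Supplier;

// Hypothetical sketch: a lazy cell that synchronizes on itself rather than
// on the object that owns the field. Two threads computing lazy fields of
// different plan nodes then never contend for the plan nodes' monitors.
final class Lazy<T> {
    private Supplier<T> initializer;
    private volatile boolean initialized = false;
    private T value;

    Lazy(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    T get() {
        if (!initialized) {                 // fast path: volatile read only
            synchronized (this) {           // lock the cell, not the owner
                if (!initialized) {
                    value = initializer.get();
                    initializer = null;     // let the closure be collected
                    initialized = true;     // volatile write publishes value
                }
            }
        }
        return value;
    }
}
```

Because `initialized` is volatile and written after `value`, the double-checked locking here is safe under the Java memory model; the volatile read on the fast path establishes the happens-before edge that makes `value` visible.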

Does this PR introduce any user-facing change?

no

How was this patch tested?

Manual test:

Before the fix, the deadlock happened twice in the first 20 runs;
after the fix, the deadlock did not occur in 100+ consecutive runs.

Was this patch authored or co-authored using generative AI tooling?

no

     * the parent object.
     * c) If thread 1 waits for thread 2 to join, a deadlock occurs.
     */
    @SerialVersionUID(7964587975756091988L)
zhengruifeng (Contributor, Author):
generated by:

(spark_dev_312) ➜  spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.12.18/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy:    private static final long serialVersionUID = 7964587975756091988L;
(spark_dev_312) ➜  spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.13.12/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy:    private static final long serialVersionUID = 7964587975756091988L;
(spark_dev_312) ➜  spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.13.13/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy:    private static final long serialVersionUID = 7964587975756091988L;

Contributor:

Why doesn't LazyTry need it?

Contributor:

I think that this SerialVersionUID stems indirectly from the unit test case

test("Lazy val serializes the value, even if dereference was never called")

which appears to test serialization behavior related to whether or not we serialize the initializer closure.

In this PR's current implementation, the stream.defaultWriteObject() after field initialization means that we actually have strict / eager semantics when serializing these fields and that we won't serialize the initializer.

To see that more clearly, we can use a tool like cfr-decompiler to decompile the generated code; there we see that the initializer reference is cleared after its use in the lazy val evaluation:

    private Function0<T> initializer;
    private volatile boolean bitmap$0;

    private T value$lzycompute() {
        Lazy lazy = this;
        synchronized (lazy) {
            if (!this.bitmap$0) {
                this.value = this.initializer.apply();
                this.bitmap$0 = true;
            }
        }
        this.initializer = null;
        return this.value;
    }

    private T value() {
        if (!this.bitmap$0) {
            return this.value$lzycompute();
        }
        return this.value;
    }

There's a bit of a trade-off space here:

  • Eager evaluation upon serialization may reduce the size of the serialized data when the initializer lambda/closure is large compared to the value it produces. It can also avoid problems from non-serializable lambdas/closures, and compatibility problems when bidirectional wire-protocol compatibility is needed (e.g. if a Lazy[T] is somehow persisted and deserialized with a different classpath than the one that generated it, the compatibility surface becomes the lambda/closure serialization rather than the value).
  • But this comes at the cost of potentially forcing more evaluations than would otherwise have happened.

In our context of use here, I think that by default it's better to forgo the SerialVersionUID and "eager serialization" semantics entirely and instead just do what LazyTry did, since the "eager evaluation upon serialization" semantics may be unwanted in this context of use, or may address a set of problems that we don't have here.
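For illustration, the "eager evaluation upon serialization" semantics being weighed above can be sketched as follows (hypothetical class and field names, not the PR's implementation): forcing evaluation inside writeObject means the serialized form always carries the computed value and never the initializer closure.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical sketch of "eager serialization": writeObject forces
// evaluation first, so only the computed value is written, never the
// (possibly non-serializable) initializer closure.
final class EagerLazy<T> implements Serializable {
    private transient Supplier<T> initializer; // never serialized
    private volatile boolean initialized = false;
    private T value;                           // must itself be serializable

    EagerLazy(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    synchronized T get() {
        if (!initialized) {
            value = initializer.get();
            initializer = null;
            initialized = true;
        }
        return value;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        get();                     // force evaluation before serializing
        out.defaultWriteObject();  // writes 'value' and 'initialized' only
    }
}
```

This is the trade-off named above in miniature: a pristine, never-dereferenced EagerLazy still gets evaluated the moment it is serialized.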

zhengruifeng (Contributor, Author):

Sounds good, let me remove the SerialVersionUID and "eager serialization".

panbingkun (Contributor) commented Oct 9, 2024

  • branch 3.4 + scala 2.12.17
(base) ➜  spark-trunk git:(branch-3.4) ✗ serialver -classpath core/target/spark-core_2.12-3.4.4-SNAPSHOT.jar:build/scala-2.12.17/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy:    private static final long serialVersionUID = 7964587975756091988L;

zhengruifeng (Contributor, Author):

@panbingkun thanks a lot for confirming the SerialVersionUID!

zhengruifeng marked this pull request as ready for review, October 10, 2024 07:58