Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8342486: Implement JEP draft: Structured Concurrency (Fourth Preview) #21760

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/java.base/share/classes/java/lang/ScopedValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,19 @@
* {@snippet lang=java :
* private static final ScopedValue<String> NAME = ScopedValue.newInstance();

* ScopedValue.runWhere(NAME, "duke", () -> {
* try (var scope = new StructuredTaskScope<String>()) {
* ScopedValue.where(NAME, "duke").run(() -> {
* // @link substring="open" target="StructuredTaskScope#open()" :
* try (var scope = StructuredTaskScope.open()) {
*
* scope.fork(() -> childTask1());
* scope.fork(() -> childTask2());
* scope.fork(() -> childTask3());
* // @link substring="fork" target="StructuredTaskScope#fork(java.util.concurrent.Callable)" :
* scope.fork(() -> childTask1());
* scope.fork(() -> childTask2());
* scope.fork(() -> childTask3());
*
* ...
* // @link substring="join" target="StructuredTaskScope#join()" :
* scope.join();
*
* ..
* }
* });
* }
Expand Down
2,082 changes: 1,195 additions & 887 deletions src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum Feature {
IMPLICIT_CLASSES,
@JEP(number=481, title="Scoped Values", status="Third Preview")
SCOPED_VALUES,
@JEP(number=480, title="Structured Concurrency", status="Third Preview")
@JEP(number=999_999, title="Structured Concurrency", status="Fourth Preview")
STRUCTURED_CONCURRENCY,
@JEP(number=466, title="ClassFile API", status="Second Preview")
CLASSFILE_API,
Expand Down
11 changes: 0 additions & 11 deletions src/java.base/share/classes/jdk/internal/misc/ThreadFlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,8 @@ public Thread start(Thread thread) {
* Shutdown this flock so that no new threads can be started, existing threads
* in the flock will continue to run. This method is a no-op if the flock is
* already shutdown or closed.
*
* <p> This method may only be invoked by the flock owner or threads {@linkplain
* #containsThread(Thread) contained} in the flock.
*
* @throws WrongThreadException if the current thread is not the owner or a thread
* contained in the flock
*/
public void shutdown() {
ensureOwnerOrContainsThread();
if (!shutdown) {
shutdown = true;
}
Expand Down Expand Up @@ -370,12 +363,8 @@ public boolean awaitAll(Duration timeout)
* <p> If the owner is blocked in {@code awaitAll} then it will return immediately.
* If the owner is not blocked in {@code awaitAll} then its next call to wait
* will return immediately. The method does nothing when the flock is closed.
*
* @throws WrongThreadException if the current thread is not the owner or a thread
* contained in the flock
*/
public void wakeup() {
ensureOwnerOrContainsThread();
if (!getAndSetPermit(true) && Thread.currentThread() != owner()) {
LockSupport.unpark(owner());
}
Expand Down
5 changes: 3 additions & 2 deletions test/jdk/java/lang/ScopedValue/StressStackOverflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.StructureViolationException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.function.Supplier;

public class StressStackOverflow {
Expand Down Expand Up @@ -169,7 +170,7 @@ long fibonacci_pad(int n, Runnable op) {
void runInNewThread(Runnable op) {
var threadFactory
= (ThreadLocalRandom.current().nextBoolean() ? Thread.ofPlatform() : Thread.ofVirtual()).factory();
try (var scope = new StructuredTaskScope<>("", threadFactory)) {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(threadFactory))) {
var handle = scope.fork(() -> {
op.run();
return null;
Expand All @@ -186,7 +187,7 @@ void runInNewThread(Runnable op) {
public void run() {
try {
ScopedValue.where(inheritedValue, 42).where(el, 0).run(() -> {
try (var scope = new StructuredTaskScope<>()) {
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
try {
if (ThreadLocalRandom.current().nextBoolean()) {
// Repeatedly test Scoped Values set by ScopedValue::call(), get(), and run()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -23,15 +23,15 @@

/*
* @test
* @bug 8311867
* @summary Stress test of StructuredTaskScope.shutdown with running and starting threads
* @summary Stress test of StructuredTaskScope cancellation with running and starting threads
* @enablePreview
* @run junit StressShutdown
* @run junit StressCancellation
*/

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.ThreadFactory;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -41,12 +41,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.*;

class StressShutdown {

static final Callable<Void> SLEEP_FOR_A_DAY = () -> {
Thread.sleep(Duration.ofDays(1));
return null;
};
class StressCancellation {

static Stream<Arguments> testCases() {
Stream<ThreadFactory> factories = Stream.of(
Expand All @@ -59,34 +54,47 @@ static Stream<Arguments> testCases() {
}

/**
* Test StructuredTaskScope.shutdown with running threads and concurrently with
* threads that are starting. The shutdown should interrupt all threads so that
* join wakes up.
* Test StructuredTaskScope cancellation with running threads and concurrently with
* threads that are starting. The cancellation should interrupt all running threads,
* join should wakeup, and close would complete quickly.
*
* @param factory the ThreadFactory to use
* @param beforeShutdown the number of subtasks to fork before shutdown
* @param afterShutdown the number of subtasks to fork after shutdown
* @param beforeCancel the number of subtasks to fork before cancel
* @param afterCancel the number of subtasks to fork after cancel
*/
@ParameterizedTest
@MethodSource("testCases")
void testShutdown(ThreadFactory factory, int beforeShutdown, int afterShutdown)
throws InterruptedException
{
try (var scope = new StructuredTaskScope<>(null, factory)) {
void test(ThreadFactory factory, int beforeCancel, int afterCancel) throws Exception {
var joiner = new Joiner<Boolean, Void>() {
@Override
public boolean onComplete(Subtask<? extends Boolean> subtask) {
boolean cancel = subtask.get();
return cancel;
}
@Override
public Void result() {
return null;
}
};

try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
// fork subtasks
for (int i = 0; i < beforeShutdown; i++) {
scope.fork(SLEEP_FOR_A_DAY);
for (int i = 0; i < beforeCancel; i++) {
scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return false;
});
}

// fork subtask to shutdown
scope.fork(() -> {
scope.shutdown();
return null;
});
// fork subtask to cancel
scope.fork(() -> true);

// fork after forking subtask to shutdown
for (int i = 0; i < afterShutdown; i++) {
scope.fork(SLEEP_FOR_A_DAY);
// fork after forking subtask to cancel
for (int i = 0; i < afterCancel; i++) {
scope.fork(() -> {
Thread.sleep(Duration.ofDays(1));
return false;
});
}

scope.join();
Expand Down
Loading