Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in ForkJoinPoolHierarchicalTestExecutorService #3981

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.Lock;

import org.junit.platform.commons.util.Preconditions;

/**
* @since 1.3
*/
Expand All @@ -23,7 +25,7 @@ class CompositeLock implements ResourceLock {
private final List<Lock> locks;

CompositeLock(List<Lock> locks) {
this.locks = locks;
this.locks = Preconditions.notEmpty(locks, "Locks must not be empty");
}

// for tests only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -26,7 +28,6 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -51,7 +52,9 @@
public class ForkJoinPoolHierarchicalTestExecutorService implements HierarchicalTestExecutorService {

private final ForkJoinPool forkJoinPool;
private final TaskEventListener taskEventListener;
private final int parallelism;
private final ThreadLocal<ThreadLock> threadLocks = ThreadLocal.withInitial(ThreadLock::new);
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a new {@code ForkJoinPoolHierarchicalTestExecutorService} based on
Expand All @@ -71,7 +74,13 @@ public ForkJoinPoolHierarchicalTestExecutorService(ConfigurationParameters confi
*/
@API(status = STABLE, since = "1.10")
public ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration) {
this(configuration, TaskEventListener.NOOP);
}

ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration,
TaskEventListener taskEventListener) {
forkJoinPool = createForkJoinPool(configuration);
this.taskEventListener = taskEventListener;
parallelism = forkJoinPool.getParallelism();
LoggerFactory.getLogger(getClass()).config(() -> "Using ForkJoinPool with parallelism of " + parallelism);
}
Expand Down Expand Up @@ -132,7 +141,7 @@ public Future<Void> submit(TestTask testTask) {
if (testTask.getExecutionMode() == CONCURRENT && ForkJoinTask.getSurplusQueuedTaskCount() < parallelism) {
return exclusiveTask.fork();
}
exclusiveTask.compute();
exclusiveTask.execSync();
return completedFuture(null);
}

Expand All @@ -143,7 +152,7 @@ private boolean isAlreadyRunningInForkJoinPool() {
@Override
public void invokeAll(List<? extends TestTask> tasks) {
if (tasks.size() == 1) {
new ExclusiveTask(tasks.get(0)).compute();
new ExclusiveTask(tasks.get(0)).execSync();
return;
}
Deque<ExclusiveTask> nonConcurrentTasks = new LinkedList<>();
Expand All @@ -169,14 +178,28 @@ private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<Exclusive

private void executeNonConcurrentTasks(Deque<ExclusiveTask> nonConcurrentTasks) {
for (ExclusiveTask task : nonConcurrentTasks) {
task.compute();
task.execSync();
}
}

private void joinConcurrentTasksInReverseOrderToEnableWorkStealing(
Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
for (ExclusiveTask forkedTask : concurrentTasksInReverseOrder) {
forkedTask.join();
resubmitDeferredTasks();
}
}

private void resubmitDeferredTasks() {
ThreadLock threadLock = threadLocks.get();
if (threadLock != null) {
List<ExclusiveTask> deferredTasks = threadLock.deferredTasks;
for (ExclusiveTask deferredTask : deferredTasks) {
if (!deferredTask.isDone()) {
deferredTask.fork();
}
}
deferredTasks.clear();
}
}

Expand All @@ -186,26 +209,72 @@ public void close() {
}

// this class cannot not be serialized because TestTask is not Serializable
@SuppressWarnings("serial")
static class ExclusiveTask extends RecursiveAction {
@SuppressWarnings({ "serial", "RedundantSuppression" })
class ExclusiveTask extends ForkJoinTask<Void> {

private final TestTask testTask;

ExclusiveTask(TestTask testTask) {
this.testTask = testTask;
}

/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() {
return null;
}

/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) {
}

void execSync() {
boolean completed = exec();
if (!completed) {
throw new IllegalStateException(
"Task was deferred but should have been executed synchronously: " + testTask);
}
}

@SuppressWarnings("try")
@Override
public void compute() {
try (ResourceLock lock = testTask.getResourceLock().acquire()) {
public boolean exec() {
// Check if this task is compatible with the current resource lock, if there is any.
// If not, we put this task in the thread local as a deferred task
// and let the worker thread fork it once it is done with the current task.
ResourceLock resourceLock = testTask.getResourceLock();
ThreadLock threadLock = threadLocks.get();
if (!threadLock.areAllHeldLocksCompatibleWith(resourceLock)) {
threadLock.addDeferredTask(this);
taskEventListener.deferred(testTask);
// Return false to indicate that this task is not done yet
// this means that .join() will wait.
return false;
}
try (ResourceLock lock = resourceLock.acquire()) {
threadLock.incrementNesting(lock);
testTask.execute();
return true;
}
catch (InterruptedException e) {
throw ExceptionUtils.throwAsUncheckedException(e);
}
finally {
if (threadLock.decrementNesting()) {
threadLocks.remove();
}
}
}

@Override
public String toString() {
return "ExclusiveTask for " + testTask;
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved
}
}

static class WorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
Expand All @@ -228,4 +297,35 @@ static class WorkerThread extends ForkJoinWorkerThread {

}

static class ThreadLock {
private final Deque<ResourceLock> locks = new ArrayDeque<>(2);
private final List<ExclusiveTask> deferredTasks = new ArrayList<>();

void addDeferredTask(ExclusiveTask task) {
deferredTasks.add(task);
}

void incrementNesting(ResourceLock lock) {
locks.push(lock);
}

@SuppressWarnings("resource")
boolean decrementNesting() {
locks.pop();
return locks.isEmpty();
}

boolean areAllHeldLocksCompatibleWith(ResourceLock lock) {
return locks.stream().allMatch(l -> l.isCompatible(lock));
}
}

interface TaskEventListener {

TaskEventListener NOOP = __ -> {
};

void deferred(TestTask testTask);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@

package org.junit.platform.engine.support.hierarchical;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static org.junit.platform.commons.util.CollectionUtils.getOnlyElement;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_KEY;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ_WRITE;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.LockMode.READ;

import java.util.Collection;
Expand Down Expand Up @@ -44,18 +48,27 @@ private static Comparator<String> globalKeyFirst() {
private final Map<String, ReadWriteLock> locksByKey = new ConcurrentHashMap<>();

ResourceLock getLockForResources(Collection<ExclusiveResource> resources) {
if (resources.size() == 1) {
return getLockForResource(getOnlyElement(resources));
}
List<Lock> locks = getDistinctSortedLocks(resources);
return toResourceLock(locks);
return toResourceLock(getDistinctSortedLocks(resources));
}

ResourceLock getLockForResource(ExclusiveResource resource) {
return new SingleLock(toLock(resource));
Lock lock = toLock(resource);
if (GLOBAL_READ.equals(resource)) {
return new SingleLock.GlobalReadLock(lock);
}
else if (GLOBAL_READ_WRITE.equals(resource)) {
return new SingleLock.GlobalReadWriteLock(lock);
}
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved
return new SingleLock(lock);
}

private List<Lock> getDistinctSortedLocks(Collection<ExclusiveResource> resources) {
if (resources.isEmpty()) {
return emptyList();
}
if (resources.size() == 1) {
return singletonList(toLock(getOnlyElement(resources)));
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved
}
// @formatter:off
Map<String, List<ExclusiveResource>> resourcesByKey = resources.stream()
.sorted(COMPARATOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public ExecutionMode getExecutionMode() {
return taskContext.getExecutionAdvisor().getForcedExecutionMode(testDescriptor).orElse(node.getExecutionMode());
}

@Override
public String toString() {
return testDescriptor.toString();
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved
}

void setParentContext(C parentContext) {
this.parentContext = parentContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ default void close() {
release();
}

/**
* {@return whether the given lock is compatible with this lock}
* @param other the other lock to check for compatibility
*/
default boolean isCompatible(ResourceLock other) {
return other instanceof NopLock || this instanceof NopLock;
marcphilipp marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,20 @@ public boolean isReleasable() {

}

static class GlobalReadLock extends SingleLock {
GlobalReadLock(Lock lock) {
super(lock);
}

@Override
public boolean isCompatible(ResourceLock other) {
return !(other instanceof GlobalReadWriteLock);
}
}

static class GlobalReadWriteLock extends SingleLock {
GlobalReadWriteLock(Lock lock) {
super(lock);
}
}
}
Loading
Loading