Skip to content

Commit

Permalink
Fix deadlock in ForkJoinPoolHierarchicalTestExecutorService
Browse files Browse the repository at this point in the history
The service now checks if the `ExclusiveTask` that should run
is executed on a thread that is already executing another task.
If this is scenario is detected, it checks if the lock is compatible to
the enclosing locks.
1. If compatible, it is executed and marked done
2. If incompatible, it is added to a list of deferred tasks and left
   unfinished. The deferred tasks will be re-forked afterward.

fixes #3945

Co-authored-by: Marc Philipp <[email protected]>
  • Loading branch information
leonard84 and marcphilipp committed Sep 11, 2024
1 parent e951f59 commit 6da042f
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.Lock;

import org.junit.platform.commons.util.Preconditions;

/**
* @since 1.3
*/
Expand All @@ -23,7 +25,7 @@ class CompositeLock implements ResourceLock {
private final List<Lock> locks;

CompositeLock(List<Lock> locks) {
this.locks = locks;
this.locks = Preconditions.notEmpty(locks, "Locks must not be empty");
}

// for tests only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -26,7 +28,6 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -52,6 +53,7 @@ public class ForkJoinPoolHierarchicalTestExecutorService implements Hierarchical

private final ForkJoinPool forkJoinPool;
private final int parallelism;
private final ThreadLocal<ThreadLock> threadLocks = new ThreadLocal<>();

/**
* Create a new {@code ForkJoinPoolHierarchicalTestExecutorService} based on
Expand Down Expand Up @@ -132,7 +134,7 @@ public Future<Void> submit(TestTask testTask) {
if (testTask.getExecutionMode() == CONCURRENT && ForkJoinTask.getSurplusQueuedTaskCount() < parallelism) {
return exclusiveTask.fork();
}
exclusiveTask.compute();
exclusiveTask.exec();
return completedFuture(null);
}

Expand All @@ -143,7 +145,7 @@ private boolean isAlreadyRunningInForkJoinPool() {
@Override
public void invokeAll(List<? extends TestTask> tasks) {
if (tasks.size() == 1) {
new ExclusiveTask(tasks.get(0)).compute();
new ExclusiveTask(tasks.get(0)).exec();
return;
}
Deque<ExclusiveTask> nonConcurrentTasks = new LinkedList<>();
Expand All @@ -169,14 +171,24 @@ private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<Exclusive

private void executeNonConcurrentTasks(Deque<ExclusiveTask> nonConcurrentTasks) {
for (ExclusiveTask task : nonConcurrentTasks) {
task.compute();
task.exec();
}
}

private void joinConcurrentTasksInReverseOrderToEnableWorkStealing(
Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
for (ExclusiveTask forkedTask : concurrentTasksInReverseOrder) {
forkedTask.join();
ThreadLock threadLock = threadLocks.get();
if (threadLock != null) {
List<ExclusiveTask> deferredTasks = threadLock.deferredTasks;
for (ExclusiveTask deferredTask : deferredTasks) {
if (deferredTask.isDone()) {
deferredTask.fork();
}
}
deferredTasks.clear();
}
}
}

Expand All @@ -186,24 +198,61 @@ public void close() {
}

// this class cannot not be serialized because TestTask is not Serializable
@SuppressWarnings("serial")
static class ExclusiveTask extends RecursiveAction {
class ExclusiveTask extends ForkJoinTask<Void> {

private final TestTask testTask;

ExclusiveTask(TestTask testTask) {
this.testTask = testTask;
}

/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() {
return null;
}

/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) {
}

@SuppressWarnings("try")
@Override
public void compute() {
try (ResourceLock lock = testTask.getResourceLock().acquire()) {
public boolean exec() {
// Check if this task is compatible with the current resource lock, if there is any.
// If not, we put this task in the thread local as a deferred task
// and let the worker thread fork it once it is done with the current task.
ResourceLock resourceLock = testTask.getResourceLock();
ThreadLock threadLock = threadLocks.get();
if (threadLock != null) {
if (!threadLock.isLockCompatible(resourceLock)) {
threadLock.addDeferredTask(this);
// Return false to indicate that this task is not done yet
// this means that .join() will wait.
return false;
}
}
try (ResourceLock lock = resourceLock.acquire()) {
if (threadLock == null) {
threadLocks.set(threadLock = new ThreadLock());
}
threadLock.incrementNesting(lock);
testTask.execute();
return true;
}
catch (InterruptedException e) {
throw ExceptionUtils.throwAsUncheckedException(e);
}
finally {
if (threadLock != null && threadLock.decrementNesting()) {
threadLocks.remove();
}
}
}

}
Expand All @@ -228,4 +277,26 @@ static class WorkerThread extends ForkJoinWorkerThread {

}

static class ThreadLock {
private final Deque<ResourceLock> locks = new ArrayDeque<>(2);
private final List<ExclusiveTask> deferredTasks = new ArrayList<>();

void addDeferredTask(ExclusiveTask task) {
deferredTasks.add(task);
}

void incrementNesting(ResourceLock lock) {
locks.push(lock);
}

boolean decrementNesting() {
locks.pop();
return locks.isEmpty();
}

boolean isLockCompatible(ResourceLock lock) {
return locks.stream().allMatch(lock::isCompatible);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static java.util.stream.Collectors.toList;
import static org.junit.platform.commons.util.CollectionUtils.getOnlyElement;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_KEY;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.LockMode.READ;

import java.util.Collection;
Expand Down Expand Up @@ -44,15 +45,18 @@ private static Comparator<String> globalKeyFirst() {
private final Map<String, ReadWriteLock> locksByKey = new ConcurrentHashMap<>();

ResourceLock getLockForResources(Collection<ExclusiveResource> resources) {
if (resources.isEmpty()) {
return NopLock.INSTANCE;
}
if (resources.size() == 1) {
return getLockForResource(getOnlyElement(resources));
}
List<Lock> locks = getDistinctSortedLocks(resources);
return toResourceLock(locks);
return new CompositeLock(locks);
}

ResourceLock getLockForResource(ExclusiveResource resource) {
return new SingleLock(toLock(resource));
return new SingleLock(toLock(resource), GLOBAL_READ.equals(resource));
}

private List<Lock> getDistinctSortedLocks(Collection<ExclusiveResource> resources) {
Expand All @@ -74,15 +78,4 @@ private Lock toLock(ExclusiveResource resource) {
return resource.getLockMode() == READ ? lock.readLock() : lock.writeLock();
}

private ResourceLock toResourceLock(List<Lock> locks) {
switch (locks.size()) {
case 0:
return NopLock.INSTANCE;
case 1:
return new SingleLock(locks.get(0));
default:
return new CompositeLock(locks);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ default void close() {
release();
}

/**
* {@return whether the given lock is compatible with this lock}
* @param other the other lock to check for compatibility
*/
default boolean isCompatible(ResourceLock other) {
return other instanceof NopLock || this instanceof NopLock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
class SingleLock implements ResourceLock {

private final Lock lock;
private final boolean isGlobalReadLock;

SingleLock(Lock lock) {
SingleLock(Lock lock, boolean isGlobalReadLock) {
this.lock = lock;
this.isGlobalReadLock = isGlobalReadLock;
}

// for tests only
Expand Down Expand Up @@ -60,4 +62,9 @@ public boolean isReleasable() {

}

@Override
public boolean isCompatible(ResourceLock other) {
return ResourceLock.super.isCompatible(other)
|| (other instanceof SingleLock && ((SingleLock) other).isGlobalReadLock && isGlobalReadLock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v2.0 which
* accompanies this distribution and is available at
*
* https://www.eclipse.org/legal/epl-v20.html
*/

package org.junit.platform.engine.support.hierarchical;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;

class ResourceLockTests {

private final ReentrantLock reentrantLock = new ReentrantLock();

@Test
void singleResourceLocksThatBothHaveTheGlobalReadLockFlagAreCompatible() {
SingleLock lock1 = new SingleLock(reentrantLock, true);
SingleLock lock2 = new SingleLock(reentrantLock, true);
assertSymmetry(lock1, lock2);
}

@Test
void singleResourceLocksThatNotBothHaveTheGlobalReadLockFlagAreIncompatible() {
SingleLock lock1 = new SingleLock(reentrantLock, true);
SingleLock lock2 = new SingleLock(reentrantLock, false);
SingleLock lock3 = new SingleLock(reentrantLock, false);
assertSymmetryNotCompatible(lock1, lock2);
assertSymmetryNotCompatible(lock1, lock3);
assertSymmetryNotCompatible(lock2, lock3);
}

@Test
void nopLocksAreCompatibleWithEverything() {
ResourceLock nop = NopLock.INSTANCE;
SingleLock singleLockGR = new SingleLock(reentrantLock, true);
SingleLock singleLock = new SingleLock(reentrantLock, false);
CompositeLock compositeLock = new CompositeLock(List.of(reentrantLock));
assertSymmetry(nop, singleLockGR);
assertSymmetry(nop, singleLock);
assertSymmetry(nop, compositeLock);
}

@Test
void compositeLocksAreIncompatibleWithNonNopLocks() {
CompositeLock compositeLock = new CompositeLock(List.of(reentrantLock));
SingleLock singleLockGR = new SingleLock(reentrantLock, true);
SingleLock singleLock = new SingleLock(reentrantLock, false);
assertSymmetryNotCompatible(compositeLock, singleLockGR);
assertSymmetryNotCompatible(compositeLock, singleLock);
assertSymmetryNotCompatible(compositeLock, compositeLock);
}

void assertSymmetry(ResourceLock a, ResourceLock b) {
assertTrue(a.isCompatible(b));
assertTrue(b.isCompatible(a));
}

void assertSymmetryNotCompatible(ResourceLock a, ResourceLock b) {
assertFalse(a.isCompatible(b));
assertFalse(b.isCompatible(a));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SingleLockTests {
void acquire() throws Exception {
var lock = new ReentrantLock();

new SingleLock(lock).acquire();
new SingleLock(lock, true).acquire();

assertTrue(lock.isLocked());
}
Expand All @@ -37,7 +37,7 @@ void acquire() throws Exception {
void release() throws Exception {
var lock = new ReentrantLock();

new SingleLock(lock).acquire().close();
new SingleLock(lock, true).acquire().close();

assertFalse(lock.isLocked());
}
Expand Down

0 comments on commit 6da042f

Please sign in to comment.