Skip to content

Commit

Permalink
Fix usages of some async utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Vampire committed Oct 14, 2023
1 parent 4b325b3 commit 4a09c7a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public void runIterations(IDataIterator dataIterator, IIterationRunner iteration
dataIterator.forEachRemaining(data::add);
for (int attempt = 0; attempt < maxAttempts; attempt++) {
for (Object[] dataValues : data) {
try {
ExecutionResult executionResult = iterationRunner.runIteration(dataValues, maxIterations).get();
if (executionResult == ExecutionResult.FAILED) {
return;
}
} catch (InterruptedException | ExecutionException e) {
ExecutionResult executionResult = iterationRunner.runIteration(dataValues, maxIterations).join();
if (executionResult == ExecutionResult.FAILED) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,57 @@ public void run() {
long timeoutAt = 0;
int unsuccessfulInterruptAttempts = 0;

syncWithThread(startLatch, "feature", methodName);
boolean syncedWithFeature = false;
try {
startLatch.countDown();
syncedWithFeature = startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely
}
if (!syncedWithFeature) {
System.out.printf("[spock.lang.Timeout] Could not sync with Feature for method '%s'", methodName);
}

while (waitMillis > 0) {
long waitStart = System.nanoTime();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely and continue the remaining waiting
waitMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStart);
continue;
}
break;
}
if (!synced) {
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
}
while (!synced) {
mainThread.interrupt();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// The mission of this thread is to repeatedly interrupt the main thread until
// the latter returns. Once this mission has been accomplished, this thread will die quickly
}
if (!synced) {
long now = System.nanoTime();
if (stackTrace.length == 0) {
logMethodTimeout(methodName, timeoutSeconds);
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
timeoutAt = now;
} else {
waitMillis *= 2;
logUnsuccessfulInterrupt(methodName, now, timeoutAt, waitMillis, ++unsuccessfulInterruptAttempts);
}
mainThread.interrupt();
System.out.printf("[spock.lang.Timeout] Method '%s' has not yet returned - interrupting. Next try in %1.2f seconds.\n",
methodName, waitMillis / 1000.);
}
}
}
}.start();

syncWithThread(startLatch, "watcher", methodName);
boolean syncedWithWatcher = false;
try {
startLatch.countDown();
syncedWithWatcher = startLatch.await(5, TimeUnit.SECONDS);
} finally {
if (!syncedWithWatcher) {
System.out.printf("[spock.lang.Timeout] Could not sync with Watcher for method '%s'", invocation.getMethod().getName());
}
}

Throwable saved = null;
try {
Expand Down Expand Up @@ -216,13 +240,4 @@ private static Pair<Integer, Integer> findThreadSection(List<String> lines, Stri

return null;
}

private static void syncWithThread(CountDownLatch startLatch, String threadName, String methodName) {
try {
startLatch.countDown();
startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
System.out.printf("[spock.lang.Timeout] Could not sync with %s thread for method '%s'", threadName, methodName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,14 @@ public void await() throws Throwable {
* @throws Throwable the first exception thrown by an evaluate block
*/
public void await(double seconds) throws Throwable {
latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
boolean evalBlocksFinished = latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
if (!exceptions.isEmpty())
throw exceptions.poll();

long pendingEvalBlocks = latch.getCount();
if (pendingEvalBlocks > 0) {
if (!evalBlocksFinished) {
String msg = String.format("Async conditions timed out " +
"after %1.2f seconds; %d out of %d evaluate blocks did not complete in time",
seconds, pendingEvalBlocks, numEvalBlocks);
seconds, latch.getCount(), numEvalBlocks);
throw new SpockTimeoutError(seconds, msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class ParallelSpec extends EmbeddedSpecification {
@ResourceLock(value = "a", mode = ResourceAccessMode.READ)
def writeA() {
when:
incrementAndBlock(atomicInteger, latch)
incrementAndBlock(atomicInteger, latch, 10000)
then:
atomicInteger.get() == 3
Expand Down Expand Up @@ -519,15 +519,15 @@ class ParallelSpec extends EmbeddedSpecification {
throws InterruptedException {
int value = sharedResource.incrementAndGet()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
return value
}

static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDownLatch countDownLatch, long timeout = 100)
throws InterruptedException {
int value = sharedResource.get()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
assert value == sharedResource.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
latch.await(10, TimeUnit.SECONDS)
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -68,7 +68,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
latch.await(10, TimeUnit.SECONDS)
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -93,7 +93,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
latch.await(10, TimeUnit.SECONDS)
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand Down

0 comments on commit 4a09c7a

Please sign in to comment.