Skip to content

Commit

Permalink
Merge pull request #164 from dmlloyd/fix-163
Browse files Browse the repository at this point in the history
Ensure that repeatable tasks are always cancellable
  • Loading branch information
dmlloyd authored Mar 27, 2024
2 parents 7c44fc3 + 2036741 commit d28ccdb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 28 deletions.
49 changes: 32 additions & 17 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2536,11 +2536,12 @@ public String toString() {

static final int ASF_ST_WAITING = 0;
static final int ASF_ST_CANCELLED = 1;
static final int ASF_ST_SUBMITTED = 2;
static final int ASF_ST_RUNNING = 3;
static final int ASF_ST_FINISHED = 4;
static final int ASF_ST_FAILED = 5;
static final int ASF_ST_REJECTED = 6;
static final int ASF_ST_CANCEL_PENDING = 2;
static final int ASF_ST_SUBMITTED = 3;
static final int ASF_ST_RUNNING = 4;
static final int ASF_ST_FINISHED = 5;
static final int ASF_ST_FAILED = 6;
static final int ASF_ST_REJECTED = 7;

static final AbstractScheduledFuture<?>[] NO_FUTURES = new AbstractScheduledFuture<?>[0];

Expand Down Expand Up @@ -2597,6 +2598,7 @@ public long getDelay(final TimeUnit unit) {
}

public boolean isCancelled() {
int state = this.state;
return state == ASF_ST_CANCELLED;
}

Expand All @@ -2617,11 +2619,16 @@ public boolean cancel(final boolean mayInterruptIfRunning) {
doCancel();
return true;
}
case ASF_ST_CANCEL_PENDING:
case ASF_ST_RUNNING: {
this.state = ASF_ST_CANCEL_PENDING;
if (mayInterruptIfRunning) {
liveThread.interrupt();
Thread liveThread = this.liveThread;
if (liveThread != null) {
liveThread.interrupt();
}
}
return false;
return true;
}
case ASF_ST_CANCELLED: {
return true;
Expand All @@ -2643,6 +2650,7 @@ public V get() throws InterruptedException, ExecutionException {
for (;;) {
state = this.state;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand All @@ -2659,6 +2667,7 @@ public V get() throws InterruptedException, ExecutionException {
throw new ExecutionException((Throwable) result);
}
case ASF_ST_FINISHED: {
// never happens for repeatable tasks
return (V) result;
}
}
Expand All @@ -2675,6 +2684,7 @@ public V get(final long timeout, final TimeUnit unit) throws InterruptedExceptio
for (;;) {
state = this.state;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand Down Expand Up @@ -2817,6 +2827,7 @@ void reject(RejectedExecutionException e) {
void fail(Throwable t) {
synchronized (this) {
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand All @@ -2840,11 +2851,13 @@ void fail(Throwable t) {
void finish(V result) {
// overridden in subclasses where the task repeats
synchronized (this) {
liveThread = null;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_RUNNING: {
// for non-repeating tasks, a pending cancel does not invalidate finishing the task
this.result = result;
this.state = ASF_ST_FINISHED;
liveThread = null;
notifyAll();
return;
}
Expand Down Expand Up @@ -2936,29 +2949,31 @@ abstract class RepeatingScheduledFuture<V> extends AbstractScheduledFuture<V> {
*/
abstract void adjustTime();

public void run() {
super.run();
// if an exception is thrown, we will have failed already anyway
adjustTime();
void finish(final V result) {
synchronized (this) {
liveThread = null;
switch (state) {
case ASF_ST_CANCEL_PENDING: {
this.state = ASF_ST_CANCELLED;
notifyAll();
return;
}
case ASF_ST_RUNNING: {
// repeating tasks never actually finish
adjustTime();
state = ASF_ST_WAITING;
schedulerTask.schedule(this);
return;
}
default: {
// in all other cases, we failed so the task should not be rescheduled
// invalid state
fail(badState());
return;
}
}
}
}

void finish(final V result) {
// repeating tasks never actually finish
}

StringBuilder toString(final StringBuilder b) {
return super.toString(b.append("repeating "));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static org.junit.jupiter.api.Assertions.*;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -39,21 +41,16 @@ public void testCancelWhileRunning() throws Exception {
ScheduledFuture<Boolean> future = eqe.schedule(() -> { latch.countDown(); Thread.sleep(1_000_000_000L); return Boolean.TRUE; }, 1, TimeUnit.NANOSECONDS);
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timely task execution");
assertFalse(future.isCancelled());
// task is running; cancel will fail
assertFalse(future.cancel(false));
// task is running
assertTrue(future.cancel(false));
assertFalse(future.isCancelled());
assertFalse(future.isDone());
// now try to interrupt it (cancel still fails but the interrupt should be delivered)
assertFalse(future.cancel(true));
// now try to interrupt it
assertTrue(future.cancel(true));
assertFalse(future.isCancelled());
// now get it
try {
future.get(100L, TimeUnit.MILLISECONDS);
fail("Expected exception");
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
assertTrue(cause instanceof InterruptedException, "Expected " + cause + " to be an InterruptedException");
}
Throwable cause = assertThrows(ExecutionException.class, () -> future.get(100L, TimeUnit.MILLISECONDS)).getCause();
assertInstanceOf(InterruptedException.class, cause);
assertTrue(future.isDone());
eqe.shutdown();
assertTrue(eqe.awaitTermination(5, TimeUnit.SECONDS), "Timely shutdown");
Expand Down Expand Up @@ -146,6 +143,22 @@ public void testFixedDelayExecution() throws Exception {
}
}

@Test
public void testThatFixedDelayTerminatesTask() {
EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
var r = new Runnable() {
final ScheduledFuture<?> future = eqe.scheduleWithFixedDelay(this, 0, 100, TimeUnit.MILLISECONDS);
final ArrayList<LocalDateTime> times = new ArrayList<>();
public void run() {
times.add(LocalDateTime.now());
if (times.size() >= 5) {
future.cancel(false);
}
}
};
assertThrows(CancellationException.class, () -> r.future.get(5, TimeUnit.SECONDS));
}

@Test
public void testCancelOnShutdown() throws Exception {
EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
Expand Down

0 comments on commit d28ccdb

Please sign in to comment.