From 20367418db73670c89410e4ca5ba67c7f802fac0 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Wed, 27 Mar 2024 15:23:50 -0500 Subject: [PATCH] Ensure that repeatable tasks are always cancellable Prevent a problem where a repeating task cannot be cancelled if it is already running. Fixes #163. --- .../jboss/threads/EnhancedQueueExecutor.java | 49 ++++++++++++------- .../ScheduledEnhancedQueueExecutorTest.java | 35 ++++++++----- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index a3139bb..9d270ef 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -2536,11 +2536,12 @@ public String toString() { static final int ASF_ST_WAITING = 0; static final int ASF_ST_CANCELLED = 1; - static final int ASF_ST_SUBMITTED = 2; - static final int ASF_ST_RUNNING = 3; - static final int ASF_ST_FINISHED = 4; - static final int ASF_ST_FAILED = 5; - static final int ASF_ST_REJECTED = 6; + static final int ASF_ST_CANCEL_PENDING = 2; + static final int ASF_ST_SUBMITTED = 3; + static final int ASF_ST_RUNNING = 4; + static final int ASF_ST_FINISHED = 5; + static final int ASF_ST_FAILED = 6; + static final int ASF_ST_REJECTED = 7; static final AbstractScheduledFuture[] NO_FUTURES = new AbstractScheduledFuture[0]; @@ -2597,6 +2598,7 @@ public long getDelay(final TimeUnit unit) { } public boolean isCancelled() { + int state = this.state; return state == ASF_ST_CANCELLED; } @@ -2617,11 +2619,16 @@ public boolean cancel(final boolean mayInterruptIfRunning) { doCancel(); return true; } + case ASF_ST_CANCEL_PENDING: case ASF_ST_RUNNING: { + this.state = ASF_ST_CANCEL_PENDING; if (mayInterruptIfRunning) { - liveThread.interrupt(); + Thread liveThread = this.liveThread; + if (liveThread != null) { + liveThread.interrupt(); + } } - return false; + return true; } case ASF_ST_CANCELLED: { return true; @@ -2643,6 +2650,7 @@ public V get() throws InterruptedException, ExecutionException { for (;;) { state = this.state; switch (state) { + case ASF_ST_CANCEL_PENDING: case ASF_ST_WAITING: case ASF_ST_SUBMITTED: case ASF_ST_RUNNING: { @@ -2659,6 +2667,7 @@ public V get() throws InterruptedException, ExecutionException { throw new ExecutionException((Throwable) result); } case ASF_ST_FINISHED: { + // never happens for repeatable tasks return (V) result; } } @@ -2675,6 +2684,7 @@ public V get(final long timeout, final TimeUnit unit) throws InterruptedExceptio for (;;) { state = this.state; switch (state) { + case ASF_ST_CANCEL_PENDING: case ASF_ST_WAITING: case ASF_ST_SUBMITTED: case ASF_ST_RUNNING: { @@ -2817,6 +2827,7 @@ void reject(RejectedExecutionException e) { void fail(Throwable t) { synchronized (this) { switch (state) { + case ASF_ST_CANCEL_PENDING: case ASF_ST_WAITING: case ASF_ST_SUBMITTED: case ASF_ST_RUNNING: { @@ -2840,11 +2851,13 @@ void fail(Throwable t) { void finish(V result) { // overridden in subclasses where the task repeats synchronized (this) { + liveThread = null; switch (state) { + case ASF_ST_CANCEL_PENDING: case ASF_ST_RUNNING: { + // for non-repeating tasks, a pending cancel does not invalidate finishing the task this.result = result; this.state = ASF_ST_FINISHED; - liveThread = null; notifyAll(); return; } @@ -2936,29 +2949,31 @@ abstract class RepeatingScheduledFuture extends AbstractScheduledFuture { */ abstract void adjustTime(); - public void run() { - super.run(); - // if an exception is thrown, we will have failed already anyway - adjustTime(); + void finish(final V result) { synchronized (this) { + liveThread = null; switch (state) { + case ASF_ST_CANCEL_PENDING: { + this.state = ASF_ST_CANCELLED; + notifyAll(); + return; + } case ASF_ST_RUNNING: { + // repeating tasks never actually finish + adjustTime(); state = ASF_ST_WAITING; schedulerTask.schedule(this); return; } default: { - // in all other cases, we failed so the task should not be rescheduled + // invalid state + fail(badState()); return; } } } } - void finish(final V result) { - // repeating tasks never actually finish - } - StringBuilder toString(final StringBuilder b) { return super.toString(b.append("repeating ")); } diff --git a/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java index 54d5320..fc6749e 100644 --- a/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java +++ b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java @@ -2,6 +2,8 @@ import static org.junit.jupiter.api.Assertions.*; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -39,21 +41,16 @@ public void testCancelWhileRunning() throws Exception { ScheduledFuture future = eqe.schedule(() -> { latch.countDown(); Thread.sleep(1_000_000_000L); return Boolean.TRUE; }, 1, TimeUnit.NANOSECONDS); assertTrue(latch.await(5, TimeUnit.SECONDS), "Timely task execution"); assertFalse(future.isCancelled()); - // task is running; cancel will fail - assertFalse(future.cancel(false)); + // task is running + assertTrue(future.cancel(false)); assertFalse(future.isCancelled()); assertFalse(future.isDone()); - // now try to interrupt it (cancel still fails but the interrupt should be delivered) - assertFalse(future.cancel(true)); + // now try to interrupt it + assertTrue(future.cancel(true)); assertFalse(future.isCancelled()); // now get it - try { - future.get(100L, TimeUnit.MILLISECONDS); - fail("Expected exception"); - } catch (ExecutionException ee) { - Throwable cause = ee.getCause(); - assertTrue(cause instanceof InterruptedException, "Expected " + cause + " to be an InterruptedException"); - } + Throwable cause = assertThrows(ExecutionException.class, () -> future.get(100L, TimeUnit.MILLISECONDS)).getCause(); + assertInstanceOf(InterruptedException.class, cause); assertTrue(future.isDone()); eqe.shutdown(); assertTrue(eqe.awaitTermination(5, TimeUnit.SECONDS), "Timely shutdown"); @@ -146,6 +143,22 @@ public void testFixedDelayExecution() throws Exception { } } + @Test + public void testThatFixedDelayTerminatesTask() { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + var r = new Runnable() { + final ScheduledFuture future = eqe.scheduleWithFixedDelay(this, 0, 100, TimeUnit.MILLISECONDS); + final ArrayList times = new ArrayList<>(); + public void run() { + times.add(LocalDateTime.now()); + if (times.size() >= 5) { + future.cancel(false); + } + } + }; + assertThrows(CancellationException.class, () -> r.future.get(5, TimeUnit.SECONDS)); + } + @Test public void testCancelOnShutdown() throws Exception { EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();