diff --git a/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitExecutor.java b/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitExecutor.java index 1797c56..5d20e1d 100644 --- a/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitExecutor.java +++ b/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitExecutor.java @@ -39,4 +39,9 @@ public BukkitTask run(Runnable runnable, Runnable callback, boolean async) { return Bukkit.getScheduler().runTask(this.plugin, task); } } + + @Override + public void cancel(BukkitTask task) { + task.cancel(); + } } diff --git a/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitTasker.java b/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitTasker.java index b132b88..b97ad20 100644 --- a/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitTasker.java +++ b/bukkit/src/main/java/eu/okaeri/tasker/bukkit/BukkitTasker.java @@ -6,11 +6,8 @@ public class BukkitTasker extends Tasker { - private final Plugin plugin; - protected BukkitTasker(TaskerExecutor executor, Plugin plugin) { super(executor); - this.plugin = plugin; } public static BukkitTasker newPool(Plugin plugin) { diff --git a/core/src/main/java/eu/okaeri/tasker/core/TaskerExecutor.java b/core/src/main/java/eu/okaeri/tasker/core/TaskerExecutor.java index 5206c90..94a1eac 100644 --- a/core/src/main/java/eu/okaeri/tasker/core/TaskerExecutor.java +++ b/core/src/main/java/eu/okaeri/tasker/core/TaskerExecutor.java @@ -7,4 +7,6 @@ public interface TaskerExecutor { T schedule(Runnable runnable, boolean async); T run(Runnable runnable, Runnable callback, boolean async); + + void cancel(T task); } diff --git a/core/src/main/java/eu/okaeri/tasker/core/chain/SharedChain.java b/core/src/main/java/eu/okaeri/tasker/core/chain/SharedChain.java index b10b9b8..8b9a72f 100644 --- a/core/src/main/java/eu/okaeri/tasker/core/chain/SharedChain.java +++ b/core/src/main/java/eu/okaeri/tasker/core/chain/SharedChain.java @@ -12,7 +12,7 @@ public class SharedChain extends TaskerChain { private final Queue queue; private final AtomicBoolean executed = new AtomicBoolean(false); - public SharedChain(TaskerExecutor executor, Queue queue) { + public SharedChain(TaskerExecutor executor, Queue queue) { super(executor); this.queue = queue; } diff --git a/core/src/main/java/eu/okaeri/tasker/core/chain/TaskerChain.java b/core/src/main/java/eu/okaeri/tasker/core/chain/TaskerChain.java index 5b627f5..5a19366 100644 --- a/core/src/main/java/eu/okaeri/tasker/core/chain/TaskerChain.java +++ b/core/src/main/java/eu/okaeri/tasker/core/chain/TaskerChain.java @@ -2,12 +2,16 @@ import eu.okaeri.tasker.core.TaskerExecutor; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -15,18 +19,24 @@ import java.util.function.Predicate; import java.util.function.Supplier; -@RequiredArgsConstructor public class TaskerChain { private final AtomicBoolean abort = new AtomicBoolean(false); private final AtomicBoolean lastAsync = new AtomicBoolean(false); private final AtomicBoolean executed = new AtomicBoolean(false); + private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference data = new AtomicReference<>(); private final AtomicReference exception = new AtomicReference<>(); + private final AtomicReference currentTask = new AtomicReference<>(); private final List tasks = new ArrayList<>(); - private final TaskerExecutor executor; + private final TaskerExecutor executor; + + @SuppressWarnings("unchecked") + public TaskerChain(TaskerExecutor executor) { + this.executor = (TaskerExecutor) executor; + } // SYNC public TaskerChain sync(@NonNull Runnable runnable) { @@ -144,7 +154,7 @@ public TaskerChain abortIfException() { // EXECUTION @SuppressWarnings("unchecked") - private void _execute(Consumer consumer) { + private void _execute(Consumer consumer, Consumer unhandledExceptionConsumer) { if (this.executed.get()) { throw new RuntimeException("Cannot execute already executed chain"); @@ -156,20 +166,27 @@ private void _execute(Consumer consumer) { // handle exception after last task Exception unhandled = this.exception.get(); if (unhandled != null) { - throw new RuntimeException("Unhandled chain exception", unhandled); + if (unhandledExceptionConsumer != null) { + unhandledExceptionConsumer.accept(unhandled); + } else { + throw new RuntimeException("Unhandled chain exception", unhandled); + } } // callback consumer if (consumer != null) { consumer.accept((T) this.data.get()); } + + // mark as done + this.done.set(true); }; // run tasks - this._executeTask(0, abortCallback); + this._executeTask(0, abortCallback, unhandledExceptionConsumer); } - private void _executeTask(int index, Runnable abortCallback) { + private void _executeTask(int index, Runnable abortCallback, Consumer unhandledExceptionConsumer) { // no more tasks if (index >= this.tasks.size()) { @@ -189,13 +206,23 @@ private void _executeTask(int index, Runnable abortCallback) { // check for unhandled exceptions Exception unhandled = this.exception.get(); if (unhandled != null && !task.isExceptionHandler()) { - throw new RuntimeException("Unhandled chain exception", unhandled); + if (unhandledExceptionConsumer != null) { + // pass exception to the consumer + unhandledExceptionConsumer.accept(unhandled); + // check if marked for abort + if (this.abort.get()) { + abortCallback.run(); + return; + } + } else { + throw new RuntimeException("Unhandled chain exception", unhandled); + } } // prepare callback Runnable callback = () -> { this.lastAsync.set(task.isAsync()); - this._executeTask(index + 1, abortCallback); + this._executeTask(index + 1, abortCallback, unhandledExceptionConsumer); }; // create handling runnable @@ -208,33 +235,99 @@ private void _executeTask(int index, Runnable abortCallback) { }; // execute - this.executor.run(runnable, callback, task.isAsync()); + this.currentTask.set(this.executor.run(runnable, callback, task.isAsync())); } public void execute(@NonNull Consumer consumer) { - this._execute(consumer); + this._execute(consumer, null); } public void execute() { - this._execute(null); + this._execute(null, null); + } + + public Future executeFuture() { + return new Future() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return TaskerChain.this.cancel(); + } + + @Override + public boolean isCancelled() { + return TaskerChain.this.abort.get(); + } + + @Override + public boolean isDone() { + return TaskerChain.this.done.get(); + } + + @Override + public T get() { + return TaskerChain.this.await(); + } + + @Override + public T get(long timeout, TimeUnit unit) { + return TaskerChain.this.await(timeout, unit); + } + }; + } + + public T await() { + return this.await(-1, null); } @SneakyThrows @SuppressWarnings("BusyWait") - public T await() { + public T await(long timeout, TimeUnit unit) { - AtomicBoolean done = new AtomicBoolean(); + Instant start = unit == null ? null : Instant.now(); AtomicReference resource = new AtomicReference<>(); - - this._execute((data) -> { - resource.set(data); - done.set(true); - }); - - while (!done.get()) { + AtomicReference exception = new AtomicReference<>(); + + this._execute( + resource::set, + (unhandledException) -> { + this.abort.set(true); + exception.set(unhandledException); + } + ); + + while (!this.done.get()) { + if (unit != null) { + Duration waitDuration = Duration.between(start, Instant.now()); + if (waitDuration.toNanos() >= unit.toNanos(timeout)) { + this.cancel(); + throw new TimeoutException("No result after " + waitDuration); + } + } Thread.sleep(1L); } + Exception unhandledException = exception.get(); + if (unhandledException != null) { + throw unhandledException; + } + return resource.get(); } + + public boolean cancel() { + + if (this.abort.get()) { + return false; + } + + this.abort.set(true); + Object currentTask = this.currentTask.get(); + + if (currentTask != null) { + this.executor.cancel(currentTask); + } + + return true; + } } diff --git a/core/src/test/java/eu/okaeri/taskertest/TaskerTest.java b/core/src/test/java/eu/okaeri/taskertest/TaskerTest.java index 7f0c8f0..1243c03 100644 --- a/core/src/test/java/eu/okaeri/taskertest/TaskerTest.java +++ b/core/src/test/java/eu/okaeri/taskertest/TaskerTest.java @@ -35,6 +35,10 @@ public Object run(Runnable runnable, Runnable callback, boolean async) { callback.run(); return null; } + + @Override + public void cancel(Object task) { + } }); }