Skip to content

Commit

Permalink
Rethrow unhandled exceptions in await, add TaskerChain#executeFuture (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
dasavick committed Oct 25, 2021
1 parent 6dbcc82 commit ae444eb
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public BukkitTask run(Runnable runnable, Runnable callback, boolean async) {
return Bukkit.getScheduler().runTask(this.plugin, task);
}
}

@Override
public void cancel(BukkitTask task) {
task.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

public class BukkitTasker extends Tasker {

private final Plugin plugin;

protected BukkitTasker(TaskerExecutor<?> executor, Plugin plugin) {
super(executor);
this.plugin = plugin;
}

public static BukkitTasker newPool(Plugin plugin) {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/eu/okaeri/tasker/core/TaskerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public interface TaskerExecutor<T> {
T schedule(Runnable runnable, boolean async);

T run(Runnable runnable, Runnable callback, boolean async);

void cancel(T task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class SharedChain<T> extends TaskerChain<T> {
private final Queue<Runnable> queue;
private final AtomicBoolean executed = new AtomicBoolean(false);

public SharedChain(TaskerExecutor executor, Queue<Runnable> queue) {
public SharedChain(TaskerExecutor<?> executor, Queue<Runnable> queue) {
super(executor);
this.queue = queue;
}
Expand Down
135 changes: 114 additions & 21 deletions core/src/main/java/eu/okaeri/tasker/core/chain/TaskerChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,41 @@

import eu.okaeri.tasker.core.TaskerExecutor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

@RequiredArgsConstructor
public class TaskerChain<T> {

private final AtomicBoolean abort = new AtomicBoolean(false);
private final AtomicBoolean lastAsync = new AtomicBoolean(false);
private final AtomicBoolean executed = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);

private final AtomicReference<Object> data = new AtomicReference<>();
private final AtomicReference<Exception> exception = new AtomicReference<>();
private final AtomicReference<Object> currentTask = new AtomicReference<>();

private final List<ChainTask> tasks = new ArrayList<>();
private final TaskerExecutor executor;
private final TaskerExecutor<Object> executor;

@SuppressWarnings("unchecked")
public TaskerChain(TaskerExecutor<?> executor) {
this.executor = (TaskerExecutor<Object>) executor;
}

// SYNC
public TaskerChain<T> sync(@NonNull Runnable runnable) {
Expand Down Expand Up @@ -144,7 +154,7 @@ public TaskerChain<T> abortIfException() {

// EXECUTION
@SuppressWarnings("unchecked")
private void _execute(Consumer<T> consumer) {
private void _execute(Consumer<T> consumer, Consumer<Exception> unhandledExceptionConsumer) {

if (this.executed.get()) {
throw new RuntimeException("Cannot execute already executed chain");
Expand All @@ -156,20 +166,27 @@ private void _execute(Consumer<T> consumer) {
// handle exception after last task
Exception unhandled = this.exception.get();
if (unhandled != null) {
throw new RuntimeException("Unhandled chain exception", unhandled);
if (unhandledExceptionConsumer != null) {
unhandledExceptionConsumer.accept(unhandled);
} else {
throw new RuntimeException("Unhandled chain exception", unhandled);
}
}

// callback consumer
if (consumer != null) {
consumer.accept((T) this.data.get());
}

// mark as done
this.done.set(true);
};

// run tasks
this._executeTask(0, abortCallback);
this._executeTask(0, abortCallback, unhandledExceptionConsumer);
}

private void _executeTask(int index, Runnable abortCallback) {
private void _executeTask(int index, Runnable abortCallback, Consumer<Exception> unhandledExceptionConsumer) {

// no more tasks
if (index >= this.tasks.size()) {
Expand All @@ -189,13 +206,23 @@ private void _executeTask(int index, Runnable abortCallback) {
// check for unhandled exceptions
Exception unhandled = this.exception.get();
if (unhandled != null && !task.isExceptionHandler()) {
throw new RuntimeException("Unhandled chain exception", unhandled);
if (unhandledExceptionConsumer != null) {
// pass exception to the consumer
unhandledExceptionConsumer.accept(unhandled);
// check if marked for abort
if (this.abort.get()) {
abortCallback.run();
return;
}
} else {
throw new RuntimeException("Unhandled chain exception", unhandled);
}
}

// prepare callback
Runnable callback = () -> {
this.lastAsync.set(task.isAsync());
this._executeTask(index + 1, abortCallback);
this._executeTask(index + 1, abortCallback, unhandledExceptionConsumer);
};

// create handling runnable
Expand All @@ -208,33 +235,99 @@ private void _executeTask(int index, Runnable abortCallback) {
};

// execute
this.executor.run(runnable, callback, task.isAsync());
this.currentTask.set(this.executor.run(runnable, callback, task.isAsync()));
}

public void execute(@NonNull Consumer<T> consumer) {
this._execute(consumer);
this._execute(consumer, null);
}

public void execute() {
this._execute(null);
this._execute(null, null);
}

public Future<T> executeFuture() {
return new Future<T>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return TaskerChain.this.cancel();
}

@Override
public boolean isCancelled() {
return TaskerChain.this.abort.get();
}

@Override
public boolean isDone() {
return TaskerChain.this.done.get();
}

@Override
public T get() {
return TaskerChain.this.await();
}

@Override
public T get(long timeout, TimeUnit unit) {
return TaskerChain.this.await(timeout, unit);
}
};
}

public T await() {
return this.await(-1, null);
}

@SneakyThrows
@SuppressWarnings("BusyWait")
public T await() {
public T await(long timeout, TimeUnit unit) {

AtomicBoolean done = new AtomicBoolean();
Instant start = unit == null ? null : Instant.now();
AtomicReference<T> resource = new AtomicReference<>();

this._execute((data) -> {
resource.set(data);
done.set(true);
});

while (!done.get()) {
AtomicReference<Exception> exception = new AtomicReference<>();

this._execute(
resource::set,
(unhandledException) -> {
this.abort.set(true);
exception.set(unhandledException);
}
);

while (!this.done.get()) {
if (unit != null) {
Duration waitDuration = Duration.between(start, Instant.now());
if (waitDuration.toNanos() >= unit.toNanos(timeout)) {
this.cancel();
throw new TimeoutException("No result after " + waitDuration);
}
}
Thread.sleep(1L);
}

Exception unhandledException = exception.get();
if (unhandledException != null) {
throw unhandledException;
}

return resource.get();
}

public boolean cancel() {

if (this.abort.get()) {
return false;
}

this.abort.set(true);
Object currentTask = this.currentTask.get();

if (currentTask != null) {
this.executor.cancel(currentTask);
}

return true;
}
}
4 changes: 4 additions & 0 deletions core/src/test/java/eu/okaeri/taskertest/TaskerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public Object run(Runnable runnable, Runnable callback, boolean async) {
callback.run();
return null;
}

@Override
public void cancel(Object task) {
}
});
}

Expand Down

0 comments on commit ae444eb

Please sign in to comment.