diff --git a/src/thebigcrafter/Hydrogen/Hydrogen.php b/src/thebigcrafter/Hydrogen/Hydrogen.php index bc88747..f71f89e 100644 --- a/src/thebigcrafter/Hydrogen/Hydrogen.php +++ b/src/thebigcrafter/Hydrogen/Hydrogen.php @@ -13,6 +13,8 @@ use pocketmine\plugin\Plugin; use pocketmine\Server; +use thebigcrafter\Hydrogen\future\Future; +use thebigcrafter\Hydrogen\future\FutureState; use thebigcrafter\Hydrogen\tasks\CheckUpdatesTask; class Hydrogen { @@ -23,4 +25,43 @@ class Hydrogen { public static function checkForUpdates(Plugin $plugin) : void { Server::getInstance()->getAsyncPool()->submitTask(new CheckUpdatesTask($plugin->getName(), $plugin->getDescription()->getVersion())); } + + /** + * Creates a new fiber asynchronously using the given closure, returning a Future that is completed with the + * eventual return value of the passed function or will fail if the closure throws an exception. + * + * @template T + * + * @param \Closure(...):T $closure + * @param mixed ...$args Arguments forwarded to the closure when starting the fiber. + * + * @return Future + */ +public static function async(\Closure $closure, mixed ...$args) : Future +{ + static $run = null; + + $run ??= static function (FutureState $state, \Closure $closure, array $args) : void { + $s = $state; + $c = $closure; + + /* Null function arguments so an exception thrown from the closure does not contain the FutureState object + * in the stack trace, which would create a circular reference, preventing immediate garbage collection */ + $state = $closure = null; + + try { + // Clear $args to allow garbage collection of arguments during fiber execution + $s->complete($c(...$args, ...($args = []))); + } catch (\Throwable $exception) { + $s->error($exception); + } + }; + + $state = new FutureState(); + + EventLoop::queue($run, $state, $closure, $args); + + return new Future($state); +} + } diff --git a/src/thebigcrafter/Hydrogen/exceptions/Cancellation.php b/src/thebigcrafter/Hydrogen/exceptions/Cancellation.php new file mode 100644 index 0000000..5bde5a3 --- /dev/null +++ b/src/thebigcrafter/Hydrogen/exceptions/Cancellation.php @@ -0,0 +1,47 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\exceptions; + +interface Cancellation +{ + /** + * Subscribes a new handler to be invoked on a cancellation request. + * + * This handler might be invoked immediately in case the cancellation has already been requested. Any unhandled + * exceptions will be thrown into the event loop. + * + * @param \Closure(CancelledException) $callback Callback to be invoked on a cancellation request. Will receive a + * `CancelledException` as first argument that may be used to fail the operation. + * + * @return string Identifier that can be used to cancel the subscription. + */ + public function subscribe(\Closure $callback) : string; + + /** + * Unsubscribes a previously registered handler. + * + * The handler will no longer be called as long as this method isn't invoked from a subscribed callback. + */ + public function unsubscribe(string $id) : void; + + /** + * Returns whether cancellation has been requested yet. + */ + public function isRequested() : bool; + + /** + * Throws the `CancelledException` if cancellation has been requested, otherwise does nothing. + * + * @throws CancelledException + */ + public function throwIfRequested() : void; +} diff --git a/src/thebigcrafter/Hydrogen/exceptions/NullCancellation.php b/src/thebigcrafter/Hydrogen/exceptions/NullCancellation.php new file mode 100644 index 0000000..cc84c08 --- /dev/null +++ b/src/thebigcrafter/Hydrogen/exceptions/NullCancellation.php @@ -0,0 +1,35 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\exceptions; + +final class NullCancellation implements Cancellation +{ + public function subscribe(\Closure $callback) : string + { + return "null-cancellation"; + } + + public function unsubscribe(string $id) : void + { + // nothing to do + } + + public function isRequested() : bool + { + return false; + } + + public function throwIfRequested() : void + { + // nothing to do + } +} diff --git a/src/thebigcrafter/Hydrogen/future/Future.php b/src/thebigcrafter/Hydrogen/future/Future.php new file mode 100644 index 0000000..7191e1f --- /dev/null +++ b/src/thebigcrafter/Hydrogen/future/Future.php @@ -0,0 +1,268 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\future; + +use thebigcrafter\Hydrogen\EventLoop; +use thebigcrafter\Hydrogen\exceptions\Cancellation; +use thebigcrafter\Hydrogen\trait\ForbidCloning; +use thebigcrafter\Hydrogen\trait\ForbidSerialization; +use function is_array; + +/** + * @template-covariant T + */ +final class Future +{ + use ForbidCloning; + use ForbidSerialization; + + /** + * Iterate over the given futures in completion order. + * + * @template Tk + * @template Tv + * + * @param iterable> $futures + * @param Cancellation|null $cancellation Optional cancellation. + * + * @return iterable> + */ + public static function iterate(iterable $futures, ?Cancellation $cancellation = null) : iterable + { + $iterator = new FutureIterator($cancellation); + + // Directly iterate in case of an array, because there can't be suspensions during iteration + if (is_array($futures)) { + foreach ($futures as $key => $future) { + if (!$future instanceof self) { + throw new \TypeError('Array must only contain instances of ' . self::class); + } + $iterator->enqueue($future->state, $key, $future); + } + $iterator->complete(); + } else { + // Use separate fiber for iteration over non-array, because not all items might be immediately available + // while other futures are already completed. + EventLoop::queue(static function () use ($futures, $iterator) : void { + try { + foreach ($futures as $key => $future) { + if (!$future instanceof self) { + throw new \TypeError('Iterable must only provide instances of ' . self::class); + } + $iterator->enqueue($future->state, $key, $future); + } + $iterator->complete(); + } catch (\Throwable $exception) { + $iterator->error($exception); + } + }); + } + + while ($item = $iterator->consume()) { + yield $item[0] => $item[1]; + } + } + + /** + * @template Tv + * + * @param Tv $value + * + * @return Future + */ + public static function complete(mixed $value = null) : self + { + $state = new FutureState(); + $state->complete($value); + + return new self($state); + } + + /** + * @return Future + */ + public static function error(\Throwable $throwable) : self + { + /** @var FutureState $state */ + $state = new FutureState(); + $state->error($throwable); + + return new self($state); + } + + /** @var FutureState */ + private readonly FutureState $state; + + /** + * @param FutureState $state + * + * @internal Use {@see DeferredFuture} or {@see async()} to create and resolve a Future. + */ + public function __construct(FutureState $state) + { + $this->state = $state; + } + + /** + * @return bool True if the operation has completed. + */ + public function isComplete() : bool + { + return $this->state->isComplete(); + } + + /** + * Do not forward unhandled errors to the event loop handler. + * + * @return Future + */ + public function ignore() : self + { + $this->state->ignore(); + + return $this; + } + + /** + * Attaches a callback that is invoked if this future completes. The returned future is completed with the return + * value of the callback, or errors with an exception thrown from the callback. + * + * @psalm-suppress InvalidTemplateParam + * + * @template Tr + * + * @param \Closure(T):Tr $map + * + * @return Future + */ + public function map(\Closure $map) : self + { + $state = new FutureState(); + + $this->state->subscribe(static function (?\Throwable $error, mixed $value) use ($state, $map) : void { + if ($error) { + $state->error($error); + return; + } + + try { + /** @var T $value */ + $state->complete($map($value)); + } catch (\Throwable $exception) { + $state->error($exception); + } + }); + + return new self($state); + } + + /** + * Attaches a callback that is invoked if this future errors. The returned future is completed with the return + * value of the callback, or errors with an exception thrown from the callback. + * + * @template Tr + * + * @param \Closure(\Throwable):Tr $catch + * + * @return Future + */ + public function catch(\Closure $catch) : self + { + $state = new FutureState(); + + $this->state->subscribe(static function (?\Throwable $error, mixed $value) use ($state, $catch) : void { + if (!$error) { + $state->complete($value); + return; + } + + try { + $state->complete($catch($error)); + } catch (\Throwable $exception) { + $state->error($exception); + } + }); + + return new self($state); + } + + /** + * Attaches a callback that is always invoked when the future is completed. The returned future resolves with the + * same value as this future once the callback has finished execution. If the callback throws, the returned future + * will error with the thrown exception. + * + * @param \Closure():void $finally + * + * @return Future + */ + public function finally(\Closure $finally) : self + { + $state = new FutureState(); + + $this->state->subscribe(static function (?\Throwable $error, mixed $value) use ($state, $finally) : void { + try { + $finally(); + + if ($error) { + $state->error($error); + } else { + $state->complete($value); + } + } catch (\Throwable $exception) { + $state->error($exception); + } + }); + + return new self($state); + } + + /** + * Awaits the operation to complete. + * + * Throws an exception if the operation fails. + * + * @return T + */ + public function await(?Cancellation $cancellation = null) : mixed + { + $suspension = EventLoop::getSuspension(); + + $callbackId = $this->state->subscribe(static function (?\Throwable $error, mixed $value) use ( + $suspension + ) : void { + if ($error) { + $suspension->throw($error); + } else { + $suspension->resume($value); + } + }); + + $state = $this->state; + $cancellationId = $cancellation?->subscribe(static function (\Throwable $reason) use ( + $callbackId, + $suspension, + $state + ) : void { + $state->unsubscribe($callbackId); + if (!$state->isComplete()) { // Resume has already been scheduled if complete. + $suspension->throw($reason); + } + }); + + try { + return $suspension->suspend(); + } finally { + /** @psalm-suppress PossiblyNullArgument $cancellationId will not be null if $cancellation is not null. */ + $cancellation?->unsubscribe($cancellationId); + } + } +} diff --git a/src/thebigcrafter/Hydrogen/future/FutureIterator.php b/src/thebigcrafter/Hydrogen/future/FutureIterator.php new file mode 100644 index 0000000..0734d1b --- /dev/null +++ b/src/thebigcrafter/Hydrogen/future/FutureIterator.php @@ -0,0 +1,157 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\future; + +use thebigcrafter\Hydrogen\EventLoop; +use thebigcrafter\Hydrogen\exceptions\Cancellation; +use thebigcrafter\Hydrogen\exceptions\NullCancellation; +use thebigcrafter\Hydrogen\trait\ForbidCloning; +use thebigcrafter\Hydrogen\trait\ForbidSerialization; +use function array_key_first; + +/** + * @template Tk + * @template Tv + * + * @internal + */ +final class FutureIterator +{ + use ForbidCloning; + use ForbidSerialization; + + /** @var FutureIteratorQueue */ + private readonly FutureIteratorQueue $queue; + + private readonly Cancellation $cancellation; + + private readonly string $cancellationId; + + /** @var Future|Future|null */ + private ?Future $complete = null; + + public function __construct(?Cancellation $cancellation = null) + { + $this->queue = $queue = new FutureIteratorQueue(); + $this->cancellation = $cancellation ?? new NullCancellation(); + $this->cancellationId = $this->cancellation->subscribe(static function (\Throwable $reason) use ($queue) : void { + if ($queue->suspension) { + $queue->suspension->throw($reason); + $queue->suspension = null; + } + }); + } + + /** + * @param FutureState $state + * @param Tk $key + * @param Future $future + */ + public function enqueue(FutureState $state, mixed $key, Future $future) : void + { + if ($this->complete) { + throw new \Error('Iterator has already been marked as complete'); + } + + $queue = $this->queue; // Using separate object to avoid a circular reference. + + /** + * @param Tv|null $result + */ + $handler = static function (?\Throwable $error, mixed $result, string $id) use ( + $key, + $future, + $queue + ) : void { + unset($queue->pending[$id]); + + if ($queue->suspension) { + $queue->suspension->resume([$key, $future]); + $queue->suspension = null; + return; + } + + $queue->items[] = [$key, $future]; + }; + + $id = $state->subscribe($handler); + + $queue->pending[$id] = $state; + } + + public function complete() : void + { + if ($this->complete) { + throw new \Error('Iterator has already been marked as complete'); + } + + $this->complete = Future::complete(); + + if (!$this->queue->pending && $this->queue->suspension) { + $this->queue->suspension->resume(); + $this->queue->suspension = null; + } + } + + public function error(\Throwable $exception) : void + { + if ($this->complete) { + throw new \Error('Iterator has already been marked as complete'); + } + + $this->complete = Future::error($exception); + + if (!$this->queue->pending && $this->queue->suspension) { + $this->queue->suspension->throw($exception); + $this->queue->suspension = null; + } + } + + /** + * @return null|array{Tk, Future} + */ + public function consume() : ?array + { + if ($this->queue->suspension) { + throw new \Error('Concurrent consume() operations are not supported'); + } + + if (!$this->queue->items) { + if ($this->complete && !$this->queue->pending) { + return $this->complete->await(); + } + + $this->cancellation->throwIfRequested(); + + $this->queue->suspension = EventLoop::getSuspension(); + + /** @var null|array{Tk, Future} */ + return $this->queue->suspension->suspend(); + } + + $key = array_key_first($this->queue->items); + $item = $this->queue->items[$key]; + + unset($this->queue->items[$key]); + + /** @var null|array{Tk, Future} */ + return $item; + } + + public function __destruct() + { + $this->cancellation->unsubscribe($this->cancellationId); + foreach ($this->queue->pending as $id => $state) { + $state->unsubscribe($id); + } + } +} diff --git a/src/thebigcrafter/Hydrogen/future/FutureIteratorQueue.php b/src/thebigcrafter/Hydrogen/future/FutureIteratorQueue.php new file mode 100644 index 0000000..de680ae --- /dev/null +++ b/src/thebigcrafter/Hydrogen/future/FutureIteratorQueue.php @@ -0,0 +1,25 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\future; + +use thebigcrafter\Hydrogen\eventLoop\Suspension; + +final class FutureIteratorQueue +{ + /** @var list}> */ + public array $items = []; + + /** @var array> */ + public array $pending = []; + + public ?Suspension $suspension = null; +} diff --git a/src/thebigcrafter/Hydrogen/future/FutureState.php b/src/thebigcrafter/Hydrogen/future/FutureState.php new file mode 100644 index 0000000..c8669ff --- /dev/null +++ b/src/thebigcrafter/Hydrogen/future/FutureState.php @@ -0,0 +1,140 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\future; + +use thebigcrafter\Hydrogen\EventLoop; + +final class FutureState +{ + // Static so they can be used as array keys + private static string $nextId = 'a'; + + private bool $complete = false; + + private bool $handled = false; + + /** @var array */ + private array $callbacks = []; + + /** @var T|null */ + private mixed $result = null; + + private ?\Throwable $throwable = null; + + private ?string $origin = null; + + public function __destruct() + { + if ($this->throwable && !$this->handled) { + $throwable = new UnhandledFutureError($this->throwable, $this->origin); + EventLoop::queue(static fn () => throw $throwable); + } + } + + /** + * Registers a callback to be notified once the operation is complete or errored. + * + * The callback is invoked directly from the event loop context, so suspension within the callback is not possible. + * + * @param \Closure(?\Throwable, ?T, string): void $callback Callback invoked on error / successful completion of + * the future. + * + * @return string Identifier that can be used to cancel interest for this future. + */ + public function subscribe(\Closure $callback) : string + { + $id = self::$nextId++; + + $this->handled = true; // Even if unsubscribed later, consider the future handled. + + if ($this->complete) { + EventLoop::queue($callback, $this->throwable, $this->result, $id); + } else { + $this->callbacks[$id] = $callback; + } + + return $id; + } + + /** + * Cancels a subscription. + * + * Cancellations are advisory only. The callback might still be called if it is already queued for execution. + * + * @param string $id Identifier returned from subscribe() + */ + public function unsubscribe(string $id) : void + { + unset($this->callbacks[$id]); + } + + /** + * Completes the operation with a result value. + * + * @param T $result Result of the operation. + */ + public function complete(mixed $result) : void + { + if ($this->complete) { + throw new \Error('Operation is no longer pending'); + } + + if ($result instanceof Future) { + throw new \Error('Cannot complete with an instance of ' . Future::class); + } + + $this->result = $result; + $this->invokeCallbacks(); + } + + /** + * Marks the operation as failed. + * + * @param \Throwable $throwable Throwable to indicate the error. + */ + public function error(\Throwable $throwable) : void + { + if ($this->complete) { + throw new \Error('Operation is no longer pending'); + } + + $this->throwable = $throwable; + $this->invokeCallbacks(); + } + + /** + * @return bool True if the operation has completed. + */ + public function isComplete() : bool + { + return $this->complete; + } + + /** + * Suppress the exception thrown to the loop error handler if and operation error is not handled by a callback. + */ + public function ignore() : void + { + $this->handled = true; + } + + private function invokeCallbacks() : void + { + $this->complete = true; + + foreach ($this->callbacks as $id => $callback) { + EventLoop::queue($callback, $this->throwable, $this->result, $id); + } + + $this->callbacks = []; + } +} diff --git a/src/thebigcrafter/Hydrogen/trait/ForbidCloning.php b/src/thebigcrafter/Hydrogen/trait/ForbidCloning.php new file mode 100644 index 0000000..e18a06c --- /dev/null +++ b/src/thebigcrafter/Hydrogen/trait/ForbidCloning.php @@ -0,0 +1,20 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\trait; + +trait ForbidCloning +{ + final protected function __clone() + { + throw new \Error(__CLASS__ . ' does not support cloning'); + } +} diff --git a/src/thebigcrafter/Hydrogen/trait/ForbidSerialization.php b/src/thebigcrafter/Hydrogen/trait/ForbidSerialization.php new file mode 100644 index 0000000..752f978 --- /dev/null +++ b/src/thebigcrafter/Hydrogen/trait/ForbidSerialization.php @@ -0,0 +1,20 @@ + + * This source file is subject to the Apache-2.0 license that is bundled + * with this source code in the file LICENSE. + */ + +declare(strict_types=1); + +namespace thebigcrafter\Hydrogen\trait; + +trait ForbidSerialization +{ + final public function __serialize() : never + { + throw new \Error(__CLASS__ . ' does not support serialization'); + } +} diff --git a/virion.yml b/virion.yml index 83c30c7..fd0f141 100644 --- a/virion.yml +++ b/virion.yml @@ -1,5 +1,5 @@ name: Hydrogen -version: 1.1.1 +version: 1.1.2 php: [8.1] author: thebigcrafter api: [5.0.0]