Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 27 #28

Merged
merged 6 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ composer require workbunny/webman-coroutine

## 文档

| 目录 | 地址 |
|:---:|:----------------------------------------------------------------------------------------------:|
| API | [Fucntion-APIs](https://workbunny.github.io/webman-coroutine/) |
| 教程 | [PHP 协程入门](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/coroutine.md) |
| - | [安装及配置](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/install.md) |
| - | [助手函数](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/helpers.md) |
| - | [`workerman`环境](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/workerman.md) |
| - | [`webman`框架](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/webman.md) |
| - | [`Utils`说明](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/utils.md) |
| - | [自定义拓展](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/custom.md) |
| 目录 | 地址 |
|:---:|:------------------------------------------------------------------------------------------------------:|
| API | [Fucntion-APIs](https://workbunny.github.io/webman-coroutine/) |
| 教程 | [PHP 协程入门](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/coroutine.md) |
| - | [安装及配置](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/install.md) |
| - | [助手函数](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/helpers.md) |
| - | [`workerman`环境](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/workerman.md) |
| - | [`webman`框架](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/webman.md) |
| - | [`Utils`说明](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/utils.md) |
| - | [自定义拓展](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/custom.md) |
| - | [协程的观测和管理](https://github.com/workbunny/webman-coroutine/tree/main/docs/doc/suspension-manage.md) |

## 参与开发

Expand Down
80 changes: 80 additions & 0 deletions docs/doc/suspension-manage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# 协程的观测和管理

`webman-coroutine`中提供的关于协程的部分来自`Utils/Coroutine`和`Factory::sleep()`

## `Utils/Coroutine` 协程工具

> `Utils/Coroutine`会在每次构造的时候将注入一个WeakMap,并储存`id`、`startTime`

- 获取当前进程所有协程

```php
$weakMap = \Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine::getCoroutinesWeakMap();
```
> Tips: 方法返回一个储存所有通过`Utils/Coroutine`创建的协程的`WeakMap`

- 退出当前进程所有协程

```php
$weakMap = \Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine::getCoroutinesWeakMap();
/**
* @var \Workbunny\WebmanCoroutine\Utils\Coroutine\Handlers\CoroutineInterface $coroutine
* @var array<string, mixed> $info ['id' => 协程id, 'startTime' => 开始时间]
*/
foreach ($weakMap as $coroutine => $info) {
$coroutine->kill(new \Workbunny\WebmanCoroutine\Exceptions\KilledException());
}
```
> Tips:
> - `kill`方法并不会立即退出协程,而是在该协程下一次唤起的时候触发,抛出一个异常
> - 各协程驱动各略有不同,如:swoole驱动在协程遇到文件IO事件时并不会立即退出,也不会在下次唤起时抛出异常

## `Handler::$suspension`挂起事件

> 所有基于`Factory::sleep()`创建的挂起事件既是`Handler::$suspension`,包括`sleep()`、`wait_for()`和`Factory::waitFor()`

- 获取当前进程所有挂起事件

```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::getSuspensionsWeakMap();
```
> Tips: 方法返回一个储存所有通过`Factory`创建的协程的`WeakMap`

- 添加/设置一个挂起事件至`WeakMap`
```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::setSuspensionsWeakMap();
```

- 退出当前进程所有挂起事件

```php
$weakMap = \Workbunny\WebmanCoroutine\Factory::getSuspensionsWeakMap();
/**
* @var mixed $suspension
* @var array<string, mixed> $info ['id' => 协程id, 'startTime' => 开始时间, 'event' => 挂起事件|NULL]
*/
foreach ($weakMap as $suspension => $info) {
\Workbunny\WebmanCoroutine\Factory::kill($suspension);
}
```

> Tips:
> - `kill`方法并不会立即退出协程,而是在该挂起下一次被唤起的时候触发,抛出一个`KilledException`
> - 各协程驱动各略有不同,如:`swoole`驱动在协程遇到文件IO事件时并不会立即退出,挂起事件会抛出`KilledException`

## 观测/管理实践

### 采样监听方案

1. 在服务进程启动时创建定时器
2. 定时器实现`Coroutine::getCoroutinesWeakMap()`和`Factory::getSuspensionsWeakMap()`的采样,并以进程pid为区分输出至日志

> Tips: 可以自定义实现对协程或挂起事件的startTime比对,合理杀死过长挂起的协程/事件

### 遥控管理方案

1. 在进程中实现命令对应的控制逻辑,例如:`kill`、`dump`、`check`等
2. 在服务进程启动时通过`redis`/`apcu`对通道进行监听,注册命令对应的控制监听
3. 遥控cli程序通过对通道的`pub`发送指定命令进行控制

> Tips: `webman`/`workerman` 环境基于`apcu`的共享缓存插件推荐:https://www.workerman.net/plugin/133
2 changes: 2 additions & 0 deletions src/Events/SwooleEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ public function destroy()
foreach (Coroutine::listCoroutines() as $coroutine) {
Coroutine::cancel($coroutine);
}
// Wait for coroutines to exit
usleep(100000);
// 退出event loop
Event::exit();
}
Expand Down
41 changes: 41 additions & 0 deletions src/Exceptions/KilledException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php
/**
* @author workbunny/Chaz6chez
* @email [email protected]
*/
declare(strict_types=1);

namespace Workbunny\WebmanCoroutine\Exceptions;

use Throwable;

/**
* 协程或挂起事件被杀死
*/
class KilledException extends RuntimeException
{
/**
* @var string|null
*/
protected null|string $event;

/**
* @param string $message
* @param int $code
* @param string|null $event
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?string $event = null, ?Throwable $previous = null)
{
$this->event = $event;
parent::__construct($message, $code, $previous);
}

/**
* @return string|null
*/
public function getEvent(): ?string
{
return $this->event;
}
}
33 changes: 31 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Workbunny\WebmanCoroutine;

use WeakMap;
use Workbunny\WebmanCoroutine\Events\SwooleEvent;
use Workbunny\WebmanCoroutine\Events\SwowEvent;
use Workbunny\WebmanCoroutine\Handlers\DefaultHandler;
Expand All @@ -20,7 +21,15 @@
use Workbunny\WebmanCoroutine\Handlers\SwowWorkerman5Handler;

/**
* 工厂化启动器
* 工厂化启动器
* @method static void isAvailable()
* @method static void initEnv()
* @method static void waitFor(?\Closure $action = null, int|float $timeout = -1, ?string $event = null)
* @method static void wakeup(string $event)
* @method static void sleep(int|float $timeout = 0, ?string $event = null)
* @method static void kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0)
* @method static null|WeakMap getSuspensionsWeakMap()
* @method static void setSuspensionsWeakMap(object $object, string|int $id, ?string $event, float|int $startTime)
*/
class Factory
{
Expand All @@ -47,7 +56,7 @@ class Factory
self::WORKERMAN_SWOOLE => SwooleWorkerman5Handler::class,
self::WORKBUNNY_SWOOLE => SwooleHandler::class,
self::REVOLT_FIBER => RevoltHandler::class,
self::RIPPLE_FIBER => RippleHandler::class,
self::RIPPLE_FIBER_4 => RippleHandler::class,
self::RIPPLE_FIBER_5 => RippleWorkerman5Handler::class,
];

Expand Down Expand Up @@ -167,4 +176,24 @@ public static function init(?string $eventLoopClass): void
self::$_currentEventLoop = $eventLoopClass;
}
}

/**
* 代理调用HandlerInterface方法
*
* @param string $name
* @param array $arguments
* @return mixed
*/
public static function __callStatic(string $name, array $arguments)
{
if (($handler = Factory::getCurrentHandler()) === null) {
Factory::init(null);
/** @var HandlerInterface $handler */
$handler = Factory::getCurrentHandler();
}
if (method_exists($handler, $name)) {
return $handler::$name(...$arguments);
}
throw new \BadMethodCallException("Method $name not exists. ");
}
}
12 changes: 8 additions & 4 deletions src/Handlers/DefaultHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class DefaultHandler implements HandlerInterface
{
use HandlerMethods;

/**
* 测试用,为保证覆盖生成时不会无限等待
*
Expand Down Expand Up @@ -73,12 +75,14 @@ public static function wakeup(string $event): void
{
}

/** @inheritDoc
* @param float|int $timeout
* @param string|null $event
*/
/** @inheritDoc */
public static function sleep(float|int $timeout = 0, ?string $event = null): void
{
usleep(max((int) $timeout * 1000 * 1000, 0));
}

/** @inheritdoc */
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void
{
}
}
19 changes: 19 additions & 0 deletions src/Handlers/HandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Workbunny\WebmanCoroutine\Handlers;

use Throwable;
use WeakMap;
use Workbunny\WebmanCoroutine\Exceptions\TimeoutException;

/**
Expand Down Expand Up @@ -57,4 +58,22 @@ public static function wakeup(string $event): void;
* @return void
*/
public static function sleep(int|float $timeout = 0, ?string $event = null): void;

/**
* 协程强制终止
*
* @param object|int|string $suspensionOrSuspensionId
* @param string $message
* @param int $exitCode
* @return void
*/
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void;

/**
* 获取所有挂起的对象
*
* @return WeakMap
* @link HandlerMethods::getSuspensionsWeakMap()
*/
public static function getSuspensionsWeakMap(): WeakMap;
}
34 changes: 34 additions & 0 deletions src/Handlers/HandlerMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,44 @@

namespace Workbunny\WebmanCoroutine\Handlers;

use WeakMap;
use Workerman\Worker;

trait HandlerMethods
{
/**
* @var WeakMap<object, array>|null <挂起对象, ['id' => int|string, 'event' => string|null, 'startTime' => float|int]>
*/
protected static ?WeakMap $_suspensionsWeakMap = null;

/**
* 获取挂起对象
*
* @return WeakMap
*/
public static function getSuspensionsWeakMap(): WeakMap
{
return self::$_suspensionsWeakMap = static::$_suspensionsWeakMap ?: new WeakMap();
}

/**
* 添加挂起对象
*
* @param object $object
* @param string|int $id
* @param string|null $event
* @param float|int $startTime
* @return void
*/
public static function setSuspensionsWeakMap(object $object, string|int $id, ?string $event, float|int $startTime): void
{
static::getSuspensionsWeakMap()->offsetSet($object, [
'id' => $id,
'event' => $event,
'startTime' => $startTime
]);
}

/**
* @codeCoverageIgnore 为了测试可以mock
*
Expand Down
22 changes: 22 additions & 0 deletions src/Handlers/RevoltHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Workbunny\WebmanCoroutine\Handlers;

use Revolt\EventLoop;
use Workbunny\WebmanCoroutine\Exceptions\KilledException;
use Workbunny\WebmanCoroutine\Exceptions\TimeoutException;

use function Workbunny\WebmanCoroutine\package_installed;
Expand Down Expand Up @@ -77,6 +78,7 @@ public static function sleep(int|float $timeout = 0, ?string $event = null): voi
{
try {
$suspension = EventLoop::getSuspension();
static::setSuspensionsWeakMap($suspension, spl_object_hash($suspension), $event, microtime(true));
if ($event) {
static::$_suspensions[$event] = $suspension;
if ($timeout < 0) {
Expand Down Expand Up @@ -110,4 +112,24 @@ public static function sleep(int|float $timeout = 0, ?string $event = null): voi
}
}
}

/** @inheritdoc */
public static function kill(object|int|string $suspensionOrSuspensionId, string $message = 'kill', int $exitCode = 0): void
{
if ($suspensionOrSuspensionId instanceof EventLoop\Suspension) {
if ($info = static::getSuspensionsWeakMap()->offsetGet($suspensionOrSuspensionId)) {
$suspensionOrSuspensionId->throw(new KilledException($message, $exitCode, $info['event'] ?? null));
}
} else {
/**
* @var EventLoop\Suspension $object
* @var array $info
*/
foreach (static::getSuspensionsWeakMap() as $object => $info) {
if ($info['id'] === $suspensionOrSuspensionId) {
$object->throw(new KilledException($message, $exitCode, $info['event'] ?? null));
}
}
}
}
}
Loading