Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
1. 修复Channel方法bug
2. 增加Channel相关测试用例
3. 完善文档
  • Loading branch information
chaz6chez committed Nov 18, 2023
1 parent e5cda2c commit 10e59f9
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 45 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,56 @@
])
}
```
### 3. Cache的Channel功能
- Channel是一个类似Redis-stream、Redis-list、Redis-Pub/Sub的功能模块
- 一个通道可以被多个进程监听,每个进程只能监听一个相同通道(也就是对相同通道只能创建一个监听器)
- **向通道发布消息**
- 临时消息
```php
# 向一个名为test的通道发送临时消息;
# 通道没有监听器时,临时消息会被忽略,只有通道存在监听器时,该消息才会被存入通道
Cache::ChPublish('test', '这是一个测试消息', false);
```
- 暂存消息
```php
# 向一个名为test的通道发送暂存消息;
# 通道存在监听器时,该消息会被存入通道内的所有子通道
Cache::ChPublish('test', '这是一个测试消息', true);
```
- 指定workerId
```php
# 指定发送消息至当前通道内workerId为1的子通道
Cache::ChPublish('test', '这是一个测试消息', true, 1);
```
- **创建通道监听器**
- 一个进程对相同通道仅能创建一个监听器
- 一个进程可以同时监听多个不同的通道
- 建议workerId使用workerman的workerId进行区分
```php
# 向一个名为test的通道创建一个workerId为1的监听器;
# 通道消息先进先出,当有消息时会触发回调
Cache::ChCreateListener('test', '1', function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的业务逻辑
dump($channelKey, $workerId, $message);
});
```
- **移除通道监听器**
- 移除监听器子通道及子通道内消息
```php
# 向一个名为test的通道创建一个workerId为1的监听器;
# 通道移除时不会移除其他子通道消息
Cache::ChRemoveListener('test', '1', true);
```
- 移除监听器子通道,但保留子通道内消息
```php
# 向一个名为test的通道创建一个workerId为1的监听器;
# 通道移除时不会移除所有子通道消息
Cache::ChRemoveListener('test', '1', false);
```
### 其他功能具体可以参看代码注释和测试用例
31 changes: 3 additions & 28 deletions src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,9 @@
/**
* 基于APCu的进程共享内存
*
* @method static bool Set(string $key, mixed $value, array $optional = []) 设置缓存值
* @method static mixed Get(string $key, mixed $default = null) 获取缓存值
* @method static array Del(string ...$keys) 移除缓存
* @method static array Keys(null|string $regex = null) 获取缓存键
* @method static bool|int|float Incr(string $key, int|float $value = 1, int $ttl = 0) 自增
* @method static bool|int|float Decr(string $key, int|float $value = 1, int $ttl = 0) 自减
* @method static array Exists(string ...$keys) 判断缓存键
*
* @method static void Search(string $regex, Closure $handler, int $chunkSize = 100) 搜索键值 - 正则匹配
* @method static bool Atomic(string $lockKey, Closure $handler, bool $blocking = false) 原子操作
*
* @method static bool HSet(string $key, string|int $hashKey, mixed $hashValue) Hash 设置
* @method static bool HDel(string $key, string|int ...$hashKey) Hash 移除
* @method static mixed HGet(string $key, string|int $hashKey, mixed $default = null) Hash 获取
* @method static array HKeys(string $key, null|string $regex = null) Hash keys
* @method static bool|int|float HIncr(string $key, string|int $hashKey, int|float $value = 1) Hash 自增
* @method static bool|int|float HDecr(string $key, string|int $hashKey, int|float $value = 1) Hash 自减
* @method static array HExists(string $key, string|int ...$hashKey) Hash key 判断
*
* @method static bool Publish(string $key, mixed $message, null|string|int $workerId = null, bool $store = true) Channel 发布消息
* @method static array GetChannel(string $key) Channel 获取
* @method static bool|int CreateListener(string $key, string|int $workerId, Closure $listener) Channel 监听器创建
* @method static void RemoveListener(string $key, string|int $workerId) Channel 监听器移除
*
* @method static array LockInfo() 获取锁信息
* @method static array KeyInfo(string $key) 获取键信息
* @method static array Info(bool $limited = false) 获取信息
* @method static bool Clear() 清理所有缓存
* @link HashMethods hash相关
* @link BasicMethods 基础功能
* @link ChannelMethods 通道相关
*/
class Cache
{
Expand Down
17 changes: 17 additions & 0 deletions src/Traits/BasicMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@
use APCUIterator;
use Closure;

/**
* @method static bool Set(string $key, mixed $value, array $optional = []) 设置缓存值
* @method static mixed Get(string $key, mixed $default = null) 获取缓存值
* @method static array Del(string ...$keys) 移除缓存
* @method static array Keys(null|string $regex = null) 获取缓存键
* @method static bool|int|float Incr(string $key, int|float $value = 1, int $ttl = 0) 自增
* @method static bool|int|float Decr(string $key, int|float $value = 1, int $ttl = 0) 自减
* @method static array Exists(string ...$keys) 判断缓存键
*
* @method static void Search(string $regex, Closure $handler, int $chunkSize = 100) 搜索键值 - 正则匹配
* @method static bool Atomic(string $lockKey, Closure $handler, bool $blocking = false) 原子操作
*
* @method static array LockInfo() 获取锁信息
* @method static array KeyInfo(string $key) 获取键信息
* @method static array Info(bool $limited = false) 获取信息
* @method static bool Clear() 清理所有缓存
*/
trait BasicMethods
{
/** @var string 写锁 */
Expand Down
72 changes: 55 additions & 17 deletions src/Traits/ChannelMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,24 @@
use Workbunny\WebmanSharedCache\Future;
use Error;

/**
* @method static array GetChannel(string $key) Channel 获取
* @method static bool ChPublish(string $key, mixed $message, bool $store = true, null|string|int $workerId = null) Channel 发布消息
* @method static bool|int ChCreateListener(string $key, string|int $workerId, Closure $listener) Channel 监听器创建
* @method static void ChRemoveListener(string $key, string|int $workerId, bool $remove = true) Channel 监听器移除
*/
trait ChannelMethods
{
use BasicMethods;

/** @var string 通道前缀 */
protected static string $_CHANNEL = '#Channel#';

/**
* @var array = [channelKey => listeners]
*/
protected static array $_listeners = [];

/**
* @param string $key
* @return string
Expand Down Expand Up @@ -50,11 +61,10 @@ protected static function _GetChannel(string $key): array
* @param bool $store 在没有监听器时是否进行储存
* @return bool
*/
protected static function _Publish(string $key, mixed $message, null|string|int $workerId = null, bool $store = true): bool
protected static function _ChPublish(string $key, mixed $message, bool $store = true, null|string|int $workerId = null): bool
{
$func = __FUNCTION__;
$params = func_get_args();
$key = self::GetChannelKey($key);
self::_Atomic($key, function () use (
$key, $message, $func, $params, $store
) {
Expand All @@ -67,11 +77,21 @@ protected static function _Publish(string $key, mixed $message, null|string|int
* ]
*/
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
foreach ($channel as $workerId => $item) {
if ($store or isset($item['futureId'])) {
$channel[$workerId]['value'][] = $message;
// 如果还没有监听器,将数据投入默认
if (!$channel) {
if ($store) {
$channel['--default--']['value'][] = $message;
}
}
// 否则将消息投入到每个worker的监听器数据中
else {
foreach ($channel as $workerId => $item) {
if ($store or isset($item['futureId'])) {
$channel[$workerId]['value'][] = $message;
}
}
}

self::_Set($channelName, $channel);
return [
'timestamp' => microtime(true),
Expand All @@ -93,13 +113,18 @@ protected static function _Publish(string $key, mixed $message, null|string|int
* @param Closure $listener = function(string $channelName, string|int $workerId, mixed $message) {}
* @return bool|int 监听器id
*/
protected static function _CreateListener(string $key, string|int $workerId, Closure $listener): bool|int
protected static function _ChCreateListener(string $key, string|int $workerId, Closure $listener): bool|int
{
$func = __FUNCTION__;
$result = false;
$params = func_get_args();
$params[2] = '\Closure';
if (isset(self::$_listeners[$key])) {
throw new Error("Channel $key listener already exist. ");
}
self::$_listeners[$key] = $listener;
self::_Atomic($key, function () use (
$key, $workerId, $listener, $func, $params, &$result
$key, $workerId, $func, $params, &$result
) {
/**
* [
Expand All @@ -110,23 +135,30 @@ protected static function _CreateListener(string $key, string|int $workerId, Clo
* ]
*/
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
if (isset($channel[$workerId]['futureId'])) {
throw new Error("Channel $key listener already exist. ");
}

// 设置回调
$channel[$workerId] = $result = Future::add(function () use ($key, $workerId, $listener) {
$channel[$workerId]['futureId'] = $result = Future::add(function () use ($key, $workerId) {
// 原子性执行
Cache::Atomic($key, function () use ($key, $workerId, $listener) {
self::_Atomic($key, function () use ($key, $workerId) {
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
if ((!empty($value = $channel[$workerId]['value'] ?? []))) {
$msg = array_pop($value);
// 先进先出
$msg = array_shift($value);
$channel[$workerId]['value'] = $value;
call_user_func($listener, $key, $workerId, $msg);
call_user_func(self::$_listeners[$key], $key, $workerId, $msg);
self::_Set($channelName, $channel);
}

});
});
$channel[$workerId]['value'] = [];
// 如果存在默认数据
if ($default = $channel['--default--']['value'] ?? []) {
foreach ($channel as &$item) {
array_unshift($item['value'], ...$default);
}
unset($channel['--default--']);
}
self::_Set($channelName, $channel);
return [
'timestamp' => microtime(true),
Expand All @@ -143,14 +175,15 @@ protected static function _CreateListener(string $key, string|int $workerId, Clo
*
* @param string $key
* @param string|int $workerId
* @param bool $remove 是否移除消息
* @return void
*/
protected static function _RemoveListener(string $key, string|int $workerId): void
protected static function _ChRemoveListener(string $key, string|int $workerId, bool $remove = true): void
{
$func = __FUNCTION__;
$params = func_get_args();
self::_Atomic($key, function () use (
$key, $workerId, $func, $params
$key, $workerId, $func, $params, $remove
) {
/**
* [
Expand All @@ -163,7 +196,12 @@ protected static function _RemoveListener(string $key, string|int $workerId): vo
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
if ($id = $channel[$workerId]['futureId'] ?? null) {
Future::del($id);
unset($channel[$workerId]['futureId']);
if ($remove) {
unset($channel[$workerId]);
} else {
$channel[$workerId]['futureId'] = null;
}
unset(self::$_listeners[$key]);
self::_Set($channelName, $channel);
}

Expand Down
9 changes: 9 additions & 0 deletions src/Traits/HashMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

namespace Workbunny\WebmanSharedCache\Traits;

/**
* @method static bool HSet(string $key, string|int $hashKey, mixed $hashValue) Hash 设置
* @method static bool HDel(string $key, string|int ...$hashKey) Hash 移除
* @method static mixed HGet(string $key, string|int $hashKey, mixed $default = null) Hash 获取
* @method static array HKeys(string $key, null|string $regex = null) Hash keys
* @method static bool|int|float HIncr(string $key, string|int $hashKey, int|float $value = 1) Hash 自增
* @method static bool|int|float HDecr(string $key, string|int $hashKey, int|float $value = 1) Hash 自减
* @method static array HExists(string $key, string|int ...$hashKey) Hash key 判断
*/
trait HashMethods
{
use BasicMethods;
Expand Down
Loading

0 comments on commit 10e59f9

Please sign in to comment.