Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
1. 增加channel相关方法,用于实现队列、Pub/Sub等
  • Loading branch information
chaz6chez committed Nov 17, 2023
1 parent 95de757 commit d0d5191
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
* @method static bool|int|float HDecr(string $key, string|int $hashKey, int|float $value = 1) Hash 自减
* @method static array HExists(string $key, string|int ...$hashKey) Hash key 判断
*
* @method static bool Publish(string $key, mixed $message, null|string|int $workerId = null, bool $store = true) Channel 发布消息
* @method static bool|int CreateListener(string $key, string|int $workerId, Closure $listener) Channel 监听器创建
* @method static void RemoveListener(string $key, string|int $workerId) Channel 监听器移除
*
* @method static array LockInfo() 获取锁信息
* @method static array KeyInfo(string $key) 获取键信息
* @method static array Info(bool $limited = false) 获取信息
Expand Down
14 changes: 6 additions & 8 deletions src/Traits/ChannelMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ protected static function _Publish(string $key, mixed $message, null|string|int
* @param string $key
* @param string|int $workerId
* @param Closure $listener = function(string $channelName, string|int $workerId, mixed $message) {}
* @return bool|int|float
* @return bool|int 监听器id
*/
protected static function _CreateListener(string $key, string|int $workerId, Closure $listener): bool|int|float
protected static function _CreateListener(string $key, string|int $workerId, Closure $listener): bool|int
{
$func = __FUNCTION__;
$result = false;
Expand All @@ -98,7 +98,7 @@ protected static function _CreateListener(string $key, string|int $workerId, Clo
throw new Error("Channel $key listener already exist. ");
}
// 设置回调
$channel[$workerId] = Future::add(function () use ($key, $workerId, $listener) {
$channel[$workerId] = $result = Future::add(function () use ($key, $workerId, $listener) {
// 原子性执行
Cache::Atomic($key, function () use ($key, $workerId, $listener) {
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
Expand All @@ -125,15 +125,14 @@ protected static function _CreateListener(string $key, string|int $workerId, Clo
*
* @param string $key
* @param string|int $workerId
* @return bool|int|float
* @return void
*/
protected static function _RemoveListener(string $key, string|int $workerId): bool|int|float
protected static function _RemoveListener(string $key, string|int $workerId): void
{
$func = __FUNCTION__;
$result = false;
$params = func_get_args();
self::_Atomic($key, function () use (
$key, $workerId, $func, $params, &$result
$key, $workerId, $func, $params
) {
/**
* [
Expand All @@ -157,6 +156,5 @@ protected static function _RemoveListener(string $key, string|int $workerId): bo
'result' => null
];
}, true);
return $result;
}
}

0 comments on commit d0d5191

Please sign in to comment.