Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
1. 增加channel相关方法,用于实现队列、Pub/Sub等
  • Loading branch information
chaz6chez committed Nov 18, 2023
1 parent d0d5191 commit e5cda2c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
8 changes: 6 additions & 2 deletions src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Closure;
use Error;
use Workbunny\WebmanSharedCache\Traits\BasicMethods;
use Workbunny\WebmanSharedCache\Traits\ChannelMethods;
use Workbunny\WebmanSharedCache\Traits\HashMethods;

/**
Expand All @@ -30,6 +31,7 @@
* @method static array HExists(string $key, string|int ...$hashKey) Hash key 判断
*
* @method static bool Publish(string $key, mixed $message, null|string|int $workerId = null, bool $store = true) Channel 发布消息
* @method static array GetChannel(string $key) Channel 获取
* @method static bool|int CreateListener(string $key, string|int $workerId, Closure $listener) Channel 监听器创建
* @method static void RemoveListener(string $key, string|int $workerId) Channel 监听器移除
*
Expand All @@ -42,13 +44,15 @@ class Cache
{
use BasicMethods;
use HashMethods;
use ChannelMethods;

/** @var int 阻塞保险 */
public static int $fuse = 60;

/**
* @link HashMethods
* @link BasicMethods
* @link HashMethods hash相关
* @link BasicMethods 基础功能
* @link ChannelMethods 通道相关
*
* @param string $name
* @param array $arguments
Expand Down
16 changes: 16 additions & 0 deletions src/Future.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

class Future
{
public static bool $debug = false;
public static ?Closure $debugFunc = null;
public static array $debugArgs = [];

/**
* @var array = [id => func]
*/
Expand All @@ -22,6 +26,12 @@ class Future
*/
public static function add(Closure $func, array $args = []): int|false
{
if (self::$debug) {
self::$debugFunc = $func;
self::$debugArgs = $args;
return 1;
}

if (!Worker::$globalEvent) {
throw new Error("Event driver error. ");
}
Expand All @@ -44,6 +54,12 @@ public static function add(Closure $func, array $args = []): int|false
*/
public static function del(int|null $id = null): void
{
if (self::$debug) {
self::$debugFunc = null;
self::$debugArgs = [];
return;
}

if (!Worker::$globalEvent) {
throw new Error("Event driver error. ");
}
Expand Down
28 changes: 23 additions & 5 deletions src/Traits/ChannelMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ public static function GetChannelKey(string $key): string
return self::$_CHANNEL . $key;
}

/**
* 通道获取
*
* @param string $key
* @return array = [
* workerId = [
* 'futureId' => futureId,
* 'value' => array
* ]
* ]
*/
protected static function _GetChannel(string $key): array
{
return self::_Get(self::GetChannelKey($key), []);
}

/**
* 通道投递
* - 阻塞最大时长受fuse保护,默认60s
Expand Down Expand Up @@ -102,11 +118,13 @@ protected static function _CreateListener(string $key, string|int $workerId, Clo
// 原子性执行
Cache::Atomic($key, function () use ($key, $workerId, $listener) {
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
$value = $channel[$workerId]['value'] ?? [];
$msg = array_pop($value);
$channel[$workerId]['value'] = $value;
call_user_func($listener, $key, $workerId, $msg);
self::_Set($channelName, $channel);
if ((!empty($value = $channel[$workerId]['value'] ?? []))) {
$msg = array_pop($value);
$channel[$workerId]['value'] = $value;
call_user_func($listener, $key, $workerId, $msg);
self::_Set($channelName, $channel);
}

});
});
self::_Set($channelName, $channel);
Expand Down

0 comments on commit e5cda2c

Please sign in to comment.