From 8c790b0dccdb6d2971b73418e9c1fec04df7dcf3 Mon Sep 17 00:00:00 2001 From: chaz6chez Date: Fri, 28 Jun 2024 13:54:31 +0800 Subject: [PATCH] fixed bugs --- src/Events/AbstractEvent.php | 1 + src/Events/ServerEvent.php | 31 +++++++++++++++++++++++++++ src/Events/Subscribe.php | 14 ++++++------ src/Events/Unsubscribe.php | 9 ++++---- src/PushServer.php | 40 +++++++++++++++++++++++++++-------- src/Traits/ChannelMethods.php | 2 +- 6 files changed, 74 insertions(+), 23 deletions(-) create mode 100644 src/Events/ServerEvent.php diff --git a/src/Events/AbstractEvent.php b/src/Events/AbstractEvent.php index f4e139b..9522181 100644 --- a/src/Events/AbstractEvent.php +++ b/src/Events/AbstractEvent.php @@ -37,6 +37,7 @@ abstract class AbstractEvent EVENT_SUBSCRIBE => Subscribe::class, EVENT_UNSUBSCRIBE => Unsubscribe::class, self::CLIENT_EVENT => ClientEvent::class, + self::SERVER_EVENT => ServerEvent::class, ]; /** diff --git a/src/Events/ServerEvent.php b/src/Events/ServerEvent.php new file mode 100644 index 0000000..83355fe --- /dev/null +++ b/src/Events/ServerEvent.php @@ -0,0 +1,31 @@ + + * @copyright chaz6chez + * @link https://github.com/workbunny/webman-push-server + * @license https://github.com/workbunny/webman-push-server/blob/main/LICENSE + */ +declare(strict_types=1); + +namespace Workbunny\WebmanPushServer\Events; + +use Workerman\Connection\TcpConnection; + +class ServerEvent extends AbstractEvent +{ + /** + * server事件无需响应 + * + * @param TcpConnection $connection + * @param array $request + * @return void + */ + public function response(TcpConnection $connection, array $request): void + { + return; + } +} \ No newline at end of file diff --git a/src/Events/Subscribe.php b/src/Events/Subscribe.php index 9267c85..843b776 100644 --- a/src/Events/Subscribe.php +++ b/src/Events/Subscribe.php @@ -16,8 +16,6 @@ use RedisException; use support\Log; use Workbunny\WebmanPushServer\PushServer; -use Workbunny\WebmanPushServer\Traits\ChannelMethods; -use Workbunny\WebmanPushServer\Traits\StorageMethods; use Workerman\Connection\TcpConnection; use function Workbunny\WebmanPushServer\uuid; use const Workbunny\WebmanPushServer\CHANNEL_TYPE_PRESENCE; @@ -132,14 +130,14 @@ public static function subscribeChannel(TcpConnection $connection, string $chann // 为当前进程增加订阅的通道 PushServer::_setChannel($appKey, $channel, $socketId); - $storage = StorageMethods::getStorageClient(); + $storage = PushServer::getStorageClient(); // 通道是否已经被建立 - $channelExists = $storage->exists($key = StorageMethods::_getChannelStorageKey($appKey, $channel)); + $channelExists = $storage->exists($key = PushServer::_getChannelStorageKey($appKey, $channel)); if (!$channelExists) { /** @see PushServer::$_storage */ $storage->hSet($key, 'type', $type); // 内部事件广播 通道被创建事件 - ChannelMethods::publish(ChannelMethods::$publishTypeServer, [ + PushServer::publish(PushServer::$publishTypeServer, [ 'appKey' => $appKey, 'channel' => $channel, 'event' => EVENT_CHANNEL_OCCUPIED, @@ -163,7 +161,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann } // 如果是presence通道 if ($isPresence = ($type === CHANNEL_TYPE_PRESENCE)) { - if (!$storage->exists($userKey = StorageMethods::_getUserStorageKey($appKey, $channel, $userId))) { + if (!$storage->exists($userKey = PushServer::_getUserStorageKey($appKey, $channel, $userId))) { $storage->hIncrBy($key ,'user_count', 1); $storage->hMSet($userKey, [ 'user_id' => $userId, @@ -176,7 +174,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann * * {"event":"pusher_internal:member_added","data":{"user_id":1488465780,"user_info":"{\"name\":\"123\",\"sex:\"1\"}","channel ":"presence-channel"}} */ - ChannelMethods::publishUseRetry(PushServer::$publishTypeClient, [ + PushServer::publishUseRetry(PushServer::$publishTypeClient, [ 'appKey' => $appKey, 'channel' => $channel, 'event' => EVENT_MEMBER_ADDED, @@ -201,7 +199,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann $channel, EVENT_SUBSCRIPTION_SUCCEEDED, $isPresence ? - StorageMethods::_getPresenceChannelDataForSubscribe($appKey, $channel) : + PushServer::_getPresenceChannelDataForSubscribe($appKey, $channel) : '{}' ); } catch (RedisException $exception){ diff --git a/src/Events/Unsubscribe.php b/src/Events/Unsubscribe.php index be704cd..df8334f 100644 --- a/src/Events/Unsubscribe.php +++ b/src/Events/Unsubscribe.php @@ -17,7 +17,6 @@ use stdClass; use support\Log; use Workbunny\WebmanPushServer\PushServer; -use Workbunny\WebmanPushServer\Traits\StorageMethods; use Workerman\Connection\TcpConnection; use function Workbunny\WebmanPushServer\uuid; use const Workbunny\WebmanPushServer\CHANNEL_TYPE_PRESENCE; @@ -70,11 +69,11 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha $channels = PushServer::_getConnectionProperty($connection, 'channels'); if ($type = $channels[$channel] ?? null) { - $storage = StorageMethods::getStorageClient(); + $storage = PushServer::getStorageClient(); // presence通道 if ($type === CHANNEL_TYPE_PRESENCE) { - if ($users = $storage->keys(StorageMethods::_getUserStorageKey($appKey, $channel, $uid))) { - $userCount = $storage->hIncrBy(StorageMethods::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users)); + if ($users = $storage->keys(PushServer::_getUserStorageKey($appKey, $channel, $uid))) { + $userCount = $storage->hIncrBy(PushServer::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users)); if ($userCount <= 0) { $storage->del(...$users); } @@ -96,7 +95,7 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha } } // 查询通道订阅数量 - $subCount = $storage->hIncrBy($key = StorageMethods::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1); + $subCount = $storage->hIncrBy($key = PushServer::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1); if ($subCount <= 0) { $storage->del($key); // 内部事件广播 通道被移除事件 diff --git a/src/PushServer.php b/src/PushServer.php index 10949bf..e66b4d1 100644 --- a/src/PushServer.php +++ b/src/PushServer.php @@ -61,13 +61,16 @@ class PushServer */ protected static array $_channels = []; - /** @var int 心跳定时器 */ - protected int $_heartbeatTimer = 0; + /** @var int|null 心跳定时器 */ + protected ?int $_heartbeatTimer = null; /** @var int 心跳 */ protected int $_keepaliveTimeout = 60; - public function __construct() + /** @var AbstractEvent|null 最近一个事件 */ + protected ?AbstractEvent $_lastEvent = null; + + protected function __construct() { $this->_keepaliveTimeout = self::getConfig('heartbeat', 60); } @@ -98,7 +101,7 @@ public function onWorkerStart(): void // 通道订阅 static::subscribe(); // 心跳设置 - if ($this->_heartbeatTimer > 0) { + if ($this->_keepaliveTimeout > 0 and !$this->_heartbeatTimer) { $this->_heartbeatTimer = Timer::add($this->_keepaliveTimeout / 2, function () { /** * @var string $appKey @@ -184,12 +187,14 @@ public function onConnect(TcpConnection $connection): void public function onMessage(TcpConnection $connection, $data): void { if (is_string($data)) { - static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0); - if ($data = @json_decode($data, true)){ + if ($data = @json_decode($data, true)) { // 获取事件 - if ($factory = AbstractEvent::factory($data['event'] ?? '')){ + $this->setLastEvent(AbstractEvent::factory($data['event'] ?? '')); + if ($event = $this->getLastEvent()) { + // 心跳计数归零 + static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0); // 事件响应 - $factory->response($connection, $data); + $event->response($connection, $data); return; } } @@ -221,6 +226,23 @@ public function onClose(TcpConnection $connection): void } } + /** + * @return AbstractEvent|null + */ + public function getLastEvent(): ?AbstractEvent + { + return $this->_lastEvent; + } + + /** + * @param AbstractEvent|null $lastEvent + * @return void + */ + public function setLastEvent(?AbstractEvent $lastEvent): void + { + $this->_lastEvent = $lastEvent; + } + /** * 向连接发送错误消息 * @@ -282,7 +304,7 @@ public static function terminateConnections(string $appKey, string $socketId, ar } /** @inheritDoc */ - protected static function _subscribeResponse(string $type, array $data): void + public static function _subscribeResponse(string $type, array $data): void { if ($type === ChannelMethods::$publishTypeClient) { try { diff --git a/src/Traits/ChannelMethods.php b/src/Traits/ChannelMethods.php index 577137d..277f790 100644 --- a/src/Traits/ChannelMethods.php +++ b/src/Traits/ChannelMethods.php @@ -163,5 +163,5 @@ public static function publishUseRetry(string $type, array $data, float $retryIn * @param array $data 消息数据 * @return void */ - abstract protected static function _subscribeResponse(string $type, array $data): void; + abstract public static function _subscribeResponse(string $type, array $data): void; } \ No newline at end of file