Skip to content

Commit

Permalink
fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Jun 28, 2024
1 parent e3dfad7 commit 8c790b0
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/Events/AbstractEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ abstract class AbstractEvent
EVENT_SUBSCRIBE => Subscribe::class,
EVENT_UNSUBSCRIBE => Unsubscribe::class,
self::CLIENT_EVENT => ClientEvent::class,
self::SERVER_EVENT => ServerEvent::class,
];

/**
Expand Down
31 changes: 31 additions & 0 deletions src/Events/ServerEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
/**
* This file is part of workbunny.
*
* Redistributions of files must retain the above copyright notice.
*
* @author chaz6chez<[email protected]>
* @copyright chaz6chez<[email protected]>
* @link https://github.com/workbunny/webman-push-server
* @license https://github.com/workbunny/webman-push-server/blob/main/LICENSE
*/
declare(strict_types=1);

namespace Workbunny\WebmanPushServer\Events;

use Workerman\Connection\TcpConnection;

class ServerEvent extends AbstractEvent
{
/**
* server事件无需响应
*
* @param TcpConnection $connection
* @param array $request
* @return void
*/
public function response(TcpConnection $connection, array $request): void
{
return;
}
}
14 changes: 6 additions & 8 deletions src/Events/Subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use RedisException;
use support\Log;
use Workbunny\WebmanPushServer\PushServer;
use Workbunny\WebmanPushServer\Traits\ChannelMethods;
use Workbunny\WebmanPushServer\Traits\StorageMethods;
use Workerman\Connection\TcpConnection;
use function Workbunny\WebmanPushServer\uuid;
use const Workbunny\WebmanPushServer\CHANNEL_TYPE_PRESENCE;
Expand Down Expand Up @@ -132,14 +130,14 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
// 为当前进程增加订阅的通道
PushServer::_setChannel($appKey, $channel, $socketId);

$storage = StorageMethods::getStorageClient();
$storage = PushServer::getStorageClient();
// 通道是否已经被建立
$channelExists = $storage->exists($key = StorageMethods::_getChannelStorageKey($appKey, $channel));
$channelExists = $storage->exists($key = PushServer::_getChannelStorageKey($appKey, $channel));
if (!$channelExists) {
/** @see PushServer::$_storage */
$storage->hSet($key, 'type', $type);
// 内部事件广播 通道被创建事件
ChannelMethods::publish(ChannelMethods::$publishTypeServer, [
PushServer::publish(PushServer::$publishTypeServer, [
'appKey' => $appKey,
'channel' => $channel,
'event' => EVENT_CHANNEL_OCCUPIED,
Expand All @@ -163,7 +161,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
}
// 如果是presence通道
if ($isPresence = ($type === CHANNEL_TYPE_PRESENCE)) {
if (!$storage->exists($userKey = StorageMethods::_getUserStorageKey($appKey, $channel, $userId))) {
if (!$storage->exists($userKey = PushServer::_getUserStorageKey($appKey, $channel, $userId))) {
$storage->hIncrBy($key ,'user_count', 1);
$storage->hMSet($userKey, [
'user_id' => $userId,
Expand All @@ -176,7 +174,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
*
* {"event":"pusher_internal:member_added","data":{"user_id":1488465780,"user_info":"{\"name\":\"123\",\"sex:\"1\"}","channel ":"presence-channel"}}
*/
ChannelMethods::publishUseRetry(PushServer::$publishTypeClient, [
PushServer::publishUseRetry(PushServer::$publishTypeClient, [
'appKey' => $appKey,
'channel' => $channel,
'event' => EVENT_MEMBER_ADDED,
Expand All @@ -201,7 +199,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann
$channel,
EVENT_SUBSCRIPTION_SUCCEEDED,
$isPresence ?
StorageMethods::_getPresenceChannelDataForSubscribe($appKey, $channel) :
PushServer::_getPresenceChannelDataForSubscribe($appKey, $channel) :
'{}'
);
} catch (RedisException $exception){
Expand Down
9 changes: 4 additions & 5 deletions src/Events/Unsubscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use stdClass;
use support\Log;
use Workbunny\WebmanPushServer\PushServer;
use Workbunny\WebmanPushServer\Traits\StorageMethods;
use Workerman\Connection\TcpConnection;
use function Workbunny\WebmanPushServer\uuid;
use const Workbunny\WebmanPushServer\CHANNEL_TYPE_PRESENCE;
Expand Down Expand Up @@ -70,11 +69,11 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha
$channels = PushServer::_getConnectionProperty($connection, 'channels');

if ($type = $channels[$channel] ?? null) {
$storage = StorageMethods::getStorageClient();
$storage = PushServer::getStorageClient();
// presence通道
if ($type === CHANNEL_TYPE_PRESENCE) {
if ($users = $storage->keys(StorageMethods::_getUserStorageKey($appKey, $channel, $uid))) {
$userCount = $storage->hIncrBy(StorageMethods::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users));
if ($users = $storage->keys(PushServer::_getUserStorageKey($appKey, $channel, $uid))) {
$userCount = $storage->hIncrBy(PushServer::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users));
if ($userCount <= 0) {
$storage->del(...$users);
}
Expand All @@ -96,7 +95,7 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha
}
}
// 查询通道订阅数量
$subCount = $storage->hIncrBy($key = StorageMethods::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1);
$subCount = $storage->hIncrBy($key = PushServer::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1);
if ($subCount <= 0) {
$storage->del($key);
// 内部事件广播 通道被移除事件
Expand Down
40 changes: 31 additions & 9 deletions src/PushServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@ class PushServer
*/
protected static array $_channels = [];

/** @var int 心跳定时器 */
protected int $_heartbeatTimer = 0;
/** @var int|null 心跳定时器 */
protected ?int $_heartbeatTimer = null;

/** @var int 心跳 */
protected int $_keepaliveTimeout = 60;

public function __construct()
/** @var AbstractEvent|null 最近一个事件 */
protected ?AbstractEvent $_lastEvent = null;

protected function __construct()
{
$this->_keepaliveTimeout = self::getConfig('heartbeat', 60);
}
Expand Down Expand Up @@ -98,7 +101,7 @@ public function onWorkerStart(): void
// 通道订阅
static::subscribe();
// 心跳设置
if ($this->_heartbeatTimer > 0) {
if ($this->_keepaliveTimeout > 0 and !$this->_heartbeatTimer) {
$this->_heartbeatTimer = Timer::add($this->_keepaliveTimeout / 2, function () {
/**
* @var string $appKey
Expand Down Expand Up @@ -184,12 +187,14 @@ public function onConnect(TcpConnection $connection): void
public function onMessage(TcpConnection $connection, $data): void
{
if (is_string($data)) {
static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0);
if ($data = @json_decode($data, true)){
if ($data = @json_decode($data, true)) {
// 获取事件
if ($factory = AbstractEvent::factory($data['event'] ?? '')){
$this->setLastEvent(AbstractEvent::factory($data['event'] ?? ''));
if ($event = $this->getLastEvent()) {
// 心跳计数归零
static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0);
// 事件响应
$factory->response($connection, $data);
$event->response($connection, $data);
return;
}
}
Expand Down Expand Up @@ -221,6 +226,23 @@ public function onClose(TcpConnection $connection): void
}
}

/**
* @return AbstractEvent|null
*/
public function getLastEvent(): ?AbstractEvent
{
return $this->_lastEvent;
}

/**
* @param AbstractEvent|null $lastEvent
* @return void
*/
public function setLastEvent(?AbstractEvent $lastEvent): void
{
$this->_lastEvent = $lastEvent;
}

/**
* 向连接发送错误消息
*
Expand Down Expand Up @@ -282,7 +304,7 @@ public static function terminateConnections(string $appKey, string $socketId, ar
}

/** @inheritDoc */
protected static function _subscribeResponse(string $type, array $data): void
public static function _subscribeResponse(string $type, array $data): void
{
if ($type === ChannelMethods::$publishTypeClient) {
try {
Expand Down
2 changes: 1 addition & 1 deletion src/Traits/ChannelMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,5 @@ public static function publishUseRetry(string $type, array $data, float $retryIn
* @param array $data 消息数据
* @return void
*/
abstract protected static function _subscribeResponse(string $type, array $data): void;
abstract public static function _subscribeResponse(string $type, array $data): void;
}

0 comments on commit 8c790b0

Please sign in to comment.