Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Feb 8, 2024
1 parent 1151d80 commit 5369883
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/ChannelServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public function __construct() {
// 由于使用了webman自定义进程启动,所以无须Server原有的构造方式
}

public function onWorkerStart(Worker $worker) {
public function onWorkerStart(Worker $worker): void
{
$worker->count = 1;
$worker->onMessage = [$this, 'onMessage'];
$worker->onClose = [$this, 'onClose'];
Expand Down
15 changes: 9 additions & 6 deletions src/HookServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class HookServer implements ServerInterface
protected array $claimStartTags = [];

/** @inheritDoc */
public static function getConfig(string $key, $default = null)
public static function getConfig(string $key, mixed $default = null): mixed
{
return config('plugin.workbunny.webman-push-server.app.hook-server.' . $key, $default);
}
Expand Down Expand Up @@ -164,7 +164,7 @@ public function ack(string $queue, string $group, array $idArray): bool
* @param string $consumer
* @return void
*/
public function claim(string $queue, string $group, string $consumer)
public function claim(string $queue, string $group, string $consumer): void
{
try {
if ($idArray = self::getStorage()->xAutoClaim(
Expand Down Expand Up @@ -199,7 +199,7 @@ public function claim(string $queue, string $group, string $consumer)
* @return void
* @throws Exception
*/
public function consumer(string $queue, string $group, string $consumer, int $blockTime)
public function consumer(string $queue, string $group, string $consumer, int $blockTime): void
{
try {
// 创建组
Expand Down Expand Up @@ -237,7 +237,7 @@ public function consumer(string $queue, string $group, string $consumer, int $bl
/**
* @return void
*/
protected function _tempInit()
protected function _tempInit(): void
{
$config = config('database.connections')['plugin.workbunny.webman-push-server.local-storage'] ?? [];
if ($config) {
Expand Down Expand Up @@ -321,8 +321,11 @@ function () use ($queue, $group, $consumer) {
$this->_consumerTimer = Timer::add(
$interval = self::getConfig('consumer_interval', 1) / 1000,
function () use ($worker, $interval, $queue, $group, $consumer) {
// 处理pending消息
$this->claim($queue, $group, $consumer);
// 如果没有claim定时器,则每次消费时进行claim
if (!$this->_claimTimer) {
// 处理pending消息
$this->claim($queue, $group, $consumer);
}
// 执行消费
$this->consumer($queue, $group, $consumer, (int)($interval * 1000));
});
Expand Down
4 changes: 2 additions & 2 deletions src/ServerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public static function getConfig(string $key, mixed $default = null): mixed;

/**
* 获取储存器
* @return Redis
* @return Redis|null
*/
public static function getStorage(): Redis;
public static function getStorage(): ?Redis;

/**
* 服务启动
Expand Down

0 comments on commit 5369883

Please sign in to comment.