diff --git a/src/Builders/AbstractBuilder.php b/src/Builders/AbstractBuilder.php index a99bf9e..481b272 100644 --- a/src/Builders/AbstractBuilder.php +++ b/src/Builders/AbstractBuilder.php @@ -6,6 +6,7 @@ use Psr\Log\LoggerInterface; use support\Redis; use Workbunny\WebmanRqueue\BuilderConfig; +use Workbunny\WebmanRqueue\Builders\Traits\AdaptiveTimerMethod; use Workbunny\WebmanRqueue\Builders\Traits\MessageQueueMethod; use Workbunny\WebmanRqueue\Builders\Traits\MessageTempMethod; use Workbunny\WebmanRqueue\Commands\AbstractCommand; diff --git a/src/Builders/AdaptiveBuilder.php b/src/Builders/AdaptiveBuilder.php new file mode 100644 index 0000000..9882956 --- /dev/null +++ b/src/Builders/AdaptiveBuilder.php @@ -0,0 +1,179 @@ + ['example'], + * 'group' => 'example', + * 'delayed' => false, + * 'prefetch_count' => 1, + * 'queue_size' => 0, + * 'pending_timeout' => 0 + * ] + */ + protected array $configs = []; + + public function __construct(?LoggerInterface $logger = null) + { + parent::__construct($logger); + $name = self::getName(); + $this->getBuilderConfig()->setGroup($this->configs['group'] ?? $name); + $this->getBuilderConfig()->setQueues($this->configs['queues'] ?? [$name]); + $this->getBuilderConfig()->setQueueSize($this->configs['queue_size'] ?? 0); + $this->getBuilderConfig()->setPrefetchCount($this->configs['prefetch_count'] ?? 0); + $this->getBuilderConfig()->setDelayed($this->configs['delayed'] ?? false); + $this->getBuilderConfig()->setCallback([$this, 'handler']); + } + + /** @inheritDoc */ + public function onWorkerStart(Worker $worker): void + { + // 初始化temp库 + $this->tempInit(); + if($this->getConnection()){ + // requeue timer + $this->tempRequeueInit(); + // check pending + if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) { + $this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) { + // 自动ack + $this->claim($worker, $pendingTimeout); + })); + } + // main timer + $this->adaptiveTimerCreate($this->timerInterval / 1000, function () use($worker) { + try { + // consume + $this->consume($worker); + } catch (WebmanRqueueException $exception) { + Log::channel('plugin.workbunny.webman-rqueue.warning')?->warning('Consume exception. ', [ + 'message' => $exception->getMessage(), 'code' => $exception->getCode(), + 'file' => $exception->getFile() . ':' . $exception->getLine(), + 'trace' => $exception->getTrace() + ]); + // 兼容旧版 + $this->getLogger()?->warning('Consume exception. ', [ + 'message' => $exception->getMessage(), 'code' => $exception->getCode() + ]); + } + }); + } + } + + /** @inheritDoc */ + public function onWorkerStop(Worker $worker): void + { + if($this->getConnection()) { + try { + $this->getConnection()->client()->close(); + }catch (RedisException $e) { + echo $e->getMessage() . PHP_EOL; + } + } + // 移除自适应 + $this->adaptiveTimerDelete(); + // + if($this->getPendingTimer()) { + Timer::del($this->getPendingTimer()); + } + } + + /** @inheritDoc */ + public function onWorkerReload(Worker $worker): void + {} + + /** @inheritDoc */ + public static function classContent(string $namespace, string $className, bool $isDelay): string + { + $isDelay = $isDelay ? 'true' : 'false'; + $name = self::getName("$namespace\\$className"); + return << [ + '$name' + ], + // 默认由类名自动生成 + 'group' => '$name', + // 是否延迟 + 'delayed' => $isDelay, + // QOS + 'prefetch_count' => 0, + // Queue size + 'queue_size' => 0, + // 消息pending超时,毫秒 + 'pending_timeout' => 0 + ]; + + /** @var float|null 消费间隔 1ms */ + protected ?float \$timerInterval = 1.0; + + /** @var string redis配置 */ + protected string \$connection = 'default'; + + /** @var int 闲置阈值 ms */ + protected int \$idleThreshold = 0; + + /** @var int 退避指数 */ + protected int \$avoidIndex = 0; + + /** @var float 最大定时器间隔 ms */ + protected float \$maxTimerInterval = 0.0; + + /** @inheritDoc */ + public function handler(string \$id, array \$value, Connection \$connection): bool + { + \$header = new Headers(\$value['_header']); + \$body = \$value['_body']; + // TODO 请重写消费逻辑 + echo "请重写 $className::handler\\n"; + return true; + } +} +doc; + } + + /** + * 消费函数 + * @param string $id + * @param array $value = [ + * '_header' => json_string, + * '_body' => string, + * ] + * @param Connection $connection + * @return bool + */ + abstract public function handler(string $id, array $value, Connection $connection): bool; +} \ No newline at end of file diff --git a/src/Builders/GroupBuilder.php b/src/Builders/GroupBuilder.php index d723a66..2c419c8 100644 --- a/src/Builders/GroupBuilder.php +++ b/src/Builders/GroupBuilder.php @@ -14,8 +14,6 @@ abstract class GroupBuilder extends AbstractBuilder { - use MessageQueueMethod; - /** * 配置 * @@ -30,6 +28,7 @@ abstract class GroupBuilder extends AbstractBuilder */ protected array $configs = []; + /** @var int|null 自动移除定时器 */ private static ?int $_delTimer = null; public function __construct(?LoggerInterface $logger = null) @@ -58,8 +57,8 @@ public function onWorkerStart(Worker $worker): void // check pending if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) { $this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) { - // 自动ack - $this->claim($worker, $pendingTimeout, true); + // 超时消息自动ack并requeue,消息自动移除 + $this->claim($worker, $pendingTimeout); })); } try { @@ -92,9 +91,12 @@ public function onWorkerStop(Worker $worker): void echo $e->getMessage() . PHP_EOL; } } - if($this->getMainTimer()) { + if(self::getMainTimer()) { Timer::del(self::getMainTimer()); } + if ($this->getPendingTimer()) { + Timer::del($this->getPendingTimer()); + } if(self::$_delTimer) { Timer::del(self::$_delTimer); } diff --git a/src/Builders/QueueBuilder.php b/src/Builders/QueueBuilder.php index ae7510a..9ead443 100644 --- a/src/Builders/QueueBuilder.php +++ b/src/Builders/QueueBuilder.php @@ -13,7 +13,6 @@ abstract class QueueBuilder extends AbstractBuilder { - use MessageQueueMethod; /** * 配置 @@ -52,15 +51,15 @@ public function onWorkerStart(Worker $worker): void // check pending if (($pendingTimeout = $this->configs['pending_timeout'] ?? 0) > 0) { $this->setPendingTimer(Timer::add($pendingTimeout / 1000, function () use ($worker, $pendingTimeout) { - // 自动ack - $this->claim($worker, $pendingTimeout, false); + // 超时消息自动ack并requeue,消息自动移除 + $this->claim($worker, $pendingTimeout); })); } // main timer $this->setMainTimer(Timer::add($this->timerInterval / 1000, function () use($worker) { try { // consume - $this->consume($worker, true); + $this->consume($worker); } catch (WebmanRqueueException $exception) { Log::channel('plugin.workbunny.webman-rqueue.warning')?->warning('Consume exception. ', [ 'message' => $exception->getMessage(), 'code' => $exception->getCode(), diff --git a/src/Builders/Traits/AdaptiveTimerMethod.php b/src/Builders/Traits/AdaptiveTimerMethod.php new file mode 100644 index 0000000..fa0ddf0 --- /dev/null +++ b/src/Builders/Traits/AdaptiveTimerMethod.php @@ -0,0 +1,196 @@ +idleThreshold; + } + + /** + * 设置闲置阈值 + * + * @param int $idleThreshold + */ + public function setIdleThreshold(int $idleThreshold): void + { + $this->idleThreshold = $idleThreshold; + } + + /** + * 获取退避指数 + * + * @return int + */ + public function getAvoidIndex(): int + { + return $this->avoidIndex; + } + + /** + * 设置退避指数 + * + * @param int $avoidIndex + */ + public function setAvoidIndex(int $avoidIndex): void + { + $this->avoidIndex = $avoidIndex; + } + + /** + * 获取最大间隔 + * + * @return float + */ + public function getMaxTimerInterval(): float + { + return $this->maxTimerInterval; + } + + /** + * 设置最大间隔 + * + * @param float $maxTimerInterval + */ + public function setMaxTimerInterval(float $maxTimerInterval): void + { + $this->maxTimerInterval = $maxTimerInterval; + } + + /** + * 添加自适应退避定时器 + * + * @param int $millisecond + * @param Closure $func + * @param mixed ...$args + * @return string + */ + public function adaptiveTimerCreate(int $millisecond, Closure $func, mixed ...$args): string + { + if (!Worker::$globalEvent) { + throw new WebmanRqueueException("Event driver error. ", -1); + } + // 初始化上一次获取信息的毫秒时间戳 + self::$lastMessageMilliTimestamp = self::$lastMessageMilliTimestamp ?? intval(microtime(true) * 1000); + // 增加定时器 + $id = spl_object_hash($func); + self::$timerIdMap[$id] = Worker::$globalEvent->add( + $millisecond / 1000, + EventInterface::EV_TIMER, + $callback = function (...$args) use ($func, $millisecond, $id, &$callback) + { + $nowMilliTimestamp = intval(microtime(true) * 1000); + if (\call_user_func($func, ...$args)) { + self::$lastMessageMilliTimestamp = $nowMilliTimestamp; + self::$isMaxTimerInterval = false; + } + if ( + // 设置了闲置阈值、退避指数、最大时间间隔大于定时器初始时间间隔 + $this->avoidIndex > 0 and $this->idleThreshold and $this->maxTimerInterval > $millisecond and + // 闲置超过闲置阈值 + $nowMilliTimestamp - self::$lastMessageMilliTimestamp > $this->avoidIndex and + // 非最大间隔 + !self::$isMaxTimerInterval + ) { + $interval = min($this->avoidIndex * $millisecond, $this->maxTimerInterval); + // 如果到达最大值 + if ($interval >= $this->maxTimerInterval) { + self::$isMaxTimerInterval = true; + } + // 移除之前的定时器 + Worker::$globalEvent->del(self::$timerIdMap[$id], EventInterface::EV_TIMER); + // 新建定时器 + self::$timerIdMap[$id] = Worker::$globalEvent->add($interval, EventInterface::EV_TIMER, $callback); + } + }, + $args + ); + return $id; + } + + /** + * 移除自适应定时器 + * + * @param string|null $id + * @return void + */ + public function adaptiveTimerDelete(?string $id = null): void + { + if (!Worker::$globalEvent) { + throw new WebmanRqueueException("Event driver error. ", -1); + } + if (!$id) { + foreach(self::$timerIdMap as $id) { + Worker::$globalEvent->del( + $id, EventInterface::EV_TIMER); + } + self::$timerIdMap = []; + } else { + if ($id = self::$timerIdMap[$id] ?? null) { + Worker::$globalEvent->del( + $id, EventInterface::EV_TIMER); + unset(self::$timerIdMap[$id]); + } + } + } +} \ No newline at end of file diff --git a/src/Builders/Traits/MessageQueueMethod.php b/src/Builders/Traits/MessageQueueMethod.php index caf0dbc..b943f31 100644 --- a/src/Builders/Traits/MessageQueueMethod.php +++ b/src/Builders/Traits/MessageQueueMethod.php @@ -365,10 +365,10 @@ public function claim(Worker $worker, int $pendingTimeout, bool $autoDel = true) /** * @param Worker $worker * @param bool $del - * @return void + * @return bool true:有消费 false:无消费 * @throws WebmanRqueueException */ - public function consume(Worker $worker, bool $del = true): void + public function consume(Worker $worker, bool $del = true): bool { try { $client = $this->getConnection()->client(); @@ -438,7 +438,9 @@ public function consume(Worker $worker, bool $del = true): void // del if($del) { $client->xDel($queueName, $ids); } } + return true; } + return false; } catch (RedisException $exception) { Log::channel('plugin.workbunny.webman-rqueue.debug')?->debug($exception->getMessage(), $exception->getTrace()); $this->getLogger()?->debug($exception->getMessage(), $exception->getTrace()); diff --git a/src/Commands/AbstractCommand.php b/src/Commands/AbstractCommand.php index 19a4800..6f22601 100644 --- a/src/Commands/AbstractCommand.php +++ b/src/Commands/AbstractCommand.php @@ -4,6 +4,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Output\OutputInterface; +use Workbunny\WebmanRqueue\Builders\AdaptiveBuilder; use Workbunny\WebmanRqueue\Builders\GroupBuilder; use Workbunny\WebmanRqueue\Builders\QueueBuilder; use function Workbunny\WebmanRqueue\base_path; @@ -17,8 +18,9 @@ abstract class AbstractCommand extends Command * @var string[] */ protected array $builderList = [ - 'queue' => QueueBuilder::class, - 'group' => GroupBuilder::class + 'queue' => QueueBuilder::class, + 'group' => GroupBuilder::class, + 'adaptive' => AdaptiveBuilder::class ]; /**