diff --git a/src/Builders/Traits/AdaptiveTimerMethod.php b/src/Builders/Traits/AdaptiveTimerMethod.php index 7a23c61..a20d9b7 100644 --- a/src/Builders/Traits/AdaptiveTimerMethod.php +++ b/src/Builders/Traits/AdaptiveTimerMethod.php @@ -6,6 +6,7 @@ use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException; use Workerman\Events\EventInterface; use Workerman\Worker; +use function Workbunny\WebmanRqueue\is_worker_version_5; trait AdaptiveTimerMethod { @@ -184,67 +185,69 @@ public function adaptiveTimerCreate(Closure $func, mixed ...$args): string $this->setTimerInitialInterval($this->getTimerInterval()); // 初始化时间 self::setLastMessageMilliTimestamp(self::getMilliTime()); - // 创建定时器 - self::$timerIdMap[$id] = Worker::$globalEvent->add($this->getTimerInitialInterval() / 1000, EventInterface::EV_TIMER, - $callback = function (...$args) use ($func, $id, &$callback) - { - // 获取毫秒时间戳 - $nowMilliTimestamp = self::getMilliTime(); - // 是否开启自适应 - $enable = ( - // 设置了退避指数 - $this->getAvoidIndex() > 0 and - // 定时器间隔小于最大间隔 - $this->getMaxTimerInterval() > $this->getTimerInitialInterval() - ); - // 执行回调 - try { - if ($result = \call_user_func($func, ...$args)) { - // 设置执行时间 - self::setLastMessageMilliTimestamp($nowMilliTimestamp); - } - } catch (\Throwable){ - // 异常为负反馈 - $result = false; + $callback = function (...$args) use ($func, $id, &$callback) + { + // 获取毫秒时间戳 + $nowMilliTimestamp = self::getMilliTime(); + // 是否开启自适应 + $enable = ( + // 设置了退避指数 + $this->getAvoidIndex() > 0 and + // 定时器间隔小于最大间隔 + $this->getMaxTimerInterval() > $this->getTimerInitialInterval() + ); + // 执行回调 + try { + if ($result = \call_user_func($func, ...$args)) { + // 设置执行时间 + self::setLastMessageMilliTimestamp($nowMilliTimestamp); } - // 如果自适应开启 - if ($enable) { - // 正反馈 - if ($result) { - // 归零 - self::$isMaxTimerInterval = false; - // 重新设置定时器 - $setTimer = true; - // 定时器初始化 - $this->setTimerInterval($this->getTimerInitialInterval()); - } - // 负反馈 - else { - $setTimer = false; - if ( - $nowMilliTimestamp - self::getLastMessageMilliTimestamp() > $this->getIdleThreshold() and // 闲置超过闲置阈值 - !self::isMaxTimerInterval() // 非最大间隔 - ) { - $interval = min($this->getAvoidIndex() * $this->getTimerInterval(), $this->getMaxTimerInterval()); - // 如果到达最大值 - if ($interval >= $this->getMaxTimerInterval()) { - self::$isMaxTimerInterval = true; - } - $setTimer = true; - $this->setTimerInterval($interval); + } catch (\Throwable){ + // 异常为负反馈 + $result = false; + } + // 如果自适应开启 + if ($enable) { + // 正反馈 + if ($result) { + // 归零 + self::$isMaxTimerInterval = false; + // 重新设置定时器 + $setTimer = true; + // 定时器初始化 + $this->setTimerInterval($this->getTimerInitialInterval()); + } + // 负反馈 + else { + $setTimer = false; + if ( + $nowMilliTimestamp - self::getLastMessageMilliTimestamp() > $this->getIdleThreshold() and // 闲置超过闲置阈值 + !self::isMaxTimerInterval() // 非最大间隔 + ) { + $interval = min($this->getAvoidIndex() * $this->getTimerInterval(), $this->getMaxTimerInterval()); + // 如果到达最大值 + if ($interval >= $this->getMaxTimerInterval()) { + self::$isMaxTimerInterval = true; } + $setTimer = true; + $this->setTimerInterval($interval); } - // 重置定时器 - if ($setTimer) { - // 移除旧定时器 - self::adaptiveTimerDelete($id); - // 创建新定时器 - self::$timerIdMap[$id] = Worker::$globalEvent->add($this->getTimerInterval() / 1000, EventInterface::EV_TIMER, $callback); - } } - }, - $args - ); + // 重置定时器 + if ($setTimer) { + // 移除旧定时器 + self::adaptiveTimerDelete($id); + // 创建新定时器 + self::$timerIdMap[$id] = is_worker_version_5() and method_exists(Worker::$globalEvent, 'delay') + ? Worker::$globalEvent->delay(floatval($this->getTimerInitialInterval() / 1000), $callback, $args) + : Worker::$globalEvent->add(floatval($this->getTimerInterval() / 1000), EventInterface::EV_TIMER, $callback); + } + } + }; + // 创建定时器 + self::$timerIdMap[$id] = is_worker_version_5() and method_exists(Worker::$globalEvent, 'delay') + ? Worker::$globalEvent->delay(floatval($this->getTimerInitialInterval() / 1000), $callback, $args) + : Worker::$globalEvent->add(floatval($this->getTimerInitialInterval() / 1000), EventInterface::EV_TIMER, $callback, $args); return $id; } @@ -261,14 +264,20 @@ public function adaptiveTimerDelete(?string $id = null): void } if ($id === null) { foreach(self::$timerIdMap as $id) { - Worker::$globalEvent->del( - $id, EventInterface::EV_TIMER); + if (is_worker_version_5() and method_exists(Worker::$globalEvent, 'offDelay')) { + Worker::$globalEvent->offDelay($id); + } else { + Worker::$globalEvent->del($id, EventInterface::EV_TIMER); + } } self::$timerIdMap = []; } else { if ($id = self::$timerIdMap[$id] ?? null) { - Worker::$globalEvent->del( - $id, EventInterface::EV_TIMER); + if (is_worker_version_5() and method_exists(Worker::$globalEvent, 'offDelay')) { + Worker::$globalEvent->offDelay($id); + } else { + Worker::$globalEvent->del($id, EventInterface::EV_TIMER); + } unset(self::$timerIdMap[$id]); } } diff --git a/src/helpers.php b/src/helpers.php index 0327682..ba23512 100644 --- a/src/helpers.php +++ b/src/helpers.php @@ -7,12 +7,14 @@ use Workbunny\WebmanRqueue\Builders\AbstractBuilder; use Workbunny\WebmanRqueue\Builders\QueueBuilder; use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException; +use Workerman\Worker; /** * 同步生产 * @param QueueBuilder $builder * @param string $body * @param array $headers + * @param bool $temp * @return int|false * @throws WebmanRqueueException */ @@ -35,6 +37,15 @@ function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $hea return $builder->publishGetIds($body, $headers, temp: $temp); } +/** + * Version 5.x uses a new event interface + * @return bool + */ +function is_worker_version_5(): bool +{ + return version_compare(Worker::VERSION, '5.0.0', '>='); +} + /** * @param string|null $key * @param mixed|null $default