diff --git a/src/Builders/Traits/MessageQueueMethod.php b/src/Builders/Traits/MessageQueueMethod.php index 415c716..69af4cb 100644 --- a/src/Builders/Traits/MessageQueueMethod.php +++ b/src/Builders/Traits/MessageQueueMethod.php @@ -211,9 +211,10 @@ public function ack(string $queueName, string $groupName, array $id, bool $retry * @see Headers * ] * @param string|null $queueName + * @param bool $temp * @return array 返回成功的消息ID组 */ - public function publishGetIds(string $body, array $headers = [], null|string $queueName = null): array + public function publishGetIds(string $body, array $headers = [], null|string $queueName = null, bool $temp = false): array { $client = $this->getConnection()->client(); $header = new Headers($headers); @@ -239,11 +240,16 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu throw new WebmanRqueueException('Queue size exceeded.'); } } - if ($id = $client->xAdd($queue, (string)$header->_id, [ + if ($id = $client->xAdd($queue, (string)$header->_id, $data = [ '_header' => $header->toString(), '_body' => $body, ])) { $ids[$queue] = $id; + } else { + if ($temp) { + $this->tempInsert('requeue', $queue, $data); + $ids["$queue"] = $id; + } } } return $ids; @@ -260,14 +266,15 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu * * @param string $body * @param array $headers = [ - * @param string|null $queueName - * @return int|false 0/false 代表全部失败 - *@see Headers + * @see Headers * ] + * @param string|null $queueName + * @param bool $temp + * @return int|false 0/false 代表全部失败 */ - public function publish(string $body, array $headers = [], null|string $queueName = null): int|false + public function publish(string $body, array $headers = [], null|string $queueName = null , bool $temp = false): int|false { - return count($this->publishGetIds($body, $headers, $queueName)); + return count($this->publishGetIds($body, $headers, $queueName, $temp)); } /** diff --git a/src/helpers.php b/src/helpers.php index 4890c6d..0327682 100644 --- a/src/helpers.php +++ b/src/helpers.php @@ -16,9 +16,9 @@ * @return int|false * @throws WebmanRqueueException */ -function sync_publish(AbstractBuilder $builder, string $body, array $headers = []) : int|false +function sync_publish(AbstractBuilder $builder, string $body, array $headers = [], bool $temp = false) : int|false { - return $builder->publish($body, $headers); + return $builder->publish($body, $headers, temp: $temp); } /** @@ -26,11 +26,13 @@ function sync_publish(AbstractBuilder $builder, string $body, array $headers = [ * @param QueueBuilder $builder * @param string $body * @param array $headers + * @param bool $temp * @return array + * @throws WebmanRqueueException */ -function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $headers = []) : array +function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $headers = [], bool $temp = false) : array { - return $builder->publishGetIds($body, $headers); + return $builder->publishGetIds($body, $headers, temp: $temp); } /**