Skip to content

Commit

Permalink
add publish use temp
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Apr 7, 2024
1 parent 57a3180 commit 313a464
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
21 changes: 14 additions & 7 deletions src/Builders/Traits/MessageQueueMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,10 @@ public function ack(string $queueName, string $groupName, array $id, bool $retry
* @see Headers
* ]
* @param string|null $queueName
* @param bool $temp
* @return array 返回成功的消息ID组
*/
public function publishGetIds(string $body, array $headers = [], null|string $queueName = null): array
public function publishGetIds(string $body, array $headers = [], null|string $queueName = null, bool $temp = false): array
{
$client = $this->getConnection()->client();
$header = new Headers($headers);
Expand All @@ -239,11 +240,16 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu
throw new WebmanRqueueException('Queue size exceeded.');
}
}
if ($id = $client->xAdd($queue, (string)$header->_id, [
if ($id = $client->xAdd($queue, (string)$header->_id, $data = [
'_header' => $header->toString(),
'_body' => $body,
])) {
$ids[$queue] = $id;
} else {
if ($temp) {
$this->tempInsert('requeue', $queue, $data);
$ids["<temp>$queue</temp>"] = $id;
}
}
}
return $ids;
Expand All @@ -260,14 +266,15 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu
*
* @param string $body
* @param array $headers = [
* @param string|null $queueName
* @return int|false 0/false 代表全部失败
*@see Headers
* @see Headers
* ]
* @param string|null $queueName
* @param bool $temp
* @return int|false 0/false 代表全部失败
*/
public function publish(string $body, array $headers = [], null|string $queueName = null): int|false
public function publish(string $body, array $headers = [], null|string $queueName = null , bool $temp = false): int|false
{
return count($this->publishGetIds($body, $headers, $queueName));
return count($this->publishGetIds($body, $headers, $queueName, $temp));
}

/**
Expand Down
10 changes: 6 additions & 4 deletions src/helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@
* @return int|false
* @throws WebmanRqueueException
*/
function sync_publish(AbstractBuilder $builder, string $body, array $headers = []) : int|false
function sync_publish(AbstractBuilder $builder, string $body, array $headers = [], bool $temp = false) : int|false
{
return $builder->publish($body, $headers);
return $builder->publish($body, $headers, temp: $temp);
}

/**
* 同步生产
* @param QueueBuilder $builder
* @param string $body
* @param array $headers
* @param bool $temp
* @return array
* @throws WebmanRqueueException
*/
function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $headers = []) : array
function sync_publish_get_ids(AbstractBuilder $builder, string $body, array $headers = [], bool $temp = false) : array
{
return $builder->publishGetIds($body, $headers);
return $builder->publishGetIds($body, $headers, temp: $temp);
}

/**
Expand Down

0 comments on commit 313a464

Please sign in to comment.