diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 9de6fa8..322bf6f 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -813,6 +813,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() ); } + public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body', [], 5000); + } + + public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $exception = new \AMQPConnectionException('a socket error occurred'); + + $amqpExchange->expects($this->exactly(4)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception) + ); + + self::expectException(get_class($exception)); + self::expectExceptionMessage($exception->getMessage()); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection { $amqpConnection = $this->createMock(\AMQPConnection::class); diff --git a/Transport/Connection.php b/Transport/Connection.php index 3ea7784..8689b8e 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -306,19 +306,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ? $this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it } - if (0 !== $delayInMs) { - $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); + $this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) { + if (0 !== $delayInMs) { + $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); - return; - } + return; + } - $this->publishOnExchange( - $this->exchange(), - $body, - $this->getRoutingKeyForMessage($amqpStamp), - $headers, - $amqpStamp - ); + $this->publishOnExchange( + $this->exchange(), + $body, + $this->getRoutingKeyForMessage($amqpStamp), + $headers, + $amqpStamp + ); + }); } /** @@ -570,13 +572,18 @@ public function exchange(): \AMQPExchange private function clearWhenDisconnected(): void { if (!$this->channel()->isConnected()) { - $this->amqpChannel = null; - $this->amqpQueues = []; - $this->amqpExchange = null; - $this->amqpDelayExchange = null; + $this->clear(); } } + private function clear(): void + { + $this->amqpChannel = null; + $this->amqpQueues = []; + $this->amqpExchange = null; + $this->amqpDelayExchange = null; + } + private function getDefaultPublishRoutingKey(): ?string { return $this->exchangeOptions['default_publish_routing_key'] ?? null; @@ -593,6 +600,25 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string { return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(); } + + private function withConnectionExceptionRetry(callable $callable): void + { + $maxRetries = 3; + $retries = 0; + + retry: + try { + $callable(); + } catch (\AMQPConnectionException $e) { + if (++$retries <= $maxRetries) { + $this->clear(); + + goto retry; + } + + throw $e; + } + } } if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) {