From 6ba7a032d6074b4806c9d036c6d21cc092c7d18d Mon Sep 17 00:00:00 2001 From: "Jonathan H. Wage" Date: Tue, 5 Mar 2024 14:09:36 -0600 Subject: [PATCH] [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. --- Tests/Transport/ConnectionTest.php | 67 ++++++++++++++++++++++++++++++ Transport/Connection.php | 52 +++++++++++++++++------ 2 files changed, 106 insertions(+), 13 deletions(-) diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index a08c102..e57dad6 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() ); } + public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body', [], 5000); + } + + public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $exception = new \AMQPConnectionException('a socket error occurred'); + + $amqpExchange->expects($this->exactly(4)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception), + ); + + self::expectException($exception::class); + self::expectExceptionMessage($exception->getMessage()); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection { $amqpConnection = $this->createMock(\AMQPConnection::class); diff --git a/Transport/Connection.php b/Transport/Connection.php index b399026..0ae1bff 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ? $this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it } - if (0 !== $delayInMs) { - $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); + $this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) { + if (0 !== $delayInMs) { + $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); - return; - } + return; + } - $this->publishOnExchange( - $this->exchange(), - $body, - $this->getRoutingKeyForMessage($amqpStamp), - $headers, - $amqpStamp - ); + $this->publishOnExchange( + $this->exchange(), + $body, + $this->getRoutingKeyForMessage($amqpStamp), + $headers, + $amqpStamp + ); + }); } /** @@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange private function clearWhenDisconnected(): void { if (!$this->channel()->isConnected()) { - unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange); - $this->amqpQueues = []; + $this->clear(); } } + private function clear(): void + { + unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange); + $this->amqpQueues = []; + } + private function getDefaultPublishRoutingKey(): ?string { return $this->exchangeOptions['default_publish_routing_key'] ?? null; @@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string { return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey(); } + + private function withConnectionExceptionRetry(callable $callable): void + { + $maxRetries = 3; + $retries = 0; + + retry: + try { + $callable(); + } catch (\AMQPConnectionException $e) { + if (++$retries <= $maxRetries) { + $this->clear(); + + goto retry; + } + + throw $e; + } + } }