From 14d5f22d9bf96dd9142916471ebf70ab69e51640 Mon Sep 17 00:00:00 2001 From: "Jonathan H. Wage" Date: Tue, 5 Mar 2024 14:09:36 -0600 Subject: [PATCH 1/2] [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. --- Tests/Transport/ConnectionTest.php | 67 ++++++++++++++++++++++++++++++ Transport/Connection.php | 56 ++++++++++++++++++------- 2 files changed, 108 insertions(+), 15 deletions(-) diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 9de6fa8..322bf6f 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -813,6 +813,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() ); } + public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->exactly(2)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \AMQPConnectionException('a socket error occurred')), + null + ); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body', [], 5000); + } + + public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $exception = new \AMQPConnectionException('a socket error occurred'); + + $amqpExchange->expects($this->exactly(4)) + ->method('publish') + ->willReturnOnConsecutiveCalls( + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception), + $this->throwException($exception) + ); + + self::expectException(get_class($exception)); + self::expectExceptionMessage($exception->getMessage()); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection { $amqpConnection = $this->createMock(\AMQPConnection::class); diff --git a/Transport/Connection.php b/Transport/Connection.php index 3ea7784..8689b8e 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -306,19 +306,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ? $this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it } - if (0 !== $delayInMs) { - $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); + $this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) { + if (0 !== $delayInMs) { + $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); - return; - } + return; + } - $this->publishOnExchange( - $this->exchange(), - $body, - $this->getRoutingKeyForMessage($amqpStamp), - $headers, - $amqpStamp - ); + $this->publishOnExchange( + $this->exchange(), + $body, + $this->getRoutingKeyForMessage($amqpStamp), + $headers, + $amqpStamp + ); + }); } /** @@ -570,13 +572,18 @@ public function exchange(): \AMQPExchange private function clearWhenDisconnected(): void { if (!$this->channel()->isConnected()) { - $this->amqpChannel = null; - $this->amqpQueues = []; - $this->amqpExchange = null; - $this->amqpDelayExchange = null; + $this->clear(); } } + private function clear(): void + { + $this->amqpChannel = null; + $this->amqpQueues = []; + $this->amqpExchange = null; + $this->amqpDelayExchange = null; + } + private function getDefaultPublishRoutingKey(): ?string { return $this->exchangeOptions['default_publish_routing_key'] ?? null; @@ -593,6 +600,25 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string { return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(); } + + private function withConnectionExceptionRetry(callable $callable): void + { + $maxRetries = 3; + $retries = 0; + + retry: + try { + $callable(); + } catch (\AMQPConnectionException $e) { + if (++$retries <= $maxRetries) { + $this->clear(); + + goto retry; + } + + throw $e; + } + } } if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) { From 822ad5f425ef362580273a175c45aa765220fe73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 23 Sep 2024 11:24:18 +0200 Subject: [PATCH 2/2] Add PR template and auto-close PR on subtree split repositories --- .gitattributes | 3 +-- .github/PULL_REQUEST_TEMPLATE.md | 8 ++++++++ .github/workflows/close-pull-request.yml | 20 ++++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 .github/workflows/close-pull-request.yml diff --git a/.gitattributes b/.gitattributes index 84c7add..14c3c35 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,4 +1,3 @@ /Tests export-ignore /phpunit.xml.dist export-ignore -/.gitattributes export-ignore -/.gitignore export-ignore +/.git* export-ignore diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..4689c4d --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,8 @@ +Please do not submit any Pull Requests here. They will be closed. +--- + +Please submit your PR here instead: +https://github.com/symfony/symfony + +This repository is what we call a "subtree split": a read-only subset of that main repository. +We're looking forward to your PR there! diff --git a/.github/workflows/close-pull-request.yml b/.github/workflows/close-pull-request.yml new file mode 100644 index 0000000..e55b478 --- /dev/null +++ b/.github/workflows/close-pull-request.yml @@ -0,0 +1,20 @@ +name: Close Pull Request + +on: + pull_request_target: + types: [opened] + +jobs: + run: + runs-on: ubuntu-latest + steps: + - uses: superbrothers/close-pull-request@v3 + with: + comment: | + Thanks for your Pull Request! We love contributions. + + However, you should instead open your PR on the main repository: + https://github.com/symfony/symfony + + This repository is what we call a "subtree split": a read-only subset of that main repository. + We're looking forward to your PR there!