From ba344833f55150246fae7bc1cd44b6930662ba37 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:48:57 +0100 Subject: [PATCH] @matusdrobuliak66 review: added option to define the queue name --- .../src/servicelib/rabbitmq/_client.py | 12 +++--- .../src/servicelib/rabbitmq/_utils.py | 8 ++-- .../tests/rabbitmq/test_rabbitmq.py | 42 +++++++++++++++---- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 5d0ed66a1bcd..ccf1445c231d 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -35,6 +35,7 @@ _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15 _DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}") +_DELAYED_QUEUE_NAME: Final[ExchangeName] = ExchangeName("delayed_{queue_name}") def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int: @@ -225,9 +226,8 @@ async def subscribe( queue = await declare_queue( channel, self.client_name, - exchange_name, + non_exclusive_queue_name or exchange_name, exclusive_queue=exclusive_queue, - non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=message_ttl, arguments={"x-dead-letter-exchange": delayed_exchange_name}, ) @@ -241,13 +241,15 @@ async def subscribe( delayed_exchange = await channel.declare_exchange( delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True ) + delayed_queue_name = _DELAYED_QUEUE_NAME.format( + queue_name=non_exclusive_queue_name or exchange_name + ) delayed_queue = await declare_queue( channel, self.client_name, - delayed_exchange_name, + delayed_queue_name, exclusive_queue=exclusive_queue, - non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=int(unexpected_error_retry_delay_s * 1000), arguments={"x-dead-letter-exchange": exchange.name}, ) @@ -276,7 +278,6 @@ async def add_topics( self.client_name, exchange_name, exclusive_queue=True, - non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name @@ -302,7 +303,6 @@ async def remove_topics( self.client_name, exchange_name, exclusive_queue=True, - non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 5e57645b36a7..404adb1b6525 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -13,6 +13,7 @@ from tenacity.wait import wait_fixed from ..logging_utils import log_context +from ._models import QueueName _logger = logging.getLogger(__file__) @@ -65,10 +66,9 @@ def get_rabbitmq_client_unique_name(base_name: str) -> str: async def declare_queue( channel: aio_pika.RobustChannel, client_name: str, - exchange_name: str, + queue_name: QueueName, *, exclusive_queue: bool, - non_exclusive_queue_name: str | None, arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> aio_pika.abc.AbstractRobustQueue: @@ -79,11 +79,11 @@ async def declare_queue( "durable": True, "exclusive": exclusive_queue, "arguments": default_arguments, - "name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive", + "name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive", } if not exclusive_queue: # NOTE: setting a name will ensure multiple instance will take their data here - queue_parameters |= {"name": non_exclusive_queue_name or exchange_name} + queue_parameters |= {"name": queue_name} # NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED`` # most likely someone changed the signature of the queues (parameters etc...) diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index c9fa7edf8445..d4c6c4b8ebb1 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -6,7 +6,7 @@ import asyncio -from collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass from typing import Any, Final from unittest import mock @@ -464,9 +464,7 @@ def _raise_once_then_true(*args, **kwargs): if _raise_once_then_true.calls == 1: msg = "this is a test!" raise KeyError(msg) - if _raise_once_then_true.calls == 2: - return False - return True + return _raise_once_then_true.calls != 2 exchange_name = random_exchange_name() _raise_once_then_true.calls = 0 @@ -476,6 +474,22 @@ def _raise_once_then_true(*args, **kwargs): await _assert_message_received(mocked_message_parser, 3, message) +@pytest.fixture +async def ensure_queue_deletion( + create_rabbitmq_client: Callable[[str], RabbitMQClient] +) -> AsyncIterator[Callable[[QueueName], None]]: + created_queues = set() + + def _(queue_name: QueueName) -> None: + created_queues.add(queue_name) + + yield _ + + client = create_rabbitmq_client("ensure_queue_deletion") + await asyncio.gather(*(client.unsubscribe(q) for q in created_queues)) + + +@pytest.mark.parametrize("defined_queue_name", [None, "pytest-queue"]) @pytest.mark.parametrize("num_subs", [10]) async def test_pub_sub_with_non_exclusive_queue( create_rabbitmq_client: Callable[[str], RabbitMQClient], @@ -483,6 +497,8 @@ async def test_pub_sub_with_non_exclusive_queue( mocker: MockerFixture, random_rabbit_message: Callable[..., PytestRabbitMessage], num_subs: int, + defined_queue_name: QueueName | None, + ensure_queue_deletion: Callable[[QueueName], None], ): consumers = (create_rabbitmq_client(f"consumer_{n}") for n in range(num_subs)) mocked_message_parsers = [ @@ -492,13 +508,25 @@ async def test_pub_sub_with_non_exclusive_queue( publisher = create_rabbitmq_client("publisher") message = random_rabbit_message() exchange_name = random_exchange_name() - await asyncio.gather( + list_queue_name_consumer_mappings = await asyncio.gather( *( - consumer.subscribe(exchange_name, parser, exclusive_queue=False) + consumer.subscribe( + exchange_name, + parser, + exclusive_queue=False, + non_exclusive_queue_name=defined_queue_name, + ) for consumer, parser in zip(consumers, mocked_message_parsers, strict=True) ) ) - + for queue_name, _ in list_queue_name_consumer_mappings: + assert ( + queue_name == exchange_name + if defined_queue_name is None + else defined_queue_name + ) + ensure_queue_deletion(queue_name) + ensure_queue_deletion(f"delayed_{queue_name}") await publisher.publish(exchange_name, message) # only one consumer should have gotten the message here and the others not async for attempt in AsyncRetrying(