Skip to content

Commit

Permalink
@matusdrobuliak66 review: added option to define the queue name
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 26, 2024
1 parent d243c9e commit ba34483
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
12 changes: 6 additions & 6 deletions packages/service-library/src/servicelib/rabbitmq/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15

_DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}")
_DELAYED_QUEUE_NAME: Final[ExchangeName] = ExchangeName("delayed_{queue_name}")


def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
Expand Down Expand Up @@ -225,9 +226,8 @@ async def subscribe(
queue = await declare_queue(
channel,
self.client_name,
exchange_name,
non_exclusive_queue_name or exchange_name,
exclusive_queue=exclusive_queue,
non_exclusive_queue_name=non_exclusive_queue_name,
message_ttl=message_ttl,
arguments={"x-dead-letter-exchange": delayed_exchange_name},
)
Expand All @@ -241,13 +241,15 @@ async def subscribe(
delayed_exchange = await channel.declare_exchange(
delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
)
delayed_queue_name = _DELAYED_QUEUE_NAME.format(
queue_name=non_exclusive_queue_name or exchange_name
)

delayed_queue = await declare_queue(
channel,
self.client_name,
delayed_exchange_name,
delayed_queue_name,
exclusive_queue=exclusive_queue,
non_exclusive_queue_name=non_exclusive_queue_name,
message_ttl=int(unexpected_error_retry_delay_s * 1000),
arguments={"x-dead-letter-exchange": exchange.name},
)
Expand Down Expand Up @@ -276,7 +278,6 @@ async def add_topics(
self.client_name,
exchange_name,
exclusive_queue=True,
non_exclusive_queue_name=None,
arguments={
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
exchange_name=exchange_name
Expand All @@ -302,7 +303,6 @@ async def remove_topics(
self.client_name,
exchange_name,
exclusive_queue=True,
non_exclusive_queue_name=None,
arguments={
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
exchange_name=exchange_name
Expand Down
8 changes: 4 additions & 4 deletions packages/service-library/src/servicelib/rabbitmq/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tenacity.wait import wait_fixed

from ..logging_utils import log_context
from ._models import QueueName

_logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -65,10 +66,9 @@ def get_rabbitmq_client_unique_name(base_name: str) -> str:
async def declare_queue(
channel: aio_pika.RobustChannel,
client_name: str,
exchange_name: str,
queue_name: QueueName,
*,
exclusive_queue: bool,
non_exclusive_queue_name: str | None,
arguments: dict[str, Any] | None = None,
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
) -> aio_pika.abc.AbstractRobustQueue:
Expand All @@ -79,11 +79,11 @@ async def declare_queue(
"durable": True,
"exclusive": exclusive_queue,
"arguments": default_arguments,
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive",
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive",
}
if not exclusive_queue:
# NOTE: setting a name will ensure multiple instance will take their data here
queue_parameters |= {"name": non_exclusive_queue_name or exchange_name}
queue_parameters |= {"name": queue_name}

# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
# most likely someone changed the signature of the queues (parameters etc...)
Expand Down
42 changes: 35 additions & 7 deletions packages/service-library/tests/rabbitmq/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


import asyncio
from collections.abc import Awaitable, Callable
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Final
from unittest import mock
Expand Down Expand Up @@ -464,9 +464,7 @@ def _raise_once_then_true(*args, **kwargs):
if _raise_once_then_true.calls == 1:
msg = "this is a test!"
raise KeyError(msg)
if _raise_once_then_true.calls == 2:
return False
return True
return _raise_once_then_true.calls != 2

exchange_name = random_exchange_name()
_raise_once_then_true.calls = 0
Expand All @@ -476,13 +474,31 @@ def _raise_once_then_true(*args, **kwargs):
await _assert_message_received(mocked_message_parser, 3, message)


@pytest.fixture
async def ensure_queue_deletion(
create_rabbitmq_client: Callable[[str], RabbitMQClient]
) -> AsyncIterator[Callable[[QueueName], None]]:
created_queues = set()

def _(queue_name: QueueName) -> None:
created_queues.add(queue_name)

yield _

client = create_rabbitmq_client("ensure_queue_deletion")
await asyncio.gather(*(client.unsubscribe(q) for q in created_queues))


@pytest.mark.parametrize("defined_queue_name", [None, "pytest-queue"])
@pytest.mark.parametrize("num_subs", [10])
async def test_pub_sub_with_non_exclusive_queue(
create_rabbitmq_client: Callable[[str], RabbitMQClient],
random_exchange_name: Callable[[], str],
mocker: MockerFixture,
random_rabbit_message: Callable[..., PytestRabbitMessage],
num_subs: int,
defined_queue_name: QueueName | None,
ensure_queue_deletion: Callable[[QueueName], None],
):
consumers = (create_rabbitmq_client(f"consumer_{n}") for n in range(num_subs))
mocked_message_parsers = [
Expand All @@ -492,13 +508,25 @@ async def test_pub_sub_with_non_exclusive_queue(
publisher = create_rabbitmq_client("publisher")
message = random_rabbit_message()
exchange_name = random_exchange_name()
await asyncio.gather(
list_queue_name_consumer_mappings = await asyncio.gather(
*(
consumer.subscribe(exchange_name, parser, exclusive_queue=False)
consumer.subscribe(
exchange_name,
parser,
exclusive_queue=False,
non_exclusive_queue_name=defined_queue_name,
)
for consumer, parser in zip(consumers, mocked_message_parsers, strict=True)
)
)

for queue_name, _ in list_queue_name_consumer_mappings:
assert (
queue_name == exchange_name
if defined_queue_name is None
else defined_queue_name
)
ensure_queue_deletion(queue_name)
ensure_queue_deletion(f"delayed_{queue_name}")
await publisher.publish(exchange_name, message)
# only one consumer should have gotten the message here and the others not
async for attempt in AsyncRetrying(
Expand Down

0 comments on commit ba34483

Please sign in to comment.