Skip to content

Commit

Permalink
🎨 Allow to have multiple consumers for the same queue in 1 applicatio…
Browse files Browse the repository at this point in the history
…n and allow to optionally define queue name (#6838)
  • Loading branch information
sanderegg authored Nov 26, 2024
1 parent 6fc9da1 commit bca766c
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 97 deletions.
8 changes: 1 addition & 7 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=unused-variable
# pylint: disable=protected-access

import asyncio
import logging
Expand Down Expand Up @@ -130,10 +131,3 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien
yield _creator
# cleanup, properly close the clients
await asyncio.gather(*(client.close() for client in created_clients))


async def rabbitmq_client(create_rabbitmq_client):
# NOTE: Legacy fixture
# Use create_rabbitmq_client instead of rabbitmq_client
# SEE docs/coding-conventions.md::CC4
return create_rabbitmq_client
4 changes: 4 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
RPCNotInitializedError,
RPCServerError,
)
from ._models import ConsumerTag, ExchangeName, QueueName
from ._rpc_router import RPCRouter
from ._utils import is_rabbitmq_responsive, wait_till_rabbitmq_responsive

__all__: tuple[str, ...] = (
"BIND_TO_ALL_TOPICS",
"ConsumerTag",
"ExchangeName",
"is_rabbitmq_responsive",
"QueueName",
"RabbitMQClient",
"RabbitMQRPCClient",
"RemoteMethodNotRegisteredError",
Expand Down
79 changes: 56 additions & 23 deletions packages/service-library/src/servicelib/rabbitmq/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
from dataclasses import dataclass, field
from functools import partial
from typing import Final
from uuid import uuid4

import aio_pika
from pydantic import NonNegativeInt

from ..logging_utils import log_catch, log_context
from ._client_base import RabbitMQClientBase
from ._models import MessageHandler, RabbitMessage
from ._models import (
ConsumerTag,
ExchangeName,
MessageHandler,
QueueName,
RabbitMessage,
TopicName,
)
from ._utils import (
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
declare_queue,
Expand All @@ -26,7 +34,8 @@
_DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S: Final[float] = 1
_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15

_DELAYED_EXCHANGE_NAME: Final[str] = "delayed_{exchange_name}"
_DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}")
_DELAYED_QUEUE_NAME: Final[ExchangeName] = ExchangeName("delayed_{queue_name}")


def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
Expand Down Expand Up @@ -138,25 +147,30 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
channel.close_callbacks.add(self._channel_close_callback)
return channel

async def _get_consumer_tag(self, exchange_name) -> str:
return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}"
async def _create_consumer_tag(self, exchange_name) -> ConsumerTag:
return ConsumerTag(
f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}"
)

async def subscribe(
self,
exchange_name: str,
exchange_name: ExchangeName,
message_handler: MessageHandler,
*,
exclusive_queue: bool = True,
non_exclusive_queue_name: str | None = None,
topics: list[str] | None = None,
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S,
unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS,
) -> str:
) -> tuple[QueueName, ConsumerTag]:
"""subscribe to exchange_name calling ``message_handler`` for every incoming message
- exclusive_queue: True means that every instance of this application will
receive the incoming messages
- exclusive_queue: False means that only one instance of this application will
reveice the incoming message
- non_exclusive_queue_name: if exclusive_queue is False, then this name will be used. If None
it will use the exchange_name.
NOTE: ``message_ttl` is also a soft timeout: if the handler does not finish processing
the message before this is reached the message will be redelivered!
Expand All @@ -182,7 +196,7 @@ async def subscribe(
aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with
different type is used
Returns:
queue name
tuple of queue name and consumer tag mapping
"""

assert self._channel_pool # nosec
Expand Down Expand Up @@ -212,7 +226,7 @@ async def subscribe(
queue = await declare_queue(
channel,
self.client_name,
exchange_name,
non_exclusive_queue_name or exchange_name,
exclusive_queue=exclusive_queue,
message_ttl=message_ttl,
arguments={"x-dead-letter-exchange": delayed_exchange_name},
Expand All @@ -227,31 +241,33 @@ async def subscribe(
delayed_exchange = await channel.declare_exchange(
delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
)
delayed_queue_name = _DELAYED_QUEUE_NAME.format(
queue_name=non_exclusive_queue_name or exchange_name
)

delayed_queue = await declare_queue(
channel,
self.client_name,
delayed_exchange_name,
delayed_queue_name,
exclusive_queue=exclusive_queue,
message_ttl=int(unexpected_error_retry_delay_s * 1000),
arguments={"x-dead-letter-exchange": exchange.name},
)
await delayed_queue.bind(delayed_exchange)

_consumer_tag = await self._get_consumer_tag(exchange_name)
consumer_tag = await self._create_consumer_tag(exchange_name)
await queue.consume(
partial(_on_message, message_handler, unexpected_error_max_attempts),
exclusive=exclusive_queue,
consumer_tag=_consumer_tag,
consumer_tag=consumer_tag,
)
output: str = queue.name
return output
return queue.name, consumer_tag

async def add_topics(
self,
exchange_name: str,
exchange_name: ExchangeName,
*,
topics: list[str],
topics: list[TopicName],
) -> None:
assert self._channel_pool # nosec

Expand All @@ -275,9 +291,9 @@ async def add_topics(

async def remove_topics(
self,
exchange_name: str,
exchange_name: ExchangeName,
*,
topics: list[str],
topics: list[TopicName],
) -> None:
assert self._channel_pool # nosec
async with self._channel_pool.acquire() as channel:
Expand All @@ -300,15 +316,24 @@ async def remove_topics(

async def unsubscribe(
self,
queue_name: str,
queue_name: QueueName,
) -> None:
"""This will delete the queue if there are no consumers left"""
assert self._connection_pool # nosec
if self._connection_pool.is_closed:
_logger.warning(
"Connection to RabbitMQ is already closed, skipping unsubscribe from queue..."
)
return
assert self._channel_pool # nosec
async with self._channel_pool.acquire() as channel:
queue = await channel.get_queue(queue_name)
# NOTE: we force delete here
await queue.delete(if_unused=False, if_empty=False)

async def publish(self, exchange_name: str, message: RabbitMessage) -> None:
async def publish(
self, exchange_name: ExchangeName, message: RabbitMessage
) -> None:
"""publish message in the exchange exchange_name.
specifying a topic will use a TOPIC type of RabbitMQ Exchange instead of FANOUT
Expand All @@ -333,10 +358,18 @@ async def publish(self, exchange_name: str, message: RabbitMessage) -> None:
routing_key=message.routing_key() or "",
)

async def unsubscribe_consumer(self, exchange_name: str):
async def unsubscribe_consumer(
self, queue_name: QueueName, consumer_tag: ConsumerTag
) -> None:
"""This will only remove the consumers without deleting the queue"""
assert self._connection_pool # nosec
if self._connection_pool.is_closed:
_logger.warning(
"Connection to RabbitMQ is already closed, skipping unsubscribe consumers from queue..."
)
return
assert self._channel_pool # nosec
async with self._channel_pool.acquire() as channel:
queue_name = exchange_name
assert isinstance(channel, aio_pika.RobustChannel) # nosec
queue = await channel.get_queue(queue_name)
_consumer_tag = await self._get_consumer_tag(exchange_name)
await queue.cancel(_consumer_tag)
await queue.cancel(consumer_tag)
7 changes: 6 additions & 1 deletion packages/service-library/src/servicelib/rabbitmq/_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Awaitable, Callable
from typing import Any, Protocol
from typing import Any, Protocol, TypeAlias

from models_library.basic_types import ConstrainedStr
from models_library.rabbitmq_basic_types import (
Expand All @@ -11,6 +11,11 @@

MessageHandler = Callable[[Any], Awaitable[bool]]

ExchangeName: TypeAlias = str
QueueName: TypeAlias = str
ConsumerTag: TypeAlias = str
TopicName: TypeAlias = str


class RabbitMessage(Protocol):
def body(self) -> bytes:
Expand Down
7 changes: 4 additions & 3 deletions packages/service-library/src/servicelib/rabbitmq/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tenacity.wait import wait_fixed

from ..logging_utils import log_context
from ._models import QueueName

_logger = logging.getLogger(__file__)

Expand Down Expand Up @@ -65,7 +66,7 @@ def get_rabbitmq_client_unique_name(base_name: str) -> str:
async def declare_queue(
channel: aio_pika.RobustChannel,
client_name: str,
exchange_name: str,
queue_name: QueueName,
*,
exclusive_queue: bool,
arguments: dict[str, Any] | None = None,
Expand All @@ -78,11 +79,11 @@ async def declare_queue(
"durable": True,
"exclusive": exclusive_queue,
"arguments": default_arguments,
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive",
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive",
}
if not exclusive_queue:
# NOTE: setting a name will ensure multiple instance will take their data here
queue_parameters |= {"name": exchange_name}
queue_parameters |= {"name": queue_name}

# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
# most likely someone changed the signature of the queues (parameters etc...)
Expand Down
Loading

0 comments on commit bca766c

Please sign in to comment.