From f7363201cfa31750de9fb11ba29abc059425cf0e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 09:55:46 +0100 Subject: [PATCH 1/9] ensure unsubscribe consumer is only unsubscribing the right consumer and allow multiple consumers --- .../src/servicelib/rabbitmq/__init__.py | 3 ++ .../src/servicelib/rabbitmq/_client.py | 46 +++++++++++-------- .../src/servicelib/rabbitmq/_models.py | 5 +- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/__init__.py b/packages/service-library/src/servicelib/rabbitmq/__init__.py index 488ab99833c..b0c2a66fea4 100644 --- a/packages/service-library/src/servicelib/rabbitmq/__init__.py +++ b/packages/service-library/src/servicelib/rabbitmq/__init__.py @@ -8,6 +8,7 @@ RPCNotInitializedError, RPCServerError, ) +from ._models import ConsumerTag, QueueName from ._rpc_router import RPCRouter from ._utils import is_rabbitmq_responsive, wait_till_rabbitmq_responsive @@ -23,6 +24,8 @@ "RPCRouter", "RPCServerError", "wait_till_rabbitmq_responsive", + "QueueName", + "ConsumerTag", ) # nopycln: file diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index dc70aa03ffa..661f2c7f69b 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -3,13 +3,14 @@ from dataclasses import dataclass, field from functools import partial from typing import Final +from uuid import uuid4 import aio_pika from pydantic import NonNegativeInt from ..logging_utils import log_catch, log_context from ._client_base import RabbitMQClientBase -from ._models import MessageHandler, RabbitMessage +from ._models import ConsumerTag, MessageHandler, QueueName, RabbitMessage from ._utils import ( RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, declare_queue, @@ -139,7 +140,7 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel: return channel async def _get_consumer_tag(self, exchange_name) -> str: - return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}" + return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}" async def subscribe( self, @@ -151,7 +152,7 @@ async def subscribe( message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S, unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, - ) -> str: + ) -> tuple[QueueName, ConsumerTag]: """subscribe to exchange_name calling ``message_handler`` for every incoming message - exclusive_queue: True means that every instance of this application will receive the incoming messages @@ -238,14 +239,14 @@ async def subscribe( ) await delayed_queue.bind(delayed_exchange) - _consumer_tag = await self._get_consumer_tag(exchange_name) + consumer_tag = await self._get_consumer_tag(exchange_name) await queue.consume( partial(_on_message, message_handler, unexpected_error_max_attempts), exclusive=exclusive_queue, - consumer_tag=_consumer_tag, + consumer_tag=consumer_tag, ) output: str = queue.name - return output + return output, consumer_tag async def add_topics( self, @@ -300,13 +301,16 @@ async def remove_topics( async def unsubscribe( self, - queue_name: str, + queue_name: QueueName, ) -> None: - assert self._channel_pool # nosec - async with self._channel_pool.acquire() as channel: - queue = await channel.get_queue(queue_name) - # NOTE: we force delete here - await queue.delete(if_unused=False, if_empty=False) + """This will delete the queue if there are no consumers left""" + assert self._connection_pool # nosec + if not self._connection_pool.is_closed: + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + queue = await channel.get_queue(queue_name) + # NOTE: we force delete here + await queue.delete(if_unused=False, if_empty=False) async def publish(self, exchange_name: str, message: RabbitMessage) -> None: """publish message in the exchange exchange_name. @@ -333,10 +337,14 @@ async def publish(self, exchange_name: str, message: RabbitMessage) -> None: routing_key=message.routing_key() or "", ) - async def unsubscribe_consumer(self, exchange_name: str): - assert self._channel_pool # nosec - async with self._channel_pool.acquire() as channel: - queue_name = exchange_name - queue = await channel.get_queue(queue_name) - _consumer_tag = await self._get_consumer_tag(exchange_name) - await queue.cancel(_consumer_tag) + async def unsubscribe_consumer( + self, queue_name: QueueName, consumer_tag: ConsumerTag + ) -> None: + """This will only remove the consumers without deleting the queue""" + assert self._connection_pool # nosec + if not self._connection_pool.is_closed: + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + assert isinstance(channel, aio_pika.RobustChannel) # nosec + queue = await channel.get_queue(queue_name) + await queue.cancel(consumer_tag) diff --git a/packages/service-library/src/servicelib/rabbitmq/_models.py b/packages/service-library/src/servicelib/rabbitmq/_models.py index c76800a4d8a..beda88fc1cc 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_models.py +++ b/packages/service-library/src/servicelib/rabbitmq/_models.py @@ -1,5 +1,5 @@ from collections.abc import Awaitable, Callable -from typing import Any, Protocol +from typing import Any, Protocol, TypeAlias from models_library.basic_types import ConstrainedStr from models_library.rabbitmq_basic_types import ( @@ -11,6 +11,9 @@ MessageHandler = Callable[[Any], Awaitable[bool]] +QueueName: TypeAlias = str +ConsumerTag: TypeAlias = str + class RabbitMessage(Protocol): def body(self) -> bytes: From 8a044704587d2f0b7ed60d1c42e55751a8b76ff4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:26:16 +0100 Subject: [PATCH 2/9] changed syntax of subscribe --- .../src/servicelib/rabbitmq/__init__.py | 7 +- .../src/servicelib/rabbitmq/_client.py | 68 ++++++++++++------- .../src/servicelib/rabbitmq/_models.py | 2 + .../tests/rabbitmq/test_rabbitmq.py | 45 +++++++----- 4 files changed, 77 insertions(+), 45 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/__init__.py b/packages/service-library/src/servicelib/rabbitmq/__init__.py index b0c2a66fea4..e2a1416d3e3 100644 --- a/packages/service-library/src/servicelib/rabbitmq/__init__.py +++ b/packages/service-library/src/servicelib/rabbitmq/__init__.py @@ -8,13 +8,16 @@ RPCNotInitializedError, RPCServerError, ) -from ._models import ConsumerTag, QueueName +from ._models import ConsumerTag, ExchangeName, QueueName from ._rpc_router import RPCRouter from ._utils import is_rabbitmq_responsive, wait_till_rabbitmq_responsive __all__: tuple[str, ...] = ( "BIND_TO_ALL_TOPICS", + "ConsumerTag", + "ExchangeName", "is_rabbitmq_responsive", + "QueueName", "RabbitMQClient", "RabbitMQRPCClient", "RemoteMethodNotRegisteredError", @@ -24,8 +27,6 @@ "RPCRouter", "RPCServerError", "wait_till_rabbitmq_responsive", - "QueueName", - "ConsumerTag", ) # nopycln: file diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 661f2c7f69b..e5aa9b24007 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -10,7 +10,14 @@ from ..logging_utils import log_catch, log_context from ._client_base import RabbitMQClientBase -from ._models import ConsumerTag, MessageHandler, QueueName, RabbitMessage +from ._models import ( + ConsumerTag, + ExchangeName, + MessageHandler, + QueueName, + RabbitMessage, + TopicName, +) from ._utils import ( RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, declare_queue, @@ -27,7 +34,7 @@ _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S: Final[float] = 1 _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15 -_DELAYED_EXCHANGE_NAME: Final[str] = "delayed_{exchange_name}" +_DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}") def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int: @@ -139,12 +146,14 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel: channel.close_callbacks.add(self._channel_close_callback) return channel - async def _get_consumer_tag(self, exchange_name) -> str: - return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}" + async def _create_consumer_tag(self, exchange_name) -> ConsumerTag: + return ConsumerTag( + f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}" + ) async def subscribe( self, - exchange_name: str, + exchange_name: ExchangeName, message_handler: MessageHandler, *, exclusive_queue: bool = True, @@ -239,20 +248,19 @@ async def subscribe( ) await delayed_queue.bind(delayed_exchange) - consumer_tag = await self._get_consumer_tag(exchange_name) + consumer_tag = await self._create_consumer_tag(exchange_name) await queue.consume( partial(_on_message, message_handler, unexpected_error_max_attempts), exclusive=exclusive_queue, consumer_tag=consumer_tag, ) - output: str = queue.name - return output, consumer_tag + return queue.name, consumer_tag async def add_topics( self, - exchange_name: str, + exchange_name: ExchangeName, *, - topics: list[str], + topics: list[TopicName], ) -> None: assert self._channel_pool # nosec @@ -276,9 +284,9 @@ async def add_topics( async def remove_topics( self, - exchange_name: str, + exchange_name: ExchangeName, *, - topics: list[str], + topics: list[TopicName], ) -> None: assert self._channel_pool # nosec async with self._channel_pool.acquire() as channel: @@ -305,14 +313,20 @@ async def unsubscribe( ) -> None: """This will delete the queue if there are no consumers left""" assert self._connection_pool # nosec - if not self._connection_pool.is_closed: - assert self._channel_pool # nosec - async with self._channel_pool.acquire() as channel: - queue = await channel.get_queue(queue_name) - # NOTE: we force delete here - await queue.delete(if_unused=False, if_empty=False) + if self._connection_pool.is_closed: + _logger.warning( + "Connection to RabbitMQ is already closed, skipping unsubscribe from queue..." + ) + return + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + queue = await channel.get_queue(queue_name) + # NOTE: we force delete here + await queue.delete(if_unused=False, if_empty=False) - async def publish(self, exchange_name: str, message: RabbitMessage) -> None: + async def publish( + self, exchange_name: ExchangeName, message: RabbitMessage + ) -> None: """publish message in the exchange exchange_name. specifying a topic will use a TOPIC type of RabbitMQ Exchange instead of FANOUT @@ -342,9 +356,13 @@ async def unsubscribe_consumer( ) -> None: """This will only remove the consumers without deleting the queue""" assert self._connection_pool # nosec - if not self._connection_pool.is_closed: - assert self._channel_pool # nosec - async with self._channel_pool.acquire() as channel: - assert isinstance(channel, aio_pika.RobustChannel) # nosec - queue = await channel.get_queue(queue_name) - await queue.cancel(consumer_tag) + if self._connection_pool.is_closed: + _logger.warning( + "Connection to RabbitMQ is already closed, skipping unsubscribe consumers from queue..." + ) + return + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + assert isinstance(channel, aio_pika.RobustChannel) # nosec + queue = await channel.get_queue(queue_name) + await queue.cancel(consumer_tag) diff --git a/packages/service-library/src/servicelib/rabbitmq/_models.py b/packages/service-library/src/servicelib/rabbitmq/_models.py index beda88fc1cc..d713edfdc1d 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_models.py +++ b/packages/service-library/src/servicelib/rabbitmq/_models.py @@ -11,8 +11,10 @@ MessageHandler = Callable[[Any], Awaitable[bool]] +ExchangeName: TypeAlias = str QueueName: TypeAlias = str ConsumerTag: TypeAlias = str +TopicName: TypeAlias = str class RabbitMessage(Protocol): diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index 5bc26c3be1e..c9fa7edf844 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -15,7 +15,13 @@ import pytest from faker import Faker from pytest_mock.plugin import MockerFixture -from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, RabbitMQClient, _client +from servicelib.rabbitmq import ( + BIND_TO_ALL_TOPICS, + ConsumerTag, + QueueName, + RabbitMQClient, + _client, +) from servicelib.rabbitmq._client import _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS from settings_library.rabbit import RabbitSettings from tenacity.asyncio import AsyncRetrying @@ -325,7 +331,7 @@ async def test_publish_with_no_registered_subscriber( ttl_s: float = 0.1 topics_count: int = 1 if topics is None else len(topics) - async def _publish_random_message(): + async def _publish_random_message() -> None: if topics is None: message = random_rabbit_message() await publisher.publish(exchange_name, message) @@ -335,8 +341,8 @@ async def _publish_random_message(): message = random_rabbit_message(topic=topic) await publisher.publish(exchange_name, message) - async def _subscribe_consumer_to_queue(): - await consumer.subscribe( + async def _subscribe_consumer_to_queue() -> tuple[QueueName, ConsumerTag]: + return await consumer.subscribe( exchange_name, mocked_message_parser, topics=topics, @@ -346,27 +352,30 @@ async def _subscribe_consumer_to_queue(): unexpected_error_retry_delay_s=ttl_s, ) - async def _unsubscribe_consumer(): - await consumer.unsubscribe_consumer(exchange_name) + async def _unsubscribe_consumer( + queue_name: QueueName, consumer_tag: ConsumerTag + ) -> None: + await consumer.unsubscribe_consumer(queue_name, consumer_tag) # CASE 1 (subscribe immediately after publishing message) - await _subscribe_consumer_to_queue() - await _unsubscribe_consumer() + consumer_1 = await _subscribe_consumer_to_queue() + await _unsubscribe_consumer(*consumer_1) await _publish_random_message() # reconnect immediately - await _subscribe_consumer_to_queue() + consumer_2 = await _subscribe_consumer_to_queue() # expected to receive a message (one per topic) await _assert_wait_for_messages(on_message_spy, 1 * topics_count) # CASE 2 (no subscriber attached when publishing) on_message_spy.reset_mock() - await _unsubscribe_consumer() + await _unsubscribe_consumer(*consumer_2) await _publish_random_message() # wait for message to expire (will be dropped) await asyncio.sleep(ttl_s * 2) - await _subscribe_consumer_to_queue() + _consumer_3 = await _subscribe_consumer_to_queue() + # wait for a message to be possibly delivered await asyncio.sleep(ttl_s * 2) # nothing changed from before @@ -604,7 +613,7 @@ async def test_rabbit_pub_sub_bind_and_unbind_topics( ) # we should get no messages since no one was subscribed - queue_name = await consumer.subscribe( + queue_name, consumer_tag = await consumer.subscribe( exchange_name, mocked_message_parser, topics=[] ) await _assert_message_received(mocked_message_parser, 0) @@ -666,7 +675,7 @@ async def test_rabbit_adding_topics_to_a_fanout_exchange( message = random_rabbit_message() publisher = create_rabbitmq_client("publisher") consumer = create_rabbitmq_client("consumer") - queue_name = await consumer.subscribe(exchange_name, mocked_message_parser) + queue_name, _ = await consumer.subscribe(exchange_name, mocked_message_parser) await publisher.publish(exchange_name, message) await _assert_message_received(mocked_message_parser, 1, message) mocked_message_parser.reset_mock() @@ -709,10 +718,12 @@ async def test_unsubscribe_consumer( ): exchange_name = f"{random_exchange_name()}" client = create_rabbitmq_client("consumer") - await client.subscribe(exchange_name, mocked_message_parser, exclusive_queue=False) + queue_name, consumer_tag = await client.subscribe( + exchange_name, mocked_message_parser, exclusive_queue=False + ) # Unsubsribe just a consumer, the queue will be still there - await client.unsubscribe_consumer(exchange_name) + await client.unsubscribe_consumer(queue_name, consumer_tag) # Unsubsribe the queue - await client.unsubscribe(exchange_name) + await client.unsubscribe(queue_name) with pytest.raises(aio_pika.exceptions.ChannelNotFoundEntity): - await client.unsubscribe(exchange_name) + await client.unsubscribe(queue_name) From 7b15044c71b5b45719ffc3503436fc2b60045a4e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:26:33 +0100 Subject: [PATCH 3/9] updated services --- .../services/log_streaming.py | 12 ++++---- .../services/auto_recharge_listener.py | 28 +++++++++---------- .../_rabbitmq_consumers_common.py | 12 ++++---- .../_rabbitmq_exclusive_queue_consumers.py | 12 ++++---- .../_rabbitmq_nonexclusive_queue_consumers.py | 4 +-- 5 files changed, 33 insertions(+), 35 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 6bfcc248414..f6accba05d7 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -10,15 +10,15 @@ from pydantic import NonNegativeInt from servicelib.logging_errors import create_troubleshotting_log_kwargs from servicelib.logging_utils import log_catch -from servicelib.rabbitmq import RabbitMQClient -from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError -from simcore_service_api_server.models.schemas.errors import ErrorGet +from servicelib.rabbitmq import QueueName, RabbitMQClient from .._constants import MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE +from ..exceptions.backend_errors import BaseBackEndError from ..exceptions.log_streaming_errors import ( LogStreamerNotRegisteredError, LogStreamerRegistionConflictError, ) +from ..models.schemas.errors import ErrorGet from ..models.schemas.jobs import JobID, JobLog from .director_v2 import DirectorV2Api @@ -31,10 +31,10 @@ class LogDistributor: def __init__(self, rabbitmq_client: RabbitMQClient): self._rabbit_client = rabbitmq_client self._log_streamers: dict[JobID, Queue[JobLog]] = {} - self._queue_name: str + self._queue_name: QueueName async def setup(self): - self._queue_name = await self._rabbit_client.subscribe( + self._queue_name, _ = await self._rabbit_client.subscribe( LoggerRabbitMessage.get_channel_name(), self._distribute_logs, exclusive_queue=True, @@ -123,7 +123,7 @@ async def log_generator(self) -> AsyncIterable[str]: self._queue.get(), timeout=self._log_check_timeout ) yield log.model_dump_json() + _NEW_LINE - except asyncio.TimeoutError: + except TimeoutError: done = await self._project_done() except BaseBackEndError as exc: diff --git a/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py index d60c22d84f3..26e30bbe4db 100644 --- a/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py +++ b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from models_library.rabbitmq_messages import WalletCreditsMessage from servicelib.logging_utils import log_context -from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq import ConsumerTag, QueueName from .auto_recharge_process_message import process_message from .rabbitmq import get_rabbitmq_client @@ -12,36 +12,36 @@ _logger = logging.getLogger(__name__) -async def _subscribe_to_rabbitmq(app) -> str: +async def _subscribe_to_rabbitmq(app) -> tuple[QueueName, ConsumerTag]: with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): - rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - subscribed_queue: str = await rabbit_client.subscribe( + rabbit_client = get_rabbitmq_client(app) + return await rabbit_client.subscribe( WalletCreditsMessage.get_channel_name(), message_handler=functools.partial(process_message, app), exclusive_queue=False, topics=["#"], ) - return subscribed_queue -async def _unsubscribe_consumer(app) -> None: +async def _unsubscribe_consumer( + app, queue_name: QueueName, consumer_tag: ConsumerTag +) -> None: with log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq queue"): - rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - await rabbit_client.unsubscribe_consumer( - WalletCreditsMessage.get_channel_name(), - ) + rabbit_client = get_rabbitmq_client(app) + await rabbit_client.unsubscribe_consumer(queue_name, consumer_tag) def setup_auto_recharge_listener(app: FastAPI): - async def _on_startup(): + async def _on_startup() -> None: app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app) - async def _on_shutdown(): + async def _on_shutdown() -> None: assert app.state.auto_recharge_rabbitmq_consumer # nosec + assert isinstance(app.state.auto_recharge_rabbitmq_consumer, tuple) # nosec if app.state.rabbitmq_client: # NOTE: We want to have persistent queue, therefore we will unsubscribe only consumer - await _unsubscribe_consumer(app) - app.state.auto_recharge_rabbitmq_constumer = None + await _unsubscribe_consumer(app, *app.state.auto_recharge_rabbitmq_consumer) + app.state.auto_recharge_rabbitmq_consumer = None app.add_event_handler("startup", _on_startup) app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers_common.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers_common.py index 9f033db621b..e841d6d04c3 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers_common.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers_common.py @@ -5,7 +5,7 @@ from aiohttp import web from servicelib.logging_utils import log_context -from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq import ConsumerTag, ExchangeName, QueueName, RabbitMQClient from servicelib.utils import logged_gather from ..rabbitmq import get_rabbitmq_client @@ -25,10 +25,10 @@ async def subscribe_to_rabbitmq( SubcribeArgumentsTuple, ..., ], -) -> dict[str, str]: +) -> dict[ExchangeName, tuple[QueueName, ConsumerTag]]: with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channels"): rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - subscribed_queues = await logged_gather( + subscribed_queue_consumer_mappings = await logged_gather( *( rabbit_client.subscribe( p.exchange_name, @@ -40,8 +40,8 @@ async def subscribe_to_rabbitmq( reraise=True, ) return { - exchange_name: queue_name - for (exchange_name, *_), queue_name in zip( - exchange_to_parser_config, subscribed_queues, strict=True + exchange_name: queue_consumer_map + for (exchange_name, *_), queue_consumer_map in zip( + exchange_to_parser_config, subscribed_queue_consumer_mappings, strict=True ) } diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index 67c5d39b65b..048d0162fe3 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -65,12 +65,10 @@ async def _convert_to_node_update_event( async def _progress_message_parser(app: web.Application, data: bytes) -> bool: - rabbit_message: ( - ProgressRabbitMessageNode | ProgressRabbitMessageProject - ) = TypeAdapter( - ProgressRabbitMessageNode | ProgressRabbitMessageProject - ).validate_json( - data + rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = ( + TypeAdapter( + ProgressRabbitMessageNode | ProgressRabbitMessageProject + ).validate_json(data) ) message: SocketMessageDict | None = None if isinstance(rabbit_message, ProgressRabbitMessageProject): @@ -183,7 +181,7 @@ async def _unsubscribe_from_rabbitmq(app) -> None: await logged_gather( *( rabbit_client.unsubscribe(queue_name) - for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values() + for queue_name, _ in app[_APP_RABBITMQ_CONSUMERS_KEY].values() ), ) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py index b6271be822a..5d4b5578a5d 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py @@ -59,8 +59,8 @@ async def _unsubscribe_from_rabbitmq(app) -> None: rabbit_client: RabbitMQClient = get_rabbitmq_client(app) await logged_gather( *( - rabbit_client.unsubscribe_consumer(queue_name) - for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values() + rabbit_client.unsubscribe_consumer(*queue_consumer_map) + for queue_consumer_map in app[_APP_RABBITMQ_CONSUMERS_KEY].values() ), ) From 3e385d35f8a712671b63be02499a673c57215a9a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:48:03 +0100 Subject: [PATCH 4/9] fix test --- .../with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 18a992e3db7..6f016f297c0 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -558,7 +558,7 @@ async def instrumentation_rabbit_client_parser( ) -> AsyncIterator[mock.AsyncMock]: client = create_rabbitmq_client("instrumentation_pytest_consumer") mock = mocker.AsyncMock(return_value=True) - queue_name = await client.subscribe( + queue_name, _ = await client.subscribe( InstrumentationRabbitMessage.get_channel_name(), mock ) yield mock @@ -571,7 +571,7 @@ async def resource_tracking_rabbit_client_parser( ) -> AsyncIterator[mock.AsyncMock]: client = create_rabbitmq_client("resource_tracking_pytest_consumer") mock = mocker.AsyncMock(return_value=True) - queue_name = await client.subscribe( + queue_name, _ = await client.subscribe( RabbitResourceTrackingBaseMessage.get_channel_name(), mock ) yield mock From 660b98b8d5defa12a9170e417947098c11dedc89 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:50:53 +0100 Subject: [PATCH 5/9] fixed usage --- .../services/process_messages_setup.py | 2 +- .../services/process_message_running_service_setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py index deb151fbda5..2afd34d2f42 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py @@ -25,7 +25,7 @@ async def _subscribe_to_rabbitmq(app) -> str: with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - subscribed_queue: str = await rabbit_client.subscribe( + subscribed_queue, _ = await rabbit_client.subscribe( DynamicServiceRunningMessage.get_channel_name(), message_handler=functools.partial( process_dynamic_service_running_message, app diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service_setup.py index 3624eb1254d..c393626e469 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service_setup.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service_setup.py @@ -20,7 +20,7 @@ async def _subscribe_to_rabbitmq(app) -> str: with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - subscribed_queue: str = await rabbit_client.subscribe( + subscribed_queue, _ = await rabbit_client.subscribe( RabbitResourceTrackingBaseMessage.get_channel_name(), message_handler=functools.partial(process_message, app), exclusive_queue=False, From bab69b4467c06dc42b6ee94b4fad008ba06391b1 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 13:57:48 +0100 Subject: [PATCH 6/9] cleanup --- .../service-library/src/servicelib/rabbitmq/_client.py | 9 ++++++++- .../service-library/src/servicelib/rabbitmq/_utils.py | 5 +++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index e5aa9b24007..5d0ed66a1bc 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -157,6 +157,7 @@ async def subscribe( message_handler: MessageHandler, *, exclusive_queue: bool = True, + non_exclusive_queue_name: str | None = None, topics: list[str] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S, @@ -167,6 +168,8 @@ async def subscribe( receive the incoming messages - exclusive_queue: False means that only one instance of this application will reveice the incoming message + - non_exclusive_queue_name: if exclusive_queue is False, then this name will be used. If None + it will use the exchange_name. NOTE: ``message_ttl` is also a soft timeout: if the handler does not finish processing the message before this is reached the message will be redelivered! @@ -192,7 +195,7 @@ async def subscribe( aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with different type is used Returns: - queue name + tuple of queue name and consumer tag mapping """ assert self._channel_pool # nosec @@ -224,6 +227,7 @@ async def subscribe( self.client_name, exchange_name, exclusive_queue=exclusive_queue, + non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=message_ttl, arguments={"x-dead-letter-exchange": delayed_exchange_name}, ) @@ -243,6 +247,7 @@ async def subscribe( self.client_name, delayed_exchange_name, exclusive_queue=exclusive_queue, + non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=int(unexpected_error_retry_delay_s * 1000), arguments={"x-dead-letter-exchange": exchange.name}, ) @@ -271,6 +276,7 @@ async def add_topics( self.client_name, exchange_name, exclusive_queue=True, + non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name @@ -296,6 +302,7 @@ async def remove_topics( self.client_name, exchange_name, exclusive_queue=True, + non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 176635e1e88..5e57645b36a 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -68,6 +68,7 @@ async def declare_queue( exchange_name: str, *, exclusive_queue: bool, + non_exclusive_queue_name: str | None, arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> aio_pika.abc.AbstractRobustQueue: @@ -78,11 +79,11 @@ async def declare_queue( "durable": True, "exclusive": exclusive_queue, "arguments": default_arguments, - "name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive", + "name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive", } if not exclusive_queue: # NOTE: setting a name will ensure multiple instance will take their data here - queue_parameters |= {"name": exchange_name} + queue_parameters |= {"name": non_exclusive_queue_name or exchange_name} # NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED`` # most likely someone changed the signature of the queues (parameters etc...) From 5f46724b2e38bda3ba65a355f73851e12a8904e2 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 13:59:03 +0100 Subject: [PATCH 7/9] add flaky mark --- .../autoscaling/tests/unit/test_modules_buffer_machine_core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py index c81369a89bb..24a552f342b 100644 --- a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py +++ b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py @@ -585,6 +585,7 @@ def unneeded_instance_type( return random_type +@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize( "expected_buffer_params", [ From 70fb78075f210534e69b85f8a84cea4b4d9f5434 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:48:14 +0100 Subject: [PATCH 8/9] cleanup --- .../pytest-simcore/src/pytest_simcore/rabbit_service.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 19d600a2536..d8dc38feb09 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -1,6 +1,7 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable +# pylint: disable=protected-access import asyncio import logging @@ -130,10 +131,3 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien yield _creator # cleanup, properly close the clients await asyncio.gather(*(client.close() for client in created_clients)) - - -async def rabbitmq_client(create_rabbitmq_client): - # NOTE: Legacy fixture - # Use create_rabbitmq_client instead of rabbitmq_client - # SEE docs/coding-conventions.md::CC4 - return create_rabbitmq_client From 62c1083f738204740810d17104fe97d85c901390 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:48:57 +0100 Subject: [PATCH 9/9] @matusdrobuliak66 review: added option to define the queue name --- .../src/servicelib/rabbitmq/_client.py | 12 +++--- .../src/servicelib/rabbitmq/_utils.py | 8 ++-- .../tests/rabbitmq/test_rabbitmq.py | 42 +++++++++++++++---- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 5d0ed66a1bc..ccf1445c231 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -35,6 +35,7 @@ _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15 _DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}") +_DELAYED_QUEUE_NAME: Final[ExchangeName] = ExchangeName("delayed_{queue_name}") def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int: @@ -225,9 +226,8 @@ async def subscribe( queue = await declare_queue( channel, self.client_name, - exchange_name, + non_exclusive_queue_name or exchange_name, exclusive_queue=exclusive_queue, - non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=message_ttl, arguments={"x-dead-letter-exchange": delayed_exchange_name}, ) @@ -241,13 +241,15 @@ async def subscribe( delayed_exchange = await channel.declare_exchange( delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True ) + delayed_queue_name = _DELAYED_QUEUE_NAME.format( + queue_name=non_exclusive_queue_name or exchange_name + ) delayed_queue = await declare_queue( channel, self.client_name, - delayed_exchange_name, + delayed_queue_name, exclusive_queue=exclusive_queue, - non_exclusive_queue_name=non_exclusive_queue_name, message_ttl=int(unexpected_error_retry_delay_s * 1000), arguments={"x-dead-letter-exchange": exchange.name}, ) @@ -276,7 +278,6 @@ async def add_topics( self.client_name, exchange_name, exclusive_queue=True, - non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name @@ -302,7 +303,6 @@ async def remove_topics( self.client_name, exchange_name, exclusive_queue=True, - non_exclusive_queue_name=None, arguments={ "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( exchange_name=exchange_name diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 5e57645b36a..404adb1b652 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -13,6 +13,7 @@ from tenacity.wait import wait_fixed from ..logging_utils import log_context +from ._models import QueueName _logger = logging.getLogger(__file__) @@ -65,10 +66,9 @@ def get_rabbitmq_client_unique_name(base_name: str) -> str: async def declare_queue( channel: aio_pika.RobustChannel, client_name: str, - exchange_name: str, + queue_name: QueueName, *, exclusive_queue: bool, - non_exclusive_queue_name: str | None, arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> aio_pika.abc.AbstractRobustQueue: @@ -79,11 +79,11 @@ async def declare_queue( "durable": True, "exclusive": exclusive_queue, "arguments": default_arguments, - "name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive", + "name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive", } if not exclusive_queue: # NOTE: setting a name will ensure multiple instance will take their data here - queue_parameters |= {"name": non_exclusive_queue_name or exchange_name} + queue_parameters |= {"name": queue_name} # NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED`` # most likely someone changed the signature of the queues (parameters etc...) diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index c9fa7edf844..d4c6c4b8ebb 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -6,7 +6,7 @@ import asyncio -from collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass from typing import Any, Final from unittest import mock @@ -464,9 +464,7 @@ def _raise_once_then_true(*args, **kwargs): if _raise_once_then_true.calls == 1: msg = "this is a test!" raise KeyError(msg) - if _raise_once_then_true.calls == 2: - return False - return True + return _raise_once_then_true.calls != 2 exchange_name = random_exchange_name() _raise_once_then_true.calls = 0 @@ -476,6 +474,22 @@ def _raise_once_then_true(*args, **kwargs): await _assert_message_received(mocked_message_parser, 3, message) +@pytest.fixture +async def ensure_queue_deletion( + create_rabbitmq_client: Callable[[str], RabbitMQClient] +) -> AsyncIterator[Callable[[QueueName], None]]: + created_queues = set() + + def _(queue_name: QueueName) -> None: + created_queues.add(queue_name) + + yield _ + + client = create_rabbitmq_client("ensure_queue_deletion") + await asyncio.gather(*(client.unsubscribe(q) for q in created_queues)) + + +@pytest.mark.parametrize("defined_queue_name", [None, "pytest-queue"]) @pytest.mark.parametrize("num_subs", [10]) async def test_pub_sub_with_non_exclusive_queue( create_rabbitmq_client: Callable[[str], RabbitMQClient], @@ -483,6 +497,8 @@ async def test_pub_sub_with_non_exclusive_queue( mocker: MockerFixture, random_rabbit_message: Callable[..., PytestRabbitMessage], num_subs: int, + defined_queue_name: QueueName | None, + ensure_queue_deletion: Callable[[QueueName], None], ): consumers = (create_rabbitmq_client(f"consumer_{n}") for n in range(num_subs)) mocked_message_parsers = [ @@ -492,13 +508,25 @@ async def test_pub_sub_with_non_exclusive_queue( publisher = create_rabbitmq_client("publisher") message = random_rabbit_message() exchange_name = random_exchange_name() - await asyncio.gather( + list_queue_name_consumer_mappings = await asyncio.gather( *( - consumer.subscribe(exchange_name, parser, exclusive_queue=False) + consumer.subscribe( + exchange_name, + parser, + exclusive_queue=False, + non_exclusive_queue_name=defined_queue_name, + ) for consumer, parser in zip(consumers, mocked_message_parsers, strict=True) ) ) - + for queue_name, _ in list_queue_name_consumer_mappings: + assert ( + queue_name == exchange_name + if defined_queue_name is None + else defined_queue_name + ) + ensure_queue_deletion(queue_name) + ensure_queue_deletion(f"delayed_{queue_name}") await publisher.publish(exchange_name, message) # only one consumer should have gotten the message here and the others not async for attempt in AsyncRetrying(