diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 19d600a25364..91873a69d087 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -1,15 +1,17 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable +# pylint: disable=protected-access import asyncio import logging from collections.abc import AsyncIterator, Awaitable, Callable +from contextlib import suppress import aio_pika import pytest import tenacity -from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient +from servicelib.rabbitmq import QueueName, RabbitMQClient, RabbitMQRPCClient from settings_library.rabbit import RabbitSettings from tenacity.before_sleep import before_sleep_log from tenacity.stop import stop_after_attempt @@ -132,8 +134,21 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien await asyncio.gather(*(client.close() for client in created_clients)) -async def rabbitmq_client(create_rabbitmq_client): - # NOTE: Legacy fixture - # Use create_rabbitmq_client instead of rabbitmq_client - # SEE docs/coding-conventions.md::CC4 - return create_rabbitmq_client +@pytest.fixture +async def ensure_parametrized_queue_is_empty( + create_rabbitmq_client: Callable[[str], RabbitMQClient], queue_name: QueueName +) -> AsyncIterator[None]: + rabbitmq_client = create_rabbitmq_client("pytest-purger") + + async def _queue_messages_purger() -> None: + with suppress(aio_pika.exceptions.ChannelClosed): + assert rabbitmq_client._channel_pool # noqa: SLF001 + async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001 + assert isinstance(channel, aio_pika.RobustChannel) + queue = await channel.get_queue(queue_name) + await queue.purge() + + await _queue_messages_purger() + yield + # cleanup + await _queue_messages_purger()