Skip to content

Commit

Permalink
added rabbitmq queue purger
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 26, 2024
1 parent 84c8d8f commit d34f362
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=unused-variable
# pylint: disable=protected-access

import asyncio
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import suppress

import aio_pika
import pytest
import tenacity
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
from servicelib.rabbitmq import QueueName, RabbitMQClient, RabbitMQRPCClient
from settings_library.rabbit import RabbitSettings
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
Expand Down Expand Up @@ -132,8 +134,21 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien
await asyncio.gather(*(client.close() for client in created_clients))


async def rabbitmq_client(create_rabbitmq_client):
# NOTE: Legacy fixture
# Use create_rabbitmq_client instead of rabbitmq_client
# SEE docs/coding-conventions.md::CC4
return create_rabbitmq_client
@pytest.fixture
async def ensure_parametrized_queue_is_empty(
create_rabbitmq_client: Callable[[str], RabbitMQClient], queue_name: QueueName
) -> AsyncIterator[None]:
rabbitmq_client = create_rabbitmq_client("pytest-purger")

async def _queue_messages_purger() -> None:
with suppress(aio_pika.exceptions.ChannelClosed):
assert rabbitmq_client._channel_pool # noqa: SLF001
async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001
assert isinstance(channel, aio_pika.RobustChannel)
queue = await channel.get_queue(queue_name)
await queue.purge()

await _queue_messages_purger()
yield
# cleanup
await _queue_messages_purger()

0 comments on commit d34f362

Please sign in to comment.