Skip to content

Commit

Permalink
updated services
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 26, 2024
1 parent 0d95e66 commit 7319fe4
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
from pydantic import NonNegativeInt
from servicelib.logging_errors import create_troubleshotting_log_kwargs
from servicelib.logging_utils import log_catch
from servicelib.rabbitmq import RabbitMQClient
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
from simcore_service_api_server.models.schemas.errors import ErrorGet
from servicelib.rabbitmq import QueueName, RabbitMQClient

from .._constants import MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE
from ..exceptions.backend_errors import BaseBackEndError
from ..exceptions.log_streaming_errors import (
LogStreamerNotRegisteredError,
LogStreamerRegistionConflictError,
)
from ..models.schemas.errors import ErrorGet
from ..models.schemas.jobs import JobID, JobLog
from .director_v2 import DirectorV2Api

Expand All @@ -31,10 +31,10 @@ class LogDistributor:
def __init__(self, rabbitmq_client: RabbitMQClient):
self._rabbit_client = rabbitmq_client
self._log_streamers: dict[JobID, Queue[JobLog]] = {}
self._queue_name: str
self._queue_name: QueueName

async def setup(self):
self._queue_name = await self._rabbit_client.subscribe(
self._queue_name, _ = await self._rabbit_client.subscribe(
LoggerRabbitMessage.get_channel_name(),
self._distribute_logs,
exclusive_queue=True,
Expand Down Expand Up @@ -123,7 +123,7 @@ async def log_generator(self) -> AsyncIterable[str]:
self._queue.get(), timeout=self._log_check_timeout
)
yield log.model_dump_json() + _NEW_LINE
except asyncio.TimeoutError:
except TimeoutError:
done = await self._project_done()

except BaseBackEndError as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,44 @@
from fastapi import FastAPI
from models_library.rabbitmq_messages import WalletCreditsMessage
from servicelib.logging_utils import log_context
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq import ConsumerTag, QueueName

from .auto_recharge_process_message import process_message
from .rabbitmq import get_rabbitmq_client

_logger = logging.getLogger(__name__)


async def _subscribe_to_rabbitmq(app) -> str:
async def _subscribe_to_rabbitmq(app) -> tuple[QueueName, ConsumerTag]:
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
subscribed_queue: str = await rabbit_client.subscribe(
rabbit_client = get_rabbitmq_client(app)
return await rabbit_client.subscribe(
WalletCreditsMessage.get_channel_name(),
message_handler=functools.partial(process_message, app),
exclusive_queue=False,
topics=["#"],
)
return subscribed_queue


async def _unsubscribe_consumer(app) -> None:
async def _unsubscribe_consumer(
app, queue_name: QueueName, consumer_tag: ConsumerTag
) -> None:
with log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq queue"):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
await rabbit_client.unsubscribe_consumer(
WalletCreditsMessage.get_channel_name(),
)
rabbit_client = get_rabbitmq_client(app)
await rabbit_client.unsubscribe_consumer(queue_name, consumer_tag)


def setup_auto_recharge_listener(app: FastAPI):
async def _on_startup():
async def _on_startup() -> None:
app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app)

async def _on_shutdown():
async def _on_shutdown() -> None:
assert app.state.auto_recharge_rabbitmq_consumer # nosec
assert isinstance(app.state.auto_recharge_rabbitmq_consumer, tuple) # nosec
if app.state.rabbitmq_client:
# NOTE: We want to have persistent queue, therefore we will unsubscribe only consumer
await _unsubscribe_consumer(app)
app.state.auto_recharge_rabbitmq_constumer = None
await _unsubscribe_consumer(app, *app.state.auto_recharge_rabbitmq_consumer)
app.state.auto_recharge_rabbitmq_consumer = None

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from aiohttp import web
from servicelib.logging_utils import log_context
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq import ConsumerTag, ExchangeName, QueueName, RabbitMQClient
from servicelib.utils import logged_gather

from ..rabbitmq import get_rabbitmq_client
Expand All @@ -25,10 +25,10 @@ async def subscribe_to_rabbitmq(
SubcribeArgumentsTuple,
...,
],
) -> dict[str, str]:
) -> dict[ExchangeName, tuple[QueueName, ConsumerTag]]:
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channels"):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
subscribed_queues = await logged_gather(
subscribed_queue_consumer_mappings = await logged_gather(
*(
rabbit_client.subscribe(
p.exchange_name,
Expand All @@ -40,8 +40,8 @@ async def subscribe_to_rabbitmq(
reraise=True,
)
return {
exchange_name: queue_name
for (exchange_name, *_), queue_name in zip(
exchange_to_parser_config, subscribed_queues, strict=True
exchange_name: queue_consumer_map
for (exchange_name, *_), queue_consumer_map in zip(
exchange_to_parser_config, subscribed_queue_consumer_mappings, strict=True
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ async def _convert_to_node_update_event(


async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message: (
ProgressRabbitMessageNode | ProgressRabbitMessageProject
) = TypeAdapter(
ProgressRabbitMessageNode | ProgressRabbitMessageProject
).validate_json(
data
rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = (
TypeAdapter(
ProgressRabbitMessageNode | ProgressRabbitMessageProject
).validate_json(data)
)
message: SocketMessageDict | None = None
if isinstance(rabbit_message, ProgressRabbitMessageProject):
Expand Down Expand Up @@ -183,7 +181,7 @@ async def _unsubscribe_from_rabbitmq(app) -> None:
await logged_gather(
*(
rabbit_client.unsubscribe(queue_name)
for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
for queue_name, _ in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ async def _unsubscribe_from_rabbitmq(app) -> None:
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
await logged_gather(
*(
rabbit_client.unsubscribe_consumer(queue_name)
for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
rabbit_client.unsubscribe_consumer(*queue_consumer_map)
for queue_consumer_map in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
),
)

Expand Down

0 comments on commit 7319fe4

Please sign in to comment.