Skip to content

Commit

Permalink
♻️ Adding additional checks to RUT + 🐛 Setting RabbitMQ message TTL (I…
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Dec 11, 2023
1 parent 3f92d39 commit 1c7a161
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 7 deletions.
9 changes: 8 additions & 1 deletion packages/service-library/src/servicelib/rabbitmq/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
from typing import Final

import aio_pika
from pydantic import NonNegativeInt

from ..logging_utils import log_context
from ._client_base import RabbitMQClientBase
from ._models import MessageHandler, RabbitMessage
from ._utils import declare_queue, get_rabbitmq_client_unique_name
from ._utils import (
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
declare_queue,
get_rabbitmq_client_unique_name,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -73,6 +78,7 @@ async def subscribe(
*,
exclusive_queue: bool = True,
topics: list[str] | None = None,
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
) -> str:
"""subscribe to exchange_name calling message_handler for every incoming message
- exclusive_queue: True means that every instance of this application will receive the incoming messages
Expand Down Expand Up @@ -114,6 +120,7 @@ async def subscribe(
self.client_name,
exchange_name,
exclusive_queue=exclusive_queue,
message_ttl=message_ttl,
)
if topics is None:
await queue.bind(exchange, routing_key="")
Expand Down
7 changes: 4 additions & 3 deletions packages/service-library/src/servicelib/rabbitmq/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Final

import aio_pika
from pydantic import NonNegativeInt
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
Expand All @@ -16,8 +17,7 @@

_MINUTE: Final[int] = 60


_RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S: Final[int] = 15 * _MINUTE
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS: Final[int] = 15 * _MINUTE * 1000


class RabbitMQRetryPolicyUponInitialization:
Expand Down Expand Up @@ -55,11 +55,12 @@ async def declare_queue(
exchange_name: str,
*,
exclusive_queue: bool,
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
) -> aio_pika.abc.AbstractRobustQueue:
queue_parameters = {
"durable": True,
"exclusive": exclusive_queue,
"arguments": {"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S},
"arguments": {"x-message-ttl": message_ttl},
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive",
}
if not exclusive_queue:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ async def update_service_run_stopped_at(
return None
return ServiceRunDB.from_orm(row)

async def get_service_run_by_id(
self, service_run_id: ServiceRunId
) -> ServiceRunDB | None:
async with self.db_engine.begin() as conn:
stmt = sa.select(resource_tracker_service_runs).where(
resource_tracker_service_runs.c.service_run_id == service_run_id
)
result = await conn.execute(stmt)
row = result.first()
if row is None:
return None
return ServiceRunDB.from_orm(row)

async def list_service_runs_by_product_and_user_and_wallet(
self,
product_name: ProductName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
_TASK_NAME_START_PERIODIC_TASK = "start_background_task"
_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES = "periodic_check_of_running_services"

_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours


async def _subscribe_to_rabbitmq(app) -> str:
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"):
Expand All @@ -31,6 +33,7 @@ async def _subscribe_to_rabbitmq(app) -> str:
RabbitResourceTrackingBaseMessage.get_channel_name(),
message_handler=functools.partial(process_message, app),
exclusive_queue=False,
message_ttl=_RUT_MESSAGE_TTL_IN_MS,
)
return subscribed_queue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,25 @@ async def _process_start_event(
msg: RabbitResourceTrackingStartedMessage,
rabbitmq_client: RabbitMQClient,
):
service_run_db = await resource_tracker_repo.get_service_run_by_id(
service_run_id=msg.service_run_id
)
if service_run_db:
# NOTE: After we find out why sometimes RUT recieves multiple start events and fix it, we can change it to log level `error`
_logger.warning(
"On process start event the service run id %s already exists in DB, INVESTIGATE! Current msg created_at: %s, already stored msg created_at: %s",
msg.service_run_id,
msg.created_at,
service_run_db.started_at,
)
return

# Prepare `service run` record (if billable `credit transaction`) in the DB
service_type = (
ResourceTrackerServiceType.COMPUTATIONAL_SERVICE
if msg.service_type == ServiceType.COMPUTATIONAL
else ResourceTrackerServiceType.DYNAMIC_SERVICE
)

pricing_unit_cost = None
if msg.pricing_unit_cost_id:
pricing_unit_cost_db = await resource_tracker_repo.get_pricing_unit_cost_by_id(
Expand Down Expand Up @@ -134,10 +147,29 @@ async def _process_heartbeat_event(
msg: RabbitResourceTrackingHeartbeatMessage,
rabbitmq_client: RabbitMQClient,
):
service_run_db = await resource_tracker_repo.get_service_run_by_id(
service_run_id=msg.service_run_id
)
if not service_run_db:
_logger.error(
"Recieved process heartbeat event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!",
msg.service_run_id,
)
return
if service_run_db.service_run_status in {
ServiceRunStatus.SUCCESS,
ServiceRunStatus.ERROR,
}:
_logger.error(
"Recieved process heartbeat event for service_run_id: %s, but it was already closed, INVESTIGATE!",
msg.service_run_id,
)
return

# Update `service run` record (if billable `credit transaction`) in the DB
update_service_run_last_heartbeat = ServiceRunLastHeartbeatUpdate(
service_run_id=msg.service_run_id, last_heartbeat_at=msg.created_at
)

running_service = await resource_tracker_repo.update_service_run_last_heartbeat(
update_service_run_last_heartbeat
)
Expand Down Expand Up @@ -184,6 +216,26 @@ async def _process_stop_event(
msg: RabbitResourceTrackingStoppedMessage,
rabbitmq_client: RabbitMQClient,
):
service_run_db = await resource_tracker_repo.get_service_run_by_id(
service_run_id=msg.service_run_id
)
if not service_run_db:
_logger.error(
"Recieved stop event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!",
msg.service_run_id,
)
return
if service_run_db.service_run_status in {
ServiceRunStatus.SUCCESS,
ServiceRunStatus.ERROR,
}:
_logger.error(
"Recieved stop event for service_run_id: %s, but it was already closed, INVESTIGATE!",
msg.service_run_id,
)
return

# Update `service run` record (if billable `credit transaction`) in the DB
_run_status, _run_status_msg = ServiceRunStatus.SUCCESS, None
if msg.simcore_platform_status is SimcorePlatformStatus.BAD:
_run_status, _run_status_msg = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def test_process_event_functions(
output = await assert_service_runs_db_row(postgres_db, msg.service_run_id)
assert output.stopped_at is None
assert output.service_run_status == "RUNNING"
first_occurence_of_last_heartbeat_at < output.last_heartbeat_at
assert first_occurence_of_last_heartbeat_at < output.last_heartbeat_at

stopped_msg = RabbitResourceTrackingStoppedMessage(
service_run_id=msg.service_run_id,
Expand Down

0 comments on commit 1c7a161

Please sign in to comment.