diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 542bd2ba2ee..96de471ed34 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -4,11 +4,16 @@ from typing import Final import aio_pika +from pydantic import NonNegativeInt from ..logging_utils import log_context from ._client_base import RabbitMQClientBase from ._models import MessageHandler, RabbitMessage -from ._utils import declare_queue, get_rabbitmq_client_unique_name +from ._utils import ( + RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, + declare_queue, + get_rabbitmq_client_unique_name, +) _logger = logging.getLogger(__name__) @@ -73,6 +78,7 @@ async def subscribe( *, exclusive_queue: bool = True, topics: list[str] | None = None, + message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> str: """subscribe to exchange_name calling message_handler for every incoming message - exclusive_queue: True means that every instance of this application will receive the incoming messages @@ -114,6 +120,7 @@ async def subscribe( self.client_name, exchange_name, exclusive_queue=exclusive_queue, + message_ttl=message_ttl, ) if topics is None: await queue.bind(exchange, routing_key="") diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index eaae5dc4e7d..afec7e908a5 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -4,6 +4,7 @@ from typing import Final import aio_pika +from pydantic import NonNegativeInt from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.stop import stop_after_delay @@ -16,8 +17,7 @@ _MINUTE: Final[int] = 60 - -_RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S: Final[int] = 15 * _MINUTE +RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS: Final[int] = 15 * _MINUTE * 1000 class RabbitMQRetryPolicyUponInitialization: @@ -55,11 +55,12 @@ async def declare_queue( exchange_name: str, *, exclusive_queue: bool, + message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> aio_pika.abc.AbstractRobustQueue: queue_parameters = { "durable": True, "exclusive": exclusive_queue, - "arguments": {"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S}, + "arguments": {"x-message-ttl": message_ttl}, "name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive", } if not exclusive_queue: diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py index 86ef25bd9bf..9db1486e97f 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py @@ -176,6 +176,19 @@ async def update_service_run_stopped_at( return None return ServiceRunDB.from_orm(row) + async def get_service_run_by_id( + self, service_run_id: ServiceRunId + ) -> ServiceRunDB | None: + async with self.db_engine.begin() as conn: + stmt = sa.select(resource_tracker_service_runs).where( + resource_tracker_service_runs.c.service_run_id == service_run_id + ) + result = await conn.execute(stmt) + row = result.first() + if row is None: + return None + return ServiceRunDB.from_orm(row) + async def list_service_runs_by_product_and_user_and_wallet( self, product_name: ProductName, diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py index b78489d1bbf..a99c0ac296a 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py @@ -23,6 +23,8 @@ _TASK_NAME_START_PERIODIC_TASK = "start_background_task" _TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES = "periodic_check_of_running_services" +_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours + async def _subscribe_to_rabbitmq(app) -> str: with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): @@ -31,6 +33,7 @@ async def _subscribe_to_rabbitmq(app) -> str: RabbitResourceTrackingBaseMessage.get_channel_name(), message_handler=functools.partial(process_message, app), exclusive_queue=False, + message_ttl=_RUT_MESSAGE_TTL_IN_MS, ) return subscribed_queue diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py index 8e274a73949..66af24e41b2 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py @@ -63,12 +63,25 @@ async def _process_start_event( msg: RabbitResourceTrackingStartedMessage, rabbitmq_client: RabbitMQClient, ): + service_run_db = await resource_tracker_repo.get_service_run_by_id( + service_run_id=msg.service_run_id + ) + if service_run_db: + # NOTE: After we find out why sometimes RUT recieves multiple start events and fix it, we can change it to log level `error` + _logger.warning( + "On process start event the service run id %s already exists in DB, INVESTIGATE! Current msg created_at: %s, already stored msg created_at: %s", + msg.service_run_id, + msg.created_at, + service_run_db.started_at, + ) + return + + # Prepare `service run` record (if billable `credit transaction`) in the DB service_type = ( ResourceTrackerServiceType.COMPUTATIONAL_SERVICE if msg.service_type == ServiceType.COMPUTATIONAL else ResourceTrackerServiceType.DYNAMIC_SERVICE ) - pricing_unit_cost = None if msg.pricing_unit_cost_id: pricing_unit_cost_db = await resource_tracker_repo.get_pricing_unit_cost_by_id( @@ -134,10 +147,29 @@ async def _process_heartbeat_event( msg: RabbitResourceTrackingHeartbeatMessage, rabbitmq_client: RabbitMQClient, ): + service_run_db = await resource_tracker_repo.get_service_run_by_id( + service_run_id=msg.service_run_id + ) + if not service_run_db: + _logger.error( + "Recieved process heartbeat event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!", + msg.service_run_id, + ) + return + if service_run_db.service_run_status in { + ServiceRunStatus.SUCCESS, + ServiceRunStatus.ERROR, + }: + _logger.error( + "Recieved process heartbeat event for service_run_id: %s, but it was already closed, INVESTIGATE!", + msg.service_run_id, + ) + return + + # Update `service run` record (if billable `credit transaction`) in the DB update_service_run_last_heartbeat = ServiceRunLastHeartbeatUpdate( service_run_id=msg.service_run_id, last_heartbeat_at=msg.created_at ) - running_service = await resource_tracker_repo.update_service_run_last_heartbeat( update_service_run_last_heartbeat ) @@ -184,6 +216,26 @@ async def _process_stop_event( msg: RabbitResourceTrackingStoppedMessage, rabbitmq_client: RabbitMQClient, ): + service_run_db = await resource_tracker_repo.get_service_run_by_id( + service_run_id=msg.service_run_id + ) + if not service_run_db: + _logger.error( + "Recieved stop event for service_run_id: %s, but we do not have the started record in the DB, INVESTIGATE!", + msg.service_run_id, + ) + return + if service_run_db.service_run_status in { + ServiceRunStatus.SUCCESS, + ServiceRunStatus.ERROR, + }: + _logger.error( + "Recieved stop event for service_run_id: %s, but it was already closed, INVESTIGATE!", + msg.service_run_id, + ) + return + + # Update `service run` record (if billable `credit transaction`) in the DB _run_status, _run_status_msg = ServiceRunStatus.SUCCESS, None if msg.simcore_platform_status is SimcorePlatformStatus.BAD: _run_status, _run_status_msg = ( diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py index f238862a1a8..dec714a582b 100644 --- a/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message.py @@ -59,7 +59,7 @@ async def test_process_event_functions( output = await assert_service_runs_db_row(postgres_db, msg.service_run_id) assert output.stopped_at is None assert output.service_run_status == "RUNNING" - first_occurence_of_last_heartbeat_at < output.last_heartbeat_at + assert first_occurence_of_last_heartbeat_at < output.last_heartbeat_at stopped_msg = RabbitResourceTrackingStoppedMessage( service_run_id=msg.service_run_id,