From daef85c3aebb33b4a42b7a5f7fa1f49f78b21af6 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard <126242332+bisgaard-itis@users.noreply.github.com> Date: Wed, 7 Feb 2024 10:59:07 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=97=EF=B8=8F=20Add=20DEBUG=20log=20messag?= =?UTF-8?q?es=20in=20the=20callback=20the=20API-server=20passes=20to=20the?= =?UTF-8?q?=20RabbitMQ=20client=20(#5312)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../services/log_streaming.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 8c9ec8193e4..271ab521766 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -1,18 +1,20 @@ import asyncio +import logging from asyncio import Queue from datetime import datetime, timezone from typing import AsyncIterable, Awaitable, Callable, Final from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID -from pydantic import NonNegativeInt, PositiveInt +from pydantic import NonNegativeInt, ValidationError from servicelib.rabbitmq import RabbitMQClient from ..models.schemas.jobs import JobID, JobLog from .director_v2 import DirectorV2Api +_logger = logging.getLogger(__name__) + _NEW_LINE: Final[str] = "\n" -_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS: Final[PositiveInt] = 10 class LogDistributionBaseException(Exception): @@ -52,7 +54,19 @@ async def __aexit__(self, exc_type, exc, tb): await self.teardown() async def _distribute_logs(self, data: bytes): - got = LoggerRabbitMessage.parse_raw(data) + try: + got = LoggerRabbitMessage.parse_raw( + data + ) # rabbitmq client safe_nacks the message if this deserialization fails + except ValidationError as e: + _logger.debug( + "Could not parse log message from RabbitMQ in LogDistributor._distribute_logs" + ) + raise e + _logger.debug( + "LogDistributor._distribute_logs received message message from RabbitMQ: %s", + got.json(), + ) item = JobLog( job_id=got.project_id, node_id=got.node_id,