From cf8f3b6c11be6867fee71903944a073bb1a1f45a Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 10:13:57 +0200 Subject: [PATCH 01/17] start adding prometheus metrics for logstreaming --- .../core/_prometheus_instrumentation.py | 6 ++++++ .../simcore_service_api_server/services/log_streaming.py | 5 +++++ 2 files changed, 11 insertions(+) create mode 100644 services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py new file mode 100644 index 00000000000..5e6fafac711 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -0,0 +1,6 @@ +from dataclasses import dataclass + + +@dataclass(slots=True, kw_only=True) +class ApiServerPrometheusMetrics: + pass diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 44e6e2f2f76..137cfc60db9 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -100,6 +100,11 @@ async def deregister(self, job_id: JobID): ) del self._log_streamers[job_id] + async def log_queue_sizes(self) -> dict[JobID, int]: + keys: list[JobID] = list(self._log_streamers.keys()) + values: list[int] = [await q.qsize() for q in self._log_streamers.values()] + return dict(zip(keys, values)) + class LogStreamer: def __init__( From 938005fdb1aa6e77906e889d9a16ac40e2d4102f Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 11:55:38 +0200 Subject: [PATCH 02/17] first complete implementation of instrumentation for logstreaming --- .../core/_prometheus_instrumentation.py | 70 ++++++++++++++++++- .../core/settings.py | 3 +- .../services/log_streaming.py | 22 +++--- 3 files changed, 80 insertions(+), 15 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 5e6fafac711..079b79d226c 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -1,6 +1,72 @@ +import asyncio from dataclasses import dataclass +from typing import cast + +from attr import field +from fastapi import FastAPI +from prometheus_client import CollectorRegistry, Gauge +from pydantic import PositiveInt +from servicelib.fastapi.prometheus_instrumentation import ( + setup_prometheus_instrumentation as setup_rest_instrumentation, +) +from simcore_service_api_server.api.dependencies.rabbitmq import get_log_distributor +from simcore_service_api_server.models.schemas.jobs import JobID @dataclass(slots=True, kw_only=True) -class ApiServerPrometheusMetrics: - pass +class ApiServerPrometheusInstrumentation: + registry: CollectorRegistry + _logstreaming_queues: Gauge = field(init=False) + + def __post_init__(self) -> None: + self._logstreaming_queues = Gauge( + "log_stream_queue_length", "#Logs in log streaming queue", ["job_id"] + ) + + def update_metrics(self, log_queue_sizes: dict[JobID, int]): + self._logstreaming_queues.clear() + for job_id, length in log_queue_sizes.items(): + self._logstreaming_queues.labels(job_id=job_id).set(length) + + +async def collect_prometheus_metrics_task(app: FastAPI): + metrics_collect_seconds: PositiveInt = ( + app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS + ) + assert ( + app.state.instrumentation + ), "Instrumentation not setup. Please check the configuration" # nosec + instrumentation = get_instrumentation(app) + log_distributor = get_log_distributor(app) + while True: + await asyncio.sleep(metrics_collect_seconds) + instrumentation.update_metrics( + log_queue_sizes=log_distributor.log_queue_sizes() + ) + + +async def setup(app: FastAPI): + instrumentator = setup_rest_instrumentation(app) + get_log_distributor(app) # check log_distributor is already setup + + async def on_startup() -> None: + app.state.instrumentation = ApiServerPrometheusInstrumentation( + registry=instrumentator.registry + ) + app.state.instrumentation_task = asyncio.create_task( + collect_prometheus_metrics_task(app) + ) + + async def on_shutdown() -> None: + assert app.state.instrumentation_task # nosec + app.state.instrumentation_task.cancel() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_instrumentation(app: FastAPI) -> ApiServerPrometheusInstrumentation: + assert ( + app.state.instrumentation + ), "Instrumentation not setup. Please check the configuration" # nosec + return cast(ApiServerPrometheusInstrumentation, app.state.instrumentation) diff --git a/services/api-server/src/simcore_service_api_server/core/settings.py b/services/api-server/src/simcore_service_api_server/core/settings.py index 8868464ff99..5e64c80c415 100644 --- a/services/api-server/src/simcore_service_api_server/core/settings.py +++ b/services/api-server/src/simcore_service_api_server/core/settings.py @@ -4,7 +4,7 @@ from typing import Any from models_library.basic_types import BootModeEnum, LogLevel -from pydantic import Field, NonNegativeInt, SecretStr, parse_obj_as +from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, parse_obj_as from pydantic.class_validators import validator from settings_library.base import BaseCustomSettings from settings_library.basic_types import PortInt, VersionTag @@ -137,6 +137,7 @@ class ApplicationSettings(BasicSettings): ) API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60 API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True + API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS: PositiveInt = 5 # DEV-TOOLS API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field( default=None, diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 137cfc60db9..fd4cc0985cd 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -1,7 +1,7 @@ import asyncio import logging from asyncio import Queue -from typing import AsyncIterable, Awaitable, Callable, Final +from typing import AsyncIterable, Final from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID @@ -31,7 +31,7 @@ class LogStreamerRegistionConflict(LogDistributionBaseException): class LogDistributor: def __init__(self, rabbitmq_client: RabbitMQClient): self._rabbit_client = rabbitmq_client - self._log_streamers: dict[JobID, Callable[[JobLog], Awaitable[None]]] = {} + self._log_streamers: dict[JobID, Queue] = {} self._queue_name: str async def setup(self): @@ -72,22 +72,20 @@ async def _distribute_logs(self, data: bytes): log_level=got.log_level, messages=got.messages, ) - callback = self._log_streamers.get(item.job_id) - if callback is None: + queue = self._log_streamers.get(item.job_id) + if queue is None: raise LogStreamerNotRegistered( f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered" ) - await callback(item) + await queue.put(item) return True - async def register( - self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]] - ): + async def register(self, job_id: JobID, queue: Queue): if job_id in self._log_streamers: raise LogStreamerRegistionConflict( f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time" ) - self._log_streamers[job_id] = callback + self._log_streamers[job_id] = queue await self._rabbit_client.add_topics( LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"] ) @@ -100,9 +98,9 @@ async def deregister(self, job_id: JobID): ) del self._log_streamers[job_id] - async def log_queue_sizes(self) -> dict[JobID, int]: + def log_queue_sizes(self) -> dict[JobID, int]: keys: list[JobID] = list(self._log_streamers.keys()) - values: list[int] = [await q.qsize() for q in self._log_streamers.values()] + values: list[int] = list(map(lambda q: q.qsize(), self._log_streamers.values())) return dict(zip(keys, values)) @@ -125,7 +123,7 @@ def __init__( self._log_check_timeout: NonNegativeInt = log_check_timeout async def setup(self): - await self._log_distributor.register(self._job_id, self._queue.put) + await self._log_distributor.register(self._job_id, self._queue) self._is_registered = True async def teardown(self): From 8d5264975713c445e18215e7e4950cbc974dcf82 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 14:04:45 +0200 Subject: [PATCH 03/17] test logstreaming metrics collection --- .../api/dependencies/rabbitmq.py | 18 ++++++++++++++++- .../api/errors/custom_errors.py | 4 ++++ .../core/_prometheus_instrumentation.py | 20 ++++++++++++++----- .../core/application.py | 4 +--- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py index daf9272af4a..7218ae553dc 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py @@ -1,12 +1,17 @@ -from typing import Annotated, cast +import asyncio +from time import time +from typing import Annotated, Final, cast from fastapi import Depends, FastAPI from pydantic import NonNegativeInt +from servicelib.aiohttp.application_setup import ApplicationSetupError from servicelib.fastapi.dependencies import get_app from servicelib.rabbitmq import RabbitMQClient from ...services.log_streaming import LogDistributor +_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: Final[int] = 10 + def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient: assert app.state.rabbitmq_client # nosec @@ -18,6 +23,17 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib return cast(LogDistributor, app.state.log_distributor) +async def wait_till_log_distributor_ready(app) -> None: + start = time() + while not hasattr(app.state, "log_distributor"): + if time() - start > _MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: + raise ApplicationSetupError( + f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds" + ) + await asyncio.sleep(1) + return + + def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt: assert app.state.settings # nosec return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS) diff --git a/services/api-server/src/simcore_service_api_server/api/errors/custom_errors.py b/services/api-server/src/simcore_service_api_server/api/errors/custom_errors.py index 73d59598ca6..0fe5f648823 100644 --- a/services/api-server/src/simcore_service_api_server/api/errors/custom_errors.py +++ b/services/api-server/src/simcore_service_api_server/api/errors/custom_errors.py @@ -19,6 +19,10 @@ class MissingWallet(CustomBaseError): pass +class ApplicationSetupError(CustomBaseError): + pass + + async def custom_error_handler(_: Request, exc: CustomBaseError): if isinstance(exc, InsufficientCredits): return JSONResponse( diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 079b79d226c..7be91c4fd04 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -1,6 +1,6 @@ import asyncio from dataclasses import dataclass -from typing import cast +from typing import Final, cast from attr import field from fastapi import FastAPI @@ -9,9 +9,16 @@ from servicelib.fastapi.prometheus_instrumentation import ( setup_prometheus_instrumentation as setup_rest_instrumentation, ) -from simcore_service_api_server.api.dependencies.rabbitmq import get_log_distributor +from simcore_service_api_server.api.dependencies.rabbitmq import ( + get_log_distributor, + wait_till_log_distributor_ready, +) from simcore_service_api_server.models.schemas.jobs import JobID +from .._meta import PROJECT_NAME + +METRICS_NAMESPACE: Final[str] = PROJECT_NAME.replace("-", "_") + @dataclass(slots=True, kw_only=True) class ApiServerPrometheusInstrumentation: @@ -20,7 +27,10 @@ class ApiServerPrometheusInstrumentation: def __post_init__(self) -> None: self._logstreaming_queues = Gauge( - "log_stream_queue_length", "#Logs in log streaming queue", ["job_id"] + "log_stream_queue_length", + "#Logs in log streaming queue", + ["job_id"], + namespace=METRICS_NAMESPACE, ) def update_metrics(self, log_queue_sizes: dict[JobID, int]): @@ -37,6 +47,7 @@ async def collect_prometheus_metrics_task(app: FastAPI): app.state.instrumentation ), "Instrumentation not setup. Please check the configuration" # nosec instrumentation = get_instrumentation(app) + await wait_till_log_distributor_ready(app) log_distributor = get_log_distributor(app) while True: await asyncio.sleep(metrics_collect_seconds) @@ -45,9 +56,8 @@ async def collect_prometheus_metrics_task(app: FastAPI): ) -async def setup(app: FastAPI): +def setup_prometheus_instrumentation(app: FastAPI): instrumentator = setup_rest_instrumentation(app) - get_log_distributor(app) # check log_distributor is already setup async def on_startup() -> None: app.state.instrumentation = ApiServerPrometheusInstrumentation( diff --git a/services/api-server/src/simcore_service_api_server/core/application.py b/services/api-server/src/simcore_service_api_server/core/application.py index 7f31f2ecf52..9775d298825 100644 --- a/services/api-server/src/simcore_service_api_server/core/application.py +++ b/services/api-server/src/simcore_service_api_server/core/application.py @@ -5,9 +5,6 @@ from fastapi_pagination import add_pagination from httpx import HTTPError as HttpxException from models_library.basic_types import BootModeEnum -from servicelib.fastapi.prometheus_instrumentation import ( - setup_prometheus_instrumentation, -) from servicelib.logging_utils import config_all_loggers from simcore_service_api_server.api.errors.log_handling_error import ( log_handling_error_handler, @@ -30,6 +27,7 @@ from ..api.routes.health import router as health_router from ..services import catalog, director_v2, storage, webserver from ..services.rabbitmq import setup_rabbitmq +from ._prometheus_instrumentation import setup_prometheus_instrumentation from .events import create_start_app_handler, create_stop_app_handler from .openapi import override_openapi_method, use_route_names_as_operation_ids from .settings import ApplicationSettings From 704b5aa9ebd323bef06fff0927556cff32bec363 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 14:24:22 +0200 Subject: [PATCH 04/17] log_queue-sizes -> get_log_queue_sizes --- .../core/_prometheus_instrumentation.py | 2 +- .../src/simcore_service_api_server/services/log_streaming.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 7be91c4fd04..6ff0d898395 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -52,7 +52,7 @@ async def collect_prometheus_metrics_task(app: FastAPI): while True: await asyncio.sleep(metrics_collect_seconds) instrumentation.update_metrics( - log_queue_sizes=log_distributor.log_queue_sizes() + log_queue_sizes=log_distributor.get_log_queue_sizes() ) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index fd4cc0985cd..660fd032e0e 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -98,7 +98,7 @@ async def deregister(self, job_id: JobID): ) del self._log_streamers[job_id] - def log_queue_sizes(self) -> dict[JobID, int]: + def get_log_queue_sizes(self) -> dict[JobID, int]: keys: list[JobID] = list(self._log_streamers.keys()) values: list[int] = list(map(lambda q: q.qsize(), self._log_streamers.values())) return dict(zip(keys, values)) From 325f3801cfa9e4f65bd0ca5a87c387438b319ecb Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:09:19 +0200 Subject: [PATCH 05/17] fix unit tests --- .../tests/unit/test_services_rabbitmq.py | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index d5e2cf71a66..9c20d90014f 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -16,6 +16,7 @@ import httpx import pytest import respx +from attr import dataclass from faker import Faker from fastapi import FastAPI, status from fastapi.encoders import jsonable_encoder @@ -113,12 +114,18 @@ async def test_subscribe_publish_receive_logs( log_distributor: LogDistributor, mocker: MockerFixture, ): - async def _consumer_message_handler(job_log: JobLog): - _consumer_message_handler.called = True - _consumer_message_handler.job_log = job_log - assert isinstance(job_log, JobLog) + @dataclass + class MockQueue: + called: bool = False + job_log: JobLog | None = None - await log_distributor.register(project_id, _consumer_message_handler) + async def put(self, job_log: JobLog): + self.called = True + self.job_log = job_log + assert isinstance(job_log, JobLog) + + mock_queue = MockQueue() + await log_distributor.register(project_id, mock_queue) # type: ignore # log producer rabbitmq_producer = create_rabbitmq_client("pytest_producer") @@ -128,16 +135,14 @@ async def _consumer_message_handler(job_log: JobLog): node_id=node_id, messages=[faker.text() for _ in range(10)], ) - _consumer_message_handler.called = False - _consumer_message_handler.job_log = None await rabbitmq_producer.publish(log_message.channel_name, log_message) # check it received await asyncio.sleep(1) await log_distributor.deregister(project_id) - assert _consumer_message_handler.called - job_log = _consumer_message_handler.job_log + assert mock_queue.called + job_log = mock_queue.job_log assert isinstance(job_log, JobLog) assert job_log.job_id == log_message.project_id @@ -147,12 +152,13 @@ async def rabbit_consuming_context( app: FastAPI, project_id: ProjectID, ) -> AsyncIterable[AsyncMock]: - consumer_message_handler = AsyncMock() + queue = asyncio.Queue() + queue.put = AsyncMock() log_distributor: LogDistributor = get_log_distributor(app) - await log_distributor.register(project_id, consumer_message_handler) + await log_distributor.register(project_id, queue) - yield consumer_message_handler + yield queue.put await log_distributor.deregister(project_id) @@ -233,10 +239,12 @@ async def test_log_distributor_register_deregister( ): collected_logs: list[str] = [] - async def callback(job_log: JobLog): - for msg in job_log.messages: - collected_logs.append(msg) + class MockQueue: + async def put(self, job_log: JobLog): + for msg in job_log.messages: + collected_logs.append(msg) + queue = MockQueue() published_logs: list[str] = [] async def _log_publisher(): @@ -246,12 +254,12 @@ async def _log_publisher(): await produce_logs("expected", project_id, node_id, [msg], logging.DEBUG) published_logs.append(msg) - await log_distributor.register(project_id, callback) + await log_distributor.register(project_id, queue) # type: ignore publisher_task = asyncio.create_task(_log_publisher()) await asyncio.sleep(0.1) await log_distributor.deregister(project_id) await asyncio.sleep(0.1) - await log_distributor.register(project_id, callback) + await log_distributor.register(project_id, queue) # type: ignore await asyncio.gather(publisher_task) await asyncio.sleep(0.5) await log_distributor.deregister(project_id) @@ -274,12 +282,14 @@ async def test_log_distributor_multiple_streams( collected_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids} - async def callback(job_log: JobLog): - job_id = job_log.job_id - assert (msgs := collected_logs.get(job_id)) is not None - for msg in job_log.messages: - msgs.append(msg) + class MockQueue: + async def put(self, job_log: JobLog): + job_id = job_log.job_id + assert (msgs := collected_logs.get(job_id)) is not None + for msg in job_log.messages: + msgs.append(msg) + queue = MockQueue() published_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids} async def _log_publisher(): @@ -291,7 +301,7 @@ async def _log_publisher(): published_logs[job_id].append(msg) for job_id in job_ids: - await log_distributor.register(job_id, callback) + await log_distributor.register(job_id, queue) # type: ignore publisher_task = asyncio.create_task(_log_publisher()) await asyncio.gather(publisher_task) await asyncio.sleep(0.5) From 0dfa82f0c81d91c0f9257383c6c51f8962064924 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:10:58 +0200 Subject: [PATCH 06/17] make pylint happy --- services/api-server/tests/unit/test_services_rabbitmq.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index 9c20d90014f..a58d99b54e2 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -3,6 +3,7 @@ # pylint: disable=too-many-arguments # pylint: disable=unused-argument # pylint: disable=unused-variable +# pylint: disable=R6301 import asyncio import logging From a7620c2a96e17fcfb75d00a668987a0637ca62a0 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:28:35 +0200 Subject: [PATCH 07/17] @pcrespov @GitHK use tenacity --- .../api/dependencies/rabbitmq.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py index 7218ae553dc..f7a9939f817 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py @@ -1,5 +1,4 @@ -import asyncio -from time import time +import logging from typing import Annotated, Final, cast from fastapi import Depends, FastAPI @@ -7,11 +6,14 @@ from servicelib.aiohttp.application_setup import ApplicationSetupError from servicelib.fastapi.dependencies import get_app from servicelib.rabbitmq import RabbitMQClient +from tenacity import before_sleep_log, retry, stop_after_delay, wait_fixed from ...services.log_streaming import LogDistributor _MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: Final[int] = 10 +_logger = logging.getLogger(__name__) + def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient: assert app.state.rabbitmq_client # nosec @@ -23,14 +25,17 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib return cast(LogDistributor, app.state.log_distributor) +@retry( + wait=wait_fixed(2), + stop=stop_after_delay(_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS), + before_sleep=before_sleep_log(_logger, logging.WARNING), + reraise=True, +) async def wait_till_log_distributor_ready(app) -> None: - start = time() - while not hasattr(app.state, "log_distributor"): - if time() - start > _MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: - raise ApplicationSetupError( - f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds" - ) - await asyncio.sleep(1) + if not hasattr(app.state, "log_distributor"): + raise ApplicationSetupError( + f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds" + ) return From 7abeb1e6a78ba5fa638fda5f1d85a816db1ae6a6 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:31:28 +0200 Subject: [PATCH 08/17] @pcrespov protect method --- .../core/_prometheus_instrumentation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 6ff0d898395..28af301d8b5 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -39,7 +39,7 @@ def update_metrics(self, log_queue_sizes: dict[JobID, int]): self._logstreaming_queues.labels(job_id=job_id).set(length) -async def collect_prometheus_metrics_task(app: FastAPI): +async def _collect_prometheus_metrics_task(app: FastAPI): metrics_collect_seconds: PositiveInt = ( app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS ) @@ -64,7 +64,7 @@ async def on_startup() -> None: registry=instrumentator.registry ) app.state.instrumentation_task = asyncio.create_task( - collect_prometheus_metrics_task(app) + _collect_prometheus_metrics_task(app) ) async def on_shutdown() -> None: From b936a78fa96e013725b8cacc87966b32fece0ad9 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:37:49 +0200 Subject: [PATCH 09/17] ensure task is properly cancelled @pcrespov --- .../core/_prometheus_instrumentation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 28af301d8b5..282a2d9fd1a 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -70,6 +70,10 @@ async def on_startup() -> None: async def on_shutdown() -> None: assert app.state.instrumentation_task # nosec app.state.instrumentation_task.cancel() + try: + await app.state.instrumentation_task + except asyncio.CancelledError: + pass app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) From 5d74c5e3acdaeea2cd6dd976440d2b69ce2365e6 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 16:39:31 +0200 Subject: [PATCH 10/17] move nosec --- .../core/_prometheus_instrumentation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 282a2d9fd1a..09e40dc63d2 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -43,9 +43,9 @@ async def _collect_prometheus_metrics_task(app: FastAPI): metrics_collect_seconds: PositiveInt = ( app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS ) - assert ( + assert ( # nosec app.state.instrumentation - ), "Instrumentation not setup. Please check the configuration" # nosec + ), "Instrumentation not setup. Please check the configuration" instrumentation = get_instrumentation(app) await wait_till_log_distributor_ready(app) log_distributor = get_log_distributor(app) From 21c3926cf12b2e4dfeb0389da618250fb7f89b32 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 17:35:37 +0200 Subject: [PATCH 11/17] use backgroundtask @sanderegg @pcrespov --- .../core/_prometheus_instrumentation.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 09e40dc63d2..d180d6a275e 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -1,11 +1,13 @@ import asyncio from dataclasses import dataclass +from functools import partial from typing import Final, cast from attr import field from fastapi import FastAPI from prometheus_client import CollectorRegistry, Gauge from pydantic import PositiveInt +from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.fastapi.prometheus_instrumentation import ( setup_prometheus_instrumentation as setup_rest_instrumentation, ) @@ -63,17 +65,15 @@ async def on_startup() -> None: app.state.instrumentation = ApiServerPrometheusInstrumentation( registry=instrumentator.registry ) - app.state.instrumentation_task = asyncio.create_task( - _collect_prometheus_metrics_task(app) + app.state.instrumentation_task = start_periodic_task( + task=partial(_collect_prometheus_metrics_task, app), + interval=app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS, + task_name="prometheus_metrics_collection_task", ) async def on_shutdown() -> None: assert app.state.instrumentation_task # nosec - app.state.instrumentation_task.cancel() - try: - await app.state.instrumentation_task - except asyncio.CancelledError: - pass + await stop_periodic_task(app.state.instrumentation_task) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) From dd9b18bb6e42dba5b279727f33c031ba6c33cd09 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 2 Apr 2024 18:23:45 +0200 Subject: [PATCH 12/17] correct background task --- .../core/_prometheus_instrumentation.py | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index d180d6a275e..e55477d47fa 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -1,12 +1,10 @@ -import asyncio from dataclasses import dataclass -from functools import partial +from datetime import timedelta from typing import Final, cast from attr import field from fastapi import FastAPI from prometheus_client import CollectorRegistry, Gauge -from pydantic import PositiveInt from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.fastapi.prometheus_instrumentation import ( setup_prometheus_instrumentation as setup_rest_instrumentation, @@ -42,20 +40,9 @@ def update_metrics(self, log_queue_sizes: dict[JobID, int]): async def _collect_prometheus_metrics_task(app: FastAPI): - metrics_collect_seconds: PositiveInt = ( - app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS + get_instrumentation(app).update_metrics( + log_queue_sizes=get_log_distributor(app).get_log_queue_sizes() ) - assert ( # nosec - app.state.instrumentation - ), "Instrumentation not setup. Please check the configuration" - instrumentation = get_instrumentation(app) - await wait_till_log_distributor_ready(app) - log_distributor = get_log_distributor(app) - while True: - await asyncio.sleep(metrics_collect_seconds) - instrumentation.update_metrics( - log_queue_sizes=log_distributor.get_log_queue_sizes() - ) def setup_prometheus_instrumentation(app: FastAPI): @@ -65,10 +52,14 @@ async def on_startup() -> None: app.state.instrumentation = ApiServerPrometheusInstrumentation( registry=instrumentator.registry ) + await wait_till_log_distributor_ready(app) app.state.instrumentation_task = start_periodic_task( - task=partial(_collect_prometheus_metrics_task, app), - interval=app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS, + task=_collect_prometheus_metrics_task, + interval=timedelta( + seconds=app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS + ), task_name="prometheus_metrics_collection_task", + app=app, ) async def on_shutdown() -> None: From ed71024a62fd120b00f95c967c7727c7cf99e876 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 3 Apr 2024 08:22:50 +0200 Subject: [PATCH 13/17] dont preomptimize @pcrespov --- .../src/simcore_service_api_server/services/log_streaming.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 660fd032e0e..457a196f7db 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -99,9 +99,7 @@ async def deregister(self, job_id: JobID): del self._log_streamers[job_id] def get_log_queue_sizes(self) -> dict[JobID, int]: - keys: list[JobID] = list(self._log_streamers.keys()) - values: list[int] = list(map(lambda q: q.qsize(), self._log_streamers.values())) - return dict(zip(keys, values)) + return {k: v.qsize() for k, v in self._log_streamers.items()} class LogStreamer: From e975d37b3b548907a3220e60d841ab1ec3d8bbf8 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 3 Apr 2024 09:01:24 +0200 Subject: [PATCH 14/17] mock setup_prometheus when rabbit is not available --- services/api-server/tests/unit/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/api-server/tests/unit/conftest.py b/services/api-server/tests/unit/conftest.py index 3897f0a6f7f..4644a1a30d6 100644 --- a/services/api-server/tests/unit/conftest.py +++ b/services/api-server/tests/unit/conftest.py @@ -87,6 +87,9 @@ def mock_missing_plugins(app_environment: EnvVarsDict, mocker: MockerFixture): settings = ApplicationSettings.create_from_envs() if settings.API_SERVER_RABBITMQ is None: mocker.patch("simcore_service_api_server.core.application.setup_rabbitmq") + mocker.patch( + "simcore_service_api_server.core.application.setup_prometheus_instrumentation" + ) return app_environment From 889d8b86449eed5749ec80ce360a393eb24bed67 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 3 Apr 2024 09:08:37 +0200 Subject: [PATCH 15/17] correct import of field --- .../core/_prometheus_instrumentation.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index e55477d47fa..3922784510a 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -1,8 +1,7 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import timedelta from typing import Final, cast -from attr import field from fastapi import FastAPI from prometheus_client import CollectorRegistry, Gauge from servicelib.background_task import start_periodic_task, stop_periodic_task From a584d65fc4d7e416882dfd5b882cfb844e218893 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 3 Apr 2024 09:21:16 +0200 Subject: [PATCH 16/17] fix tests --- services/api-server/tests/unit/_with_db/conftest.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/api-server/tests/unit/_with_db/conftest.py b/services/api-server/tests/unit/_with_db/conftest.py index 3570c60dbb1..8ee0e72b5c6 100644 --- a/services/api-server/tests/unit/_with_db/conftest.py +++ b/services/api-server/tests/unit/_with_db/conftest.py @@ -78,7 +78,9 @@ def docker_compose_file( def postgres_service(docker_services, docker_ip, docker_compose_file: Path) -> dict: # check docker-compose's environ is resolved properly config = yaml.safe_load(docker_compose_file.read_text()) - environ = config["services"]["postgres"]["environment"] + environ = dict( + tuple(s.split("=")) for s in config["services"]["postgres"]["environment"] + ) # builds DSN config = { @@ -154,6 +156,9 @@ def app_environment( ) -> EnvVarsDict: """app environments WITH database settings""" mocker.patch("simcore_service_api_server.core.application.setup_rabbitmq") + mocker.patch( + "simcore_service_api_server.core.application.setup_prometheus_instrumentation" + ) envs = setenvs_from_dict(monkeypatch, default_app_env_vars) assert "API_SERVER_POSTGRES" not in envs From a30c3a391abff89136a4dccc9fc31d3602e22ee2 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 3 Apr 2024 09:35:09 +0200 Subject: [PATCH 17/17] redo change to dict --- services/api-server/tests/unit/_with_db/conftest.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/api-server/tests/unit/_with_db/conftest.py b/services/api-server/tests/unit/_with_db/conftest.py index 8ee0e72b5c6..5aa6e190749 100644 --- a/services/api-server/tests/unit/_with_db/conftest.py +++ b/services/api-server/tests/unit/_with_db/conftest.py @@ -78,9 +78,7 @@ def docker_compose_file( def postgres_service(docker_services, docker_ip, docker_compose_file: Path) -> dict: # check docker-compose's environ is resolved properly config = yaml.safe_load(docker_compose_file.read_text()) - environ = dict( - tuple(s.split("=")) for s in config["services"]["postgres"]["environment"] - ) + environ = config["services"]["postgres"]["environment"] # builds DSN config = {