diff --git a/packages/service-library/src/servicelib/instrumentation.py b/packages/service-library/src/servicelib/instrumentation.py new file mode 100644 index 00000000000..d1fa57f66e4 --- /dev/null +++ b/packages/service-library/src/servicelib/instrumentation.py @@ -0,0 +1,2 @@ +def get_metrics_namespace(application_name: str) -> str: + return application_name.replace("-", "_") diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 588ab9ecbfa..176635e1e88 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -53,10 +53,10 @@ async def wait_till_rabbitmq_responsive(url: str) -> bool: def get_rabbitmq_client_unique_name(base_name: str) -> str: - # NOTE: below prefix is guaranteed to change each time the preocess restarts - # Why is this desiarable? - # 1. the code base makes the above assumption, otherwise subcscribers and consumers do not work - # 2. enables restartability of webserver during [re]deploys + # NOTE: The prefix below will change every time the process restarts. + # Why is this necessary? + # 1. The codebase relies on this behavior; without it, subscribers and consumers will fail. + # 2. It allows the web server to be restarted seamlessly during [re]deployments. prefix_create_time = f"{psutil.Process(os.getpid()).create_time()}".strip(".")[-6:] return f"{base_name}_{socket.gethostname()}_{prefix_create_time}" diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py index 1cfa96a6f36..1224ea71907 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py @@ -1,8 +1,10 @@ from typing import Final +from servicelib.instrumentation import get_metrics_namespace + from ..._meta import APP_NAME -METRICS_NAMESPACE: Final[str] = APP_NAME.replace("-", "_") +METRICS_NAMESPACE: Final[str] = get_metrics_namespace(APP_NAME) EC2_INSTANCE_LABELS: Final[tuple[str, ...]] = ("instance_type",) CLUSTER_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = { diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py index a2a99f4bea3..750f634bb3b 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py @@ -136,7 +136,6 @@ async def create_dynamic_service( logger.debug("Redirecting %s", redirect_url_with_query) return RedirectResponse(str(redirect_url_with_query)) - # if not await is_sidecar_running( service.node_uuid, dynamic_services_settings.DYNAMIC_SCHEDULER.SWARM_STACK_NAME ): diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index 6e499968c03..330717e6062 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -7,9 +7,6 @@ override_fastapi_openapi_method, ) from servicelib.fastapi.profiler_middleware import ProfilerMiddleware -from servicelib.fastapi.prometheus_instrumentation import ( - setup_prometheus_instrumentation, -) from servicelib.fastapi.tracing import setup_tracing from servicelib.logging_utils import config_all_loggers @@ -28,6 +25,7 @@ director_v0, dynamic_services, dynamic_sidecar, + instrumentation, notifier, rabbitmq, redis, @@ -192,7 +190,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: resource_usage_tracker_client.setup(app) if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED: - setup_prometheus_instrumentation(app) + instrumentation.setup(app) if settings.DIRECTOR_V2_TRACING: setup_tracing(app, app.state.settings.DIRECTOR_V2_TRACING, APP_NAME) diff --git a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py index 33272e9f946..3427a2bc525 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py @@ -2,12 +2,14 @@ import logging import re from collections.abc import Mapping +from datetime import datetime from enum import Enum from functools import cached_property from pathlib import Path from typing import Any, TypeAlias from uuid import UUID +import arrow from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate from models_library.api_schemas_directorv2.dynamic_services_service import ( CommonServiceDetails, @@ -170,6 +172,28 @@ def mark_removed(self) -> None: self.was_removed = True +class ServicesInstrumentation(BaseModel): + start_requested_at: datetime | None = Field( + None, + description="moment in which the process of starting the service was requested", + ) + close_requested_at: datetime | None = Field( + None, + description="moment in which the process of stopping the service was requested", + ) + + def elapsed_since_start_request(self) -> float | None: + if self.start_requested_at is None: + return None + + return (arrow.utcnow().datetime - self.start_requested_at).total_seconds() + + def elapsed_since_close_request(self) -> float | None: + if self.close_requested_at is None: + return None + return (arrow.utcnow().datetime - self.close_requested_at).total_seconds() + + class DynamicSidecar(BaseModel): status: Status = Field( Status.create_as_initially_ok(), @@ -254,6 +278,11 @@ def compose_spec_submitted(self) -> bool: description="set True if the dy-sidecar saves the state and uploads the outputs", ) + instrumentation: ServicesInstrumentation = Field( + default_factory=lambda: ServicesInstrumentation.parse_obj({}), + description="keeps track times for various operations", + ) + # below had already been validated and # used only to start the proxy dynamic_sidecar_id: ServiceId | None = Field( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py index 190a35a315c..7ce782c6366 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py @@ -349,18 +349,20 @@ async def stop_service( progress_callback, ) - async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None: + async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> int: response = await self._thin_client.post_containers_tasks_state_restore( dynamic_sidecar_endpoint ) task_id: TaskId = response.json() - await self._await_for_result( + result: Any | None = await self._await_for_result( task_id, dynamic_sidecar_endpoint, self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT, _debug_progress_callback, ) + assert isinstance(result, int) # nosec + return result async def pull_user_services_images( self, dynamic_sidecar_endpoint: AnyHttpUrl @@ -381,18 +383,20 @@ async def save_service_state( self, dynamic_sidecar_endpoint: AnyHttpUrl, progress_callback: ProgressCallback | None = None, - ) -> None: + ) -> int: response = await self._thin_client.post_containers_tasks_state_save( dynamic_sidecar_endpoint ) task_id: TaskId = response.json() - await self._await_for_result( + result: Any | None = await self._await_for_result( task_id, dynamic_sidecar_endpoint, self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT, progress_callback, ) + assert isinstance(result, int) # nosec + return result async def pull_service_input_ports( self, @@ -416,18 +420,20 @@ async def pull_service_output_ports( self, dynamic_sidecar_endpoint: AnyHttpUrl, port_keys: list[str] | None = None, - ) -> None: + ) -> int: response = await self._thin_client.post_containers_tasks_ports_outputs_pull( dynamic_sidecar_endpoint, port_keys ) task_id: TaskId = response.json() - await self._await_for_result( + result: Any | None = await self._await_for_result( task_id, dynamic_sidecar_endpoint, self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT, _debug_progress_callback, ) + assert isinstance(result, int) # nosec + return result async def push_service_output_ports( self, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py index e88937b13b7..d9c9815b8d5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py @@ -19,6 +19,7 @@ DynamicServicesSchedulerSettings, ) from .....models.dynamic_services_scheduler import SchedulerData +from .....modules.instrumentation import get_instrumentation, get_metrics_labels from .....utils.db import get_repository from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository from ....db.repositories.projects import ProjectsRepository @@ -222,4 +223,12 @@ async def progress_create_containers( scheduler_data.dynamic_sidecar.were_containers_created = True + start_duration = ( + scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_start_request() + ) + assert start_duration is not None # nosec + get_instrumentation(app).dynamic_sidecar_metrics.start_time_duration.labels( + **get_metrics_labels(scheduler_data) + ).observe(start_duration) + _logger.info("Internal state after creating user services %s", scheduler_data) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 6fc6357ec3c..febf47040c0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -44,6 +44,12 @@ DockerStatus, SchedulerData, ) +from .....modules.instrumentation import ( + get_instrumentation, + get_metrics_labels, + get_rate, + track_duration, +) from .....utils.db import get_repository from ....db.repositories.projects import ProjectsRepository from ....db.repositories.projects_networks import ProjectsNetworksRepository @@ -157,9 +163,15 @@ async def service_save_state( progress_callback: ProgressCallback | None = None, ) -> None: scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid) - await sidecars_client.save_service_state( - scheduler_data.endpoint, progress_callback=progress_callback - ) + + with track_duration() as duration: + size = await sidecars_client.save_service_state( + scheduler_data.endpoint, progress_callback=progress_callback + ) + get_instrumentation(app).dynamic_sidecar_metrics.push_service_state_rate.labels( + **get_metrics_labels(scheduler_data) + ).observe(get_rate(size, duration.to_flaot())) + await sidecars_client.update_volume_state( scheduler_data.endpoint, volume_category=VolumeCategory.STATES, @@ -375,6 +387,16 @@ async def attempt_pod_removal_and_data_saving( rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client await rabbitmq_client.publish(message.channel_name, message) + # metrics + + stop_duration = ( + scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_close_request() + ) + assert stop_duration is not None # nosec + get_instrumentation(app).dynamic_sidecar_metrics.stop_time_duration.labels( + **get_metrics_labels(scheduler_data) + ).observe(stop_duration) + async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None: _logger.debug("Attaching project networks for %s", scheduler_data.service_name) @@ -460,14 +482,44 @@ async def prepare_services_environment( ) ) + async def _pull_output_ports_with_metrics() -> None: + with track_duration() as duration: + size: int = await sidecars_client.pull_service_output_ports( + dynamic_sidecar_endpoint + ) + + get_instrumentation(app).dynamic_sidecar_metrics.output_ports_pull_rate.labels( + **get_metrics_labels(scheduler_data) + ).observe(get_rate(size, duration.to_flaot())) + + async def _pull_user_services_images_with_metrics() -> None: + with track_duration() as duration: + await sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint) + + get_instrumentation( + app + ).dynamic_sidecar_metrics.pull_user_services_images_duration.labels( + **get_metrics_labels(scheduler_data) + ).observe( + duration.to_flaot() + ) + + async def _restore_service_state_with_metrics() -> None: + with track_duration() as duration: + size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint) + + get_instrumentation(app).dynamic_sidecar_metrics.pull_service_state_rate.labels( + **get_metrics_labels(scheduler_data) + ).observe(get_rate(size, duration.to_flaot())) + tasks = [ - sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint), - sidecars_client.pull_service_output_ports(dynamic_sidecar_endpoint), + _pull_user_services_images_with_metrics(), + _pull_output_ports_with_metrics(), ] # When enabled no longer downloads state via nodeports # S3 is used to store state paths if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - tasks.append(sidecars_client.restore_service_state(dynamic_sidecar_endpoint)) + tasks.append(_restore_service_state_with_metrics()) await limited_gather(*tasks, limit=3) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index b6a5571a4af..41031a60318 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -17,10 +17,12 @@ import contextlib import functools import logging +import time from asyncio import Lock, Queue, Task from dataclasses import dataclass, field from typing import Final +import arrow from fastapi import FastAPI from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceCreate, @@ -54,6 +56,11 @@ DynamicServicesSchedulerSettings, ) from .....models.dynamic_services_scheduler import SchedulerData, ServiceName +from .....modules.instrumentation import ( + get_instrumentation, + get_metrics_labels, + get_rate, +) from ...api_client import SidecarsClient, get_sidecars_client from ...docker_api import update_scheduler_data_label from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError @@ -255,6 +262,9 @@ async def add_service( request_simcore_user_agent=request_simcore_user_agent, can_save=can_save, ) + scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = ( + arrow.utcnow().datetime + ) await self.add_service_from_scheduler_data(scheduler_data) async def add_service_from_scheduler_data( @@ -353,6 +363,10 @@ async def mark_service_for_removal( ) return + current.dynamic_sidecar.instrumentation.close_requested_at = ( + arrow.utcnow().datetime + ) + # PC-> ANE: could you please review what to do when can_save=None assert can_save is not None # nosec current.dynamic_sidecar.service_removal_state.mark_to_remove( @@ -455,9 +469,19 @@ async def retrieve_service_inputs( dynamic_sidecar_endpoint: AnyHttpUrl = scheduler_data.endpoint sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_uuid) + started = time.time() transferred_bytes = await sidecars_client.pull_service_input_ports( dynamic_sidecar_endpoint, port_keys ) + duration = time.time() - started + + get_instrumentation( + self.app + ).dynamic_sidecar_metrics.input_ports_pull_rate.labels( + **get_metrics_labels(scheduler_data) + ).observe( + get_rate(transferred_bytes, duration) + ) if scheduler_data.restart_policy == RestartPolicy.ON_INPUTS_DOWNLOADED: logger.info("Will restart containers") diff --git a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/__init__.py new file mode 100644 index 00000000000..8c08a824d3f --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/__init__.py @@ -0,0 +1,10 @@ +from ._setup import get_instrumentation, setup +from ._utils import get_metrics_labels, get_rate, track_duration + +__all__: tuple[str, ...] = ( + "get_instrumentation", + "get_metrics_labels", + "get_rate", + "setup", + "track_duration", +) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py new file mode 100644 index 00000000000..5a8f692a124 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py @@ -0,0 +1,137 @@ +from dataclasses import dataclass, field +from typing import Final + +from prometheus_client import CollectorRegistry, Histogram +from pydantic import ByteSize, parse_obj_as +from servicelib.instrumentation import get_metrics_namespace + +from ..._meta import PROJECT_NAME + +_NAMESPACE_METRICS: Final[str] = get_metrics_namespace(PROJECT_NAME) +_SUBSYSTEM_NAME: Final[str] = "dynamic_services" +_INSTRUMENTATION_LABELS: Final[tuple[str, ...]] = ( + "user_id", + "wallet_id", + "service_key", + "service_version", +) + +_MINUTE: Final[int] = 60 +_BUCKETS_TIME_S: Final[tuple[float, ...]] = ( + 10, + 30, + 1 * _MINUTE, + 2 * _MINUTE, + 3 * _MINUTE, + 5 * _MINUTE, + 7 * _MINUTE, + 10 * _MINUTE, + 15 * _MINUTE, + 20 * _MINUTE, +) + + +_BUCKETS_RATE_BPS: Final[tuple[float, ...]] = tuple( + parse_obj_as(ByteSize, f"{m}MiB") + for m in ( + 1, + 30, + 60, + 90, + 120, + 150, + 200, + 300, + 400, + 500, + 600, + ) +) + + +@dataclass(slots=True, kw_only=True) +class DynamiSidecarMetrics: + + start_time_duration: Histogram = field(init=False) + stop_time_duration: Histogram = field(init=False) + pull_user_services_images_duration: Histogram = field(init=False) + + # ingress rates + output_ports_pull_rate: Histogram = field(init=False) + input_ports_pull_rate: Histogram = field(init=False) + pull_service_state_rate: Histogram = field(init=False) + + # egress rates + # NOTE: input ports are never pushed + # NOTE: output ports are pushed by the dy-sidecar, upon change making recovering the metric very complicated + push_service_state_rate: Histogram = field(init=False) + + def __post_init__(self) -> None: + self.start_time_duration = Histogram( + "start_time_duration_seconds", + "time to start dynamic-sidecar", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_TIME_S, + subsystem=_SUBSYSTEM_NAME, + ) + self.stop_time_duration = Histogram( + "stop_time_duration_seconds", + "time to stop dynamic-sidecar", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_TIME_S, + subsystem=_SUBSYSTEM_NAME, + ) + self.pull_user_services_images_duration = Histogram( + "pull_user_services_images_duration_seconds", + "time to pull docker images", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_RATE_BPS, + subsystem=_SUBSYSTEM_NAME, + ) + + self.output_ports_pull_rate = Histogram( + "output_ports_pull_rate_bps", + "rate at which output ports were pulled", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_RATE_BPS, + subsystem=_SUBSYSTEM_NAME, + ) + self.input_ports_pull_rate = Histogram( + "input_ports_pull_rate_bps", + "rate at which input ports were pulled", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_RATE_BPS, + subsystem=_SUBSYSTEM_NAME, + ) + self.pull_service_state_rate = Histogram( + "pull_service_state_rate_bps", + "rate at which service states were recovered", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_RATE_BPS, + subsystem=_SUBSYSTEM_NAME, + ) + + self.push_service_state_rate = Histogram( + "push_service_state_rate_bps", + "rate at which service states were saved", + labelnames=_INSTRUMENTATION_LABELS, + namespace=_NAMESPACE_METRICS, + buckets=_BUCKETS_RATE_BPS, + subsystem=_SUBSYSTEM_NAME, + ) + + +@dataclass(slots=True, kw_only=True) +class DirectorV2Instrumentation: + registry: CollectorRegistry + + dynamic_sidecar_metrics: DynamiSidecarMetrics = field(init=False) + + def __post_init__(self) -> None: + self.dynamic_sidecar_metrics = DynamiSidecarMetrics() diff --git a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_setup.py b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_setup.py new file mode 100644 index 00000000000..889cb39a460 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_setup.py @@ -0,0 +1,28 @@ +from typing import cast + +from fastapi import FastAPI +from servicelib.fastapi.prometheus_instrumentation import ( + setup_prometheus_instrumentation, +) + +from ...core.errors import ConfigurationError +from ._models import DirectorV2Instrumentation + + +def setup(app: FastAPI) -> None: + instrumentator = setup_prometheus_instrumentation(app) + + async def on_startup() -> None: + app.state.instrumentation = DirectorV2Instrumentation( + registry=instrumentator.registry + ) + + app.add_event_handler("startup", on_startup) + + +def get_instrumentation(app: FastAPI) -> DirectorV2Instrumentation: + if not app.state.instrumentation: + raise ConfigurationError( + msg="Instrumentation not setup. Please check the configuration." + ) + return cast(DirectorV2Instrumentation, app.state.instrumentation) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_utils.py new file mode 100644 index 00000000000..96b23ae5f1f --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_utils.py @@ -0,0 +1,61 @@ +import time +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Final + +from pydantic import NonNegativeFloat + +from ...models.dynamic_services_scheduler import SchedulerData + +_EPSILON: Final[NonNegativeFloat] = 1e9 + + +def get_metrics_labels(scheduler_data: "SchedulerData") -> dict[str, str]: + return { + "user_id": f"{scheduler_data.user_id}", + "wallet_id": ( + f"{scheduler_data.wallet_info.wallet_id}" + if scheduler_data.wallet_info + else "" + ), + "service_key": scheduler_data.key, + "service_version": scheduler_data.version, + } + + +def get_rate( + size: NonNegativeFloat | None, duration: NonNegativeFloat +) -> NonNegativeFloat: + if size is None or size <= 0: + size = _EPSILON + return size / duration + + +class DeferredFloat: + def __init__(self): + self._value: float | None = None + + def set_value(self, value): + if not isinstance(value, float | int): + msg = "Value must be a float or an int." + raise TypeError(msg) + + self._value = float(value) + + def to_flaot(self) -> float: + if not isinstance(self._value, float): + msg = "Value must be a float or an int." + raise TypeError(msg) + + return self._value + + +@contextmanager +def track_duration() -> Iterator[DeferredFloat]: + duration = DeferredFloat() + start_time = time.time() + + yield duration + + end_time = time.time() + duration.set_value(end_time - start_time) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index eafe6bb15fc..f107cfa54f5 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -208,6 +208,7 @@ async def client(mock_env: EnvVarsDict) -> AsyncIterator[TestClient]: async def initialized_app(mock_env: EnvVarsDict) -> AsyncIterable[FastAPI]: settings = AppSettings.create_from_envs() app = init_app(settings) + print("Application settings\n", settings.json(indent=2)) async with LifespanManager(app): yield app diff --git a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py index 56eaecdda47..dda04256537 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py +++ b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py @@ -187,6 +187,7 @@ async def director_v2_client( "REDIS_HOST": redis_settings.REDIS_HOST, "REDIS_PORT": f"{redis_settings.REDIS_PORT}", "REDIS_PASSWORD": f"{redis_settings.REDIS_PASSWORD.get_secret_value()}", + "DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED": "1", }, ) monkeypatch.delenv("DYNAMIC_SIDECAR_MOUNT_PATH_DEV", raising=False) diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 2fe09c42286..39b7fea4e5d 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -401,6 +401,7 @@ def mock_env( "COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": "true", "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL": dask_scheduler_service, "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH": dask_scheduler_auth.json(), + "DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED": "1", }, ) monkeypatch.delenv("DYNAMIC_SIDECAR_MOUNT_PATH_DEV", raising=False) diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index 134b9eaea74..646cb788ad7 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -65,7 +65,18 @@ def mock_env( minio_s3_settings_envs: EnvVarsDict, storage_service: URL, network_name: str, + services_endpoint: dict[str, URL], ) -> EnvVarsDict: + director_host = services_endpoint["director"].host + assert director_host + director_port = services_endpoint["director"].port + assert director_port + + catalog_host = services_endpoint["catalog"].host + assert catalog_host + catalog_port = services_endpoint["catalog"].port + assert catalog_port + env_vars: EnvVarsDict = { "DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS": "{}", "TRAEFIK_SIMCORE_ZONE": "test_traefik_zone", @@ -80,6 +91,11 @@ def mock_env( "COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": "false", "COMPUTATIONAL_BACKEND_ENABLED": "false", "R_CLONE_PROVIDER": "MINIO", + "DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED": "1", + "DIRECTOR_HOST": director_host, + "DIRECTOR_PORT": f"{director_port}", + "CATALOG_HOST": catalog_host, + "CATALOG_PORT": f"{catalog_port}", } setenvs_from_dict(monkeypatch, env_vars) monkeypatch.delenv("DYNAMIC_SIDECAR_MOUNT_PATH_DEV", raising=False) @@ -158,7 +174,7 @@ def _assemble_node_data(spec: dict, label: str) -> dict[str, str]: @pytest.fixture async def ensure_services_stopped( dy_static_file_server_project: ProjectAtDB, - minimal_app: FastAPI, + initialized_app: FastAPI, ) -> AsyncIterable[None]: yield # ensure service cleanup when done testing @@ -177,7 +193,7 @@ async def ensure_services_stopped( # pylint: disable=protected-access scheduler_interval = ( - minimal_app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL + initialized_app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL ) # sleep enough to ensure the observation cycle properly stopped the service await asyncio.sleep(2 * scheduler_interval.total_seconds()) @@ -190,9 +206,10 @@ def mock_sidecars_client(mocker: MockerFixture) -> mock.Mock: "simcore_service_director_v2.modules.dynamic_sidecar.api_client.SidecarsClient" ) for function_name, return_value in [ - ("pull_service_output_ports", None), - ("restore_service_state", None), + ("pull_service_output_ports", 0), + ("restore_service_state", 0), ("push_service_output_ports", None), + ("save_service_state", 0), ]: mocker.patch( f"{class_path}.{function_name}", @@ -212,9 +229,8 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]: ) -@pytest.mark.flaky(max_runs=3) async def test_legacy_and_dynamic_sidecar_run( - minimal_app: FastAPI, + initialized_app: FastAPI, wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], dy_static_file_server_project: ProjectAtDB, user_dict: dict[str, Any], @@ -263,13 +279,7 @@ async def test_legacy_and_dynamic_sidecar_run( if is_legacy(node): continue - # NOTE: it seems the minimal_app fixture does not contain the actual data - # so we use the one in the async_client??? very strange - await patch_dynamic_service_url( - # pylint: disable=protected-access - app=async_client._transport.app, # noqa: SLF001 # type: ignore - node_uuid=node_id, - ) + await patch_dynamic_service_url(app=initialized_app, node_uuid=node_id) assert len(dy_static_file_server_project.workbench) == 3 diff --git a/services/director-v2/tests/unit/test_modules_instrumentation__utils.py b/services/director-v2/tests/unit/test_modules_instrumentation__utils.py new file mode 100644 index 00000000000..8ebcada1fde --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_instrumentation__utils.py @@ -0,0 +1,10 @@ +import time + +from simcore_service_director_v2.modules.instrumentation._utils import track_duration + + +def test_track_duration(): + with track_duration() as duration: + time.sleep(0.1) + + assert duration.to_flaot() > 0.1 diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py index 4ddb656a0b2..cc0246bfec9 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py @@ -81,6 +81,8 @@ def minimal_config( monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "0") monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "0") + monkeypatch.setenv("DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED", "1") + @pytest.fixture(scope="session") def dynamic_sidecar_headers() -> dict[str, str]: diff --git a/services/director-v2/tests/unit/with_dbs/test_cli.py b/services/director-v2/tests/unit/with_dbs/test_cli.py index 0322610985c..43beec85900 100644 --- a/services/director-v2/tests/unit/with_dbs/test_cli.py +++ b/services/director-v2/tests/unit/with_dbs/test_cli.py @@ -81,6 +81,7 @@ def mock_save_service_state(mocker: MockerFixture) -> None: mocker.patch( "simcore_service_director_v2.modules.dynamic_sidecar.api_client._public.SidecarsClient.save_service_state", spec=True, + return_value=0, ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index e2be39b99a9..a8277415b06 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -321,6 +321,15 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus): progress.update(message="done", percent=ProgressPercent(0.99)) +def _get_satate_folders_size(paths: list[Path]) -> int: + total_size: int = 0 + for path in paths: + for file in path.rglob("*"): + if file.is_file(): + total_size += file.stat().st_size + return total_size + + async def _restore_state_folder( app: FastAPI, *, @@ -347,7 +356,7 @@ async def task_restore_state( settings: ApplicationSettings, mounted_volumes: MountedVolumes, app: FastAPI, -) -> None: +) -> int: # NOTE: the legacy data format was a zip file # this method will maintain retro compatibility. # The legacy archive is always downloaded and decompressed @@ -390,6 +399,8 @@ async def task_restore_state( ) progress.update(message="state restored", percent=ProgressPercent(0.99)) + return _get_satate_folders_size(state_paths) + async def _save_state_folder( app: FastAPI, @@ -419,7 +430,7 @@ async def task_save_state( settings: ApplicationSettings, mounted_volumes: MountedVolumes, app: FastAPI, -) -> None: +) -> int: """ Saves the states of the service. If a legacy archive is detected, it will be removed after @@ -453,6 +464,8 @@ async def task_save_state( await post_sidecar_log_message(app, "Finished state saving", log_level=logging.INFO) progress.update(message="finished state saving", percent=ProgressPercent(0.99)) + return _get_satate_folders_size(state_paths) + async def task_ports_inputs_pull( progress: TaskProgress, diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py index 1adde00e206..7493c6a1b7b 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py @@ -577,7 +577,7 @@ async def test_container_restore_state( status_poll_interval=FAST_STATUS_POLL, progress_callback=_debug_progress, ) as result: - assert result is None + assert isinstance(result, int) async def test_container_save_state( @@ -590,7 +590,7 @@ async def test_container_save_state( status_poll_interval=FAST_STATUS_POLL, progress_callback=_debug_progress, ) as result: - assert result is None + assert isinstance(result, int) @pytest.mark.parametrize("inputs_pulling_enabled", [True, False])