diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/__init__.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/__init__.py index e69de29bb2d..705c568225e 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/__init__.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/__init__.py @@ -0,0 +1,9 @@ +from typing import Final + +from pydantic import parse_obj_as + +from ..rabbitmq_basic_types import RPCNamespace + +DYNAMIC_SIDECAR_RPC_NAMESPACE: Final[RPCNamespace] = parse_obj_as( + RPCNamespace, "dynamic-sidecar" +) diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py index 22d151221d8..986b0c87215 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py @@ -1,9 +1,28 @@ from abc import abstractmethod -from pathlib import Path -from typing import Protocol +from enum import auto +from typing import Any, Final, Protocol -from models_library.projects_nodes_io import NodeID -from pydantic import BaseModel, ByteSize, Field +from pydantic import ( + BaseModel, + ByteSize, + Field, + NonNegativeFloat, + NonNegativeInt, + root_validator, + validator, +) + +from ..projects_nodes_io import NodeID +from ..utils.enums import StrAutoEnum + +_EPSILON: Final[NonNegativeFloat] = 1e-16 + + +class MountPathCategory(StrAutoEnum): + HOST = auto() + STATES_VOLUMES = auto() + INPUTS_VOLUMES = auto() + OUTPUTS_VOLUMES = auto() class SDiskUsageProtocol(Protocol): @@ -28,31 +47,71 @@ def percent(self) -> float: ... +def _get_percent(used: float, total: float) -> float: + return round(used * 100 / (total + _EPSILON), 2) + + class DiskUsage(BaseModel): used: ByteSize = Field(description="used space") free: ByteSize = Field(description="remaining space") total: ByteSize = Field(description="total space = free + used") - used_percent: float = Field( + used_percent: NonNegativeFloat = Field( gte=0.00, lte=100.00, description="Percent of used space relative to the total space", ) + @validator("free") @classmethod - def from_ps_util_disk_usage( - cls, ps_util_disk_usage: SDiskUsageProtocol + def _free_positive(cls, v: float) -> float: + if v < 0: + msg = f"free={v} cannot be a negative value" + raise ValueError(msg) + return v + + @validator("used") + @classmethod + def _used_positive(cls, v: float) -> float: + if v < 0: + msg = f"used={v} cannot be a negative value" + raise ValueError(msg) + return v + + @root_validator(pre=True) + @classmethod + def _check_total(cls, values: dict[str, Any]) -> dict[str, Any]: + total = values["total"] + free = values["free"] + used = values["used"] + if total != free + used: + msg = f"{total=} is different than the sum of {free=}+{used=} => sum={free+used}" + raise ValueError(msg) + return values + + @classmethod + def from_efs_guardian( + cls, used: NonNegativeInt, total: NonNegativeInt ) -> "DiskUsage": - total = ps_util_disk_usage.free + ps_util_disk_usage.used - used_percent = round(ps_util_disk_usage.used * 100 / total, 2) + free = total - used return cls( - used=ByteSize(ps_util_disk_usage.used), - free=ByteSize(ps_util_disk_usage.free), + used=ByteSize(used), + free=ByteSize(free), total=ByteSize(total), - used_percent=used_percent, + used_percent=_get_percent(used, total), ) + @classmethod + def from_ps_util_disk_usage( + cls, ps_util_disk_usage: SDiskUsageProtocol + ) -> "DiskUsage": + total = ps_util_disk_usage.free + ps_util_disk_usage.used + return cls.from_efs_guardian(ps_util_disk_usage.used, total) + + def __hash__(self): + return hash((self.used, self.free, self.total, self.used_percent)) + class ServiceDiskUsage(BaseModel): node_id: NodeID - usage: dict[Path, DiskUsage] + usage: dict[MountPathCategory, DiskUsage] diff --git a/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py b/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py index b3f2e6647ae..8de5c01dc83 100644 --- a/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py +++ b/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py @@ -2,6 +2,7 @@ import pytest from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage from psutil._common import sdiskusage +from pydantic import ByteSize, ValidationError def _assert_same_value(ps_util_disk_usage: sdiskusage) -> None: @@ -27,3 +28,35 @@ def test_disk_usage_regression_cases(ps_util_disk_usage: sdiskusage): def test_disk_usage(): ps_util_disk_usage = psutil.disk_usage("/") _assert_same_value(ps_util_disk_usage) + + +def test_from_efs_guardian_constructor(): + result = DiskUsage.from_efs_guardian(10, 100) + assert result.used == ByteSize(10) + assert result.free == ByteSize(90) + assert result.total == ByteSize(100) + assert result.used_percent == 10 + + +def test_failing_validation(): + with pytest.raises(ValidationError) as exc: + assert DiskUsage.from_efs_guardian(100, 10) + + assert "free=" in f"{exc.value}" + assert "negative value" in f"{exc.value}" + + with pytest.raises(ValidationError) as exc: + assert DiskUsage( + used=-10, # type: ignore + free=ByteSize(10), + total=ByteSize(0), + used_percent=-10, + ) + assert "used=" in f"{exc.value}" + assert "negative value" in f"{exc.value}" + + with pytest.raises(ValidationError) as exc: + DiskUsage( + used=ByteSize(10), free=ByteSize(10), total=ByteSize(21), used_percent=0 + ) + assert "is different than the sum of" in f"{exc.value}" diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/__init__.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/disk_usage.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/disk_usage.py new file mode 100644 index 00000000000..e8a23316e26 --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/disk_usage.py @@ -0,0 +1,22 @@ +import logging + +from models_library.api_schemas_dynamic_sidecar import DYNAMIC_SIDECAR_RPC_NAMESPACE +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.rabbitmq_basic_types import RPCMethodName +from pydantic import parse_obj_as +from servicelib.logging_utils import log_decorator +from servicelib.rabbitmq import RabbitMQRPCClient + +_logger = logging.getLogger(__name__) + + +@log_decorator(_logger, level=logging.DEBUG) +async def update_disk_usage( + rabbitmq_rpc_client: RabbitMQRPCClient, *, usage: dict[str, DiskUsage] +) -> None: + result = await rabbitmq_rpc_client.request( + DYNAMIC_SIDECAR_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "update_disk_usage"), + usage=usage, + ) + assert result is None # nosec diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/__init__.py index 990722e1834..e69de29bb2d 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/__init__.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/__init__.py @@ -1,3 +0,0 @@ -from ._routing import get_main_router - -__all__: tuple[str, ...] = ("get_main_router",) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/__init__.py new file mode 100644 index 00000000000..990722e1834 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/__init__.py @@ -0,0 +1,3 @@ +from ._routing import get_main_router + +__all__: tuple[str, ...] = ("get_main_router",) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_dependencies.py similarity index 76% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_dependencies.py index 8992f73f5d6..a3b3f808173 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_dependencies.py @@ -7,15 +7,16 @@ from fastapi import Depends, FastAPI, Request from fastapi.datastructures import State from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient -from ..core import rabbitmq -from ..core.settings import ApplicationSettings -from ..models.schemas.application_health import ApplicationHealth -from ..models.shared_store import SharedStore -from ..modules.inputs import InputsState -from ..modules.mounted_fs import MountedVolumes -from ..modules.outputs import OutputsContext, OutputsManager -from ..modules.prometheus_metrics import UserServicesMetrics +from ...core import rabbitmq +from ...core.settings import ApplicationSettings +from ...models.schemas.application_health import ApplicationHealth +from ...models.shared_store import SharedStore +from ...modules.inputs import InputsState +from ...modules.mounted_fs import MountedVolumes +from ...modules.outputs import OutputsContext, OutputsManager +from ...modules.prometheus_metrics import UserServicesMetrics def get_application(request: Request) -> FastAPI: @@ -84,3 +85,9 @@ def get_rabbitmq_client( app: Annotated[FastAPI, Depends(get_application)] ) -> RabbitMQClient: return rabbitmq.get_rabbitmq_client(app) + + +def get_rabbitmq_rpc_server( + app: Annotated[FastAPI, Depends(get_application)] +) -> RabbitMQRPCClient: + return rabbitmq.get_rabbitmq_rpc_server(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_routing.py similarity index 94% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_routing.py index 752be437b89..bff0bf16244 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/_routing.py @@ -5,8 +5,8 @@ from fastapi import APIRouter, FastAPI -from .._meta import API_VTAG -from ..core.settings import ApplicationSettings +from ..._meta import API_VTAG +from ...core.settings import ApplicationSettings from . import ( containers, containers_extension, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers.py similarity index 96% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers.py index 73c772cc44f..4269646e9bb 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers.py @@ -15,22 +15,22 @@ from pydantic import parse_raw_as from servicelib.fastapi.requests_decorators import cancel_on_disconnect -from ..core.docker_utils import docker_client -from ..core.errors import ( +from ...core.docker_utils import docker_client +from ...core.errors import ( ContainerExecCommandFailedError, ContainerExecContainerNotFoundError, ContainerExecTimeoutError, ) -from ..core.settings import ApplicationSettings -from ..core.validation import ( +from ...core.settings import ApplicationSettings +from ...core.validation import ( ComposeSpecValidation, parse_compose_spec, validate_compose_spec, ) -from ..models.schemas.containers import ContainersComposeSpec -from ..models.shared_store import SharedStore -from ..modules.container_utils import run_command_in_container -from ..modules.mounted_fs import MountedVolumes +from ...models.schemas.containers import ContainersComposeSpec +from ...models.shared_store import SharedStore +from ...modules.container_utils import run_command_in_container +from ...modules.mounted_fs import MountedVolumes from ._dependencies import ( get_container_restart_lock, get_mounted_volumes, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_extension.py similarity index 96% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_extension.py index 5a4578d532b..d5cf21b8723 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_extension.py @@ -9,10 +9,10 @@ from pydantic.main import BaseModel from simcore_sdk.node_ports_v2.port_utils import is_file_type -from ..core.docker_utils import docker_client -from ..modules.inputs import disable_inputs_pulling, enable_inputs_pulling -from ..modules.mounted_fs import MountedVolumes -from ..modules.outputs import ( +from ...core.docker_utils import docker_client +from ...modules.inputs import disable_inputs_pulling, enable_inputs_pulling +from ...modules.mounted_fs import MountedVolumes +from ...modules.outputs import ( OutputsContext, disable_event_propagation, enable_event_propagation, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py similarity index 96% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py index 52b0e2e7ad6..981edd42f7a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/containers_long_running_tasks.py @@ -11,12 +11,12 @@ ) from servicelib.fastapi.requests_decorators import cancel_on_disconnect -from ..core.settings import ApplicationSettings -from ..models.schemas.application_health import ApplicationHealth -from ..models.schemas.containers import ContainersCreate -from ..models.shared_store import SharedStore -from ..modules.inputs import InputsState -from ..modules.long_running_tasks import ( +from ...core.settings import ApplicationSettings +from ...models.schemas.application_health import ApplicationHealth +from ...models.schemas.containers import ContainersCreate +from ...models.shared_store import SharedStore +from ...modules.inputs import InputsState +from ...modules.long_running_tasks import ( task_containers_restart, task_create_service_containers, task_ports_inputs_pull, @@ -27,8 +27,8 @@ task_runs_docker_compose_down, task_save_state, ) -from ..modules.mounted_fs import MountedVolumes -from ..modules.outputs import OutputsManager +from ...modules.mounted_fs import MountedVolumes +from ...modules.outputs import OutputsManager from ._dependencies import ( get_application, get_application_health, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/disk.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/disk.py similarity index 81% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/disk.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/disk.py index 4a4f1ef35d0..f8ce581024a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/disk.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/disk.py @@ -1,6 +1,6 @@ from fastapi import APIRouter, status -from ..core.reserved_space import remove_reserved_disk_space +from ...core.reserved_space import remove_reserved_disk_space router = APIRouter() diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/health.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/health.py similarity index 70% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/health.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/health.py index 04ad5de0924..f55c8dad9ef 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/health.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/health.py @@ -3,9 +3,14 @@ from fastapi import APIRouter, Depends, HTTPException, status from models_library.errors import RABBITMQ_CLIENT_UNHEALTHY_MSG from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient -from ..models.schemas.application_health import ApplicationHealth -from ._dependencies import get_application_health, get_rabbitmq_client +from ...models.schemas.application_health import ApplicationHealth +from ._dependencies import ( + get_application_health, + get_rabbitmq_client, + get_rabbitmq_rpc_server, +) router = APIRouter() @@ -20,13 +25,14 @@ async def health_endpoint( application_health: Annotated[ApplicationHealth, Depends(get_application_health)], rabbitmq_client: Annotated[RabbitMQClient, Depends(get_rabbitmq_client)], + rabbitmq_rpc_server: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_server)], ) -> ApplicationHealth: if not application_health.is_healthy: raise HTTPException( status.HTTP_503_SERVICE_UNAVAILABLE, detail=application_health.dict() ) - if not rabbitmq_client.healthy: + if not rabbitmq_client.healthy or not rabbitmq_rpc_server.healthy: raise HTTPException( status.HTTP_503_SERVICE_UNAVAILABLE, detail=RABBITMQ_CLIENT_UNHEALTHY_MSG ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/prometheus_metrics.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/prometheus_metrics.py similarity index 94% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/prometheus_metrics.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/prometheus_metrics.py index 1b0ae0b7c8b..298f2a84ac0 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/prometheus_metrics.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/prometheus_metrics.py @@ -3,7 +3,7 @@ from fastapi import APIRouter, Depends, status from fastapi.responses import PlainTextResponse -from ..modules.prometheus_metrics import UserServicesMetrics +from ...modules.prometheus_metrics import UserServicesMetrics from ._dependencies import get_user_services_metrics router = APIRouter() diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/volumes.py similarity index 94% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/volumes.py index 5620f6bc11d..ac2e833a3ab 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rest/volumes.py @@ -4,7 +4,7 @@ from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus from pydantic import BaseModel -from ..models.shared_store import SharedStore +from ...models.shared_store import SharedStore from ._dependencies import get_shared_store router = APIRouter() diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/_disk_usage.py new file mode 100644 index 00000000000..6968250005c --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/_disk_usage.py @@ -0,0 +1,12 @@ +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from servicelib.rabbitmq import RPCRouter + +from ...modules.system_monitor import get_disk_usage_monitor + +router = RPCRouter() + + +@router.expose() +async def update_disk_usage(app: FastAPI, *, usage: dict[str, DiskUsage]) -> None: + get_disk_usage_monitor(app).set_disk_usage_for_path(usage) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/routes.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/routes.py new file mode 100644 index 00000000000..2772c9d863c --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/routes.py @@ -0,0 +1,19 @@ +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar import DYNAMIC_SIDECAR_RPC_NAMESPACE +from servicelib.rabbitmq import RPCRouter + +from ...core.rabbitmq import get_rabbitmq_rpc_server +from . import _disk_usage + +ROUTERS: list[RPCRouter] = [ + _disk_usage.router, +] + + +def setup_rpc_api_routes(app: FastAPI) -> None: + async def startup() -> None: + rpc_server = get_rabbitmq_rpc_server(app) + for router in ROUTERS: + await rpc_server.register_router(router, DYNAMIC_SIDECAR_RPC_NAMESPACE, app) + + app.add_event_handler("startup", startup) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index 20029cac7fc..8592afd2440 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -13,7 +13,8 @@ from simcore_sdk.node_ports_common.exceptions import NodeNotFound from .._meta import API_VERSION, API_VTAG, PROJECT_NAME, SUMMARY, __version__ -from ..api import get_main_router +from ..api.rest import get_main_router +from ..api.rpc.routes import setup_rpc_api_routes from ..models.schemas.application_health import ApplicationHealth from ..models.shared_store import SharedStore, setup_shared_store from ..modules.attribute_monitor import setup_attribute_monitor @@ -171,6 +172,7 @@ def create_app(): application_settings: ApplicationSettings = app.state.settings setup_rabbitmq(app) + setup_rpc_api_routes(app) setup_background_log_fetcher(app) setup_resource_tracking(app) setup_notifications(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index 0ce567648d9..f33a43d33fb 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -16,6 +16,7 @@ ) from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context from servicelib.rabbitmq import RabbitMQClient, is_rabbitmq_responsive +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient from settings_library.rabbit import RabbitSettings from ..core.settings import ApplicationSettings @@ -99,32 +100,49 @@ async def wait_for_rabbitmq_liveness(app: FastAPI) -> None: ) +@lru_cache(maxsize=2) +def _is_rabbitmq_initialized(app: FastAPI, state_client_name: str) -> bool: + return hasattr(app.state, state_client_name) + + +def _raise_if_not_initialized(app: FastAPI, state_client_name: str) -> None: + if not _is_rabbitmq_initialized(app, state_client_name): + msg = "RabbitMQ client is not available. Please check the configuration." + raise RuntimeError(msg) + + +def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: + _raise_if_not_initialized(app, "rabbitmq_client") + return cast(RabbitMQClient, app.state.rabbitmq_client) + + +def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient: + _raise_if_not_initialized(app, "rabbitmq_rpc_server") + return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server) + + def setup_rabbitmq(app: FastAPI) -> None: async def on_startup() -> None: app_settings: ApplicationSettings = app.state.settings assert app_settings.RABBIT_SETTINGS # nosec settings = app_settings.RABBIT_SETTINGS + with log_context(_logger, logging.INFO, msg="Create RabbitMQClient"): app.state.rabbitmq_client = RabbitMQClient( client_name=f"dynamic-sidecar_{app_settings.DY_SIDECAR_NODE_ID}", settings=settings, ) + with log_context(_logger, logging.INFO, msg="Create RabbitMQRPCClient"): + app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( + client_name=f"dynamic-sidecar_rpc_{app_settings.DY_SIDECAR_NODE_ID}", + settings=settings, + ) async def on_shutdown() -> None: if app.state.rabbitmq_client: await app.state.rabbitmq_client.close() + if app.state.rabbitmq_rpc_server: + await app.state.rabbitmq_rpc_server.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) - - -@lru_cache(maxsize=1) -def _is_rabbitmq_initialized(app: FastAPI) -> bool: - return hasattr(app.state, "rabbitmq_client") - - -def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: - if not _is_rabbitmq_initialized(app): - msg = "RabbitMQ client is not available. Please check the configuration." - raise RuntimeError(msg) - return cast(RabbitMQClient, app.state.rabbitmq_client) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py index 840c47d729e..664b0e8285d 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py @@ -1,7 +1,8 @@ -from pathlib import Path - from fastapi import FastAPI -from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + MountPathCategory, +) from models_library.projects_nodes_io import NodeID from models_library.users import UserID @@ -9,7 +10,11 @@ async def publish_disk_usage( - app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] + app: FastAPI, + *, + user_id: UserID, + node_id: NodeID, + usage: dict[MountPathCategory, DiskUsage] ) -> None: notifier: Notifier = Notifier.get_from_app_state(app) await notifier.notify_service_disk_usage( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py index 0d61e1b388b..04a82d44a04 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py @@ -1,5 +1,4 @@ import contextlib -from pathlib import Path import socketio # type: ignore[import-untyped] from fastapi import FastAPI @@ -17,6 +16,7 @@ ) from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, + MountPathCategory, ServiceDiskUsage, ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr @@ -34,7 +34,10 @@ def __init__(self, sio_manager: socketio.AsyncAioPikaManager): self._sio_manager = sio_manager async def notify_service_disk_usage( - self, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] + self, + user_id: UserID, + node_id: NodeID, + usage: dict[MountPathCategory, DiskUsage], ) -> None: await self._sio_manager.emit( SOCKET_IO_SERVICE_DISK_USAGE_EVENT, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py index 18382a30045..546243f4cdb 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py @@ -1,3 +1,7 @@ +from ._disk_usage import get_disk_usage_monitor from ._setup import setup_system_monitor -__all__: tuple[str, ...] = ("setup_system_monitor",) +__all__: tuple[str, ...] = ( + "get_disk_usage_monitor", + "setup_system_monitor", +) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py index 90b06450e6f..cdaa531aeb8 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -2,11 +2,16 @@ import logging from dataclasses import dataclass, field from datetime import timedelta +from functools import cached_property from pathlib import Path +from typing import Final import psutil from fastapi import FastAPI -from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + MountPathCategory, +) from models_library.projects_nodes_io import NodeID from models_library.users import UserID from servicelib.background_task import start_periodic_task, stop_periodic_task @@ -17,9 +22,18 @@ from ..mounted_fs import MountedVolumes from ..notifications import publish_disk_usage +_NODE_FILE_SYSTEM_PATH: Final[Path] = Path("/") + + _logger = logging.getLogger(__name__) +_SUPPORTED_ITEMS: Final[set[str]] = { + MountPathCategory.HOST, + MountPathCategory.STATES_VOLUMES, +} + + async def get_usage(path: Path) -> DiskUsage: usage = await asyncio.get_event_loop().run_in_executor( None, psutil.disk_usage, f"{path}" @@ -27,31 +41,144 @@ async def get_usage(path: Path) -> DiskUsage: return DiskUsage.from_ps_util_disk_usage(usage) +def get_relative_path(path: Path, dy_volumes_mount_dir: Path) -> Path: + try: + return path.relative_to(dy_volumes_mount_dir) + except ValueError: + return path + + +def _get_normalized_folder_name(path: Path) -> str: + return f"{path}".replace("/", "_") + + +def _have_common_entries(a: set[str], b: set[str]) -> bool: + return bool(len(a & b) > 0) + + @dataclass class DiskUsageMonitor: app: FastAPI user_id: UserID node_id: NodeID interval: timedelta - monitored_paths: list[Path] + monitored_paths: dict[MountPathCategory, set[Path]] + + dy_volumes_mount_dir: Path _monitor_task: asyncio.Task | None = None - _last_usage: dict[Path, DiskUsage] = field(default_factory=dict) - async def _publish_disk_usage(self, usage: dict[Path, DiskUsage]): + # tracked disk usage + _last_usage: dict[MountPathCategory, DiskUsage] = field(default_factory=dict) + _usage_overwrite: dict[str, DiskUsage] = field( + default_factory=dict, + metadata={ + "description": ( + "third party services can update the disk usage for certain paths " + "monitored by the dynamic-sidecar. This is the case for the efs-guardian." + ) + }, + ) + + @cached_property + def _monitored_paths_set(self) -> set[Path]: + return set.union(*self.monitored_paths.values()) + + @cached_property + def _normalized_monitored_paths(self) -> dict[MountPathCategory, set[str]]: + """ + Transforms Path -> str form `/tmp/.some_file/here` -> `_tmp_.some_file_here`. + This a one way transformation used to uniquely identify volume mounts inside + by the dynamic-sidecar. These are also used by the efs-guardian. + """ + return { + k: { + _get_normalized_folder_name( + get_relative_path(p, self.dy_volumes_mount_dir) + ) + for p in paths + } + for k, paths in self.monitored_paths.items() + } + + async def _get_measured_disk_usage(self) -> list[DiskUsage]: + return await logged_gather( + *[get_usage(monitored_path) for monitored_path in self._monitored_paths_set] + ) + + def _get_local_disk_usage( + self, measured_disk_usage: list[DiskUsage] + ) -> dict[str, DiskUsage]: + return { + _get_normalized_folder_name( + get_relative_path(p, self.dy_volumes_mount_dir) + ): u + for p, u in zip(self._monitored_paths_set, measured_disk_usage, strict=True) + } + + def _replace_incoming_usage( + self, normalized_disk_usage: dict[str, DiskUsage] + ) -> None: + """overwrites local disk usage with incoming usage from egs-guardian""" + for key, overwrite_usage in self._usage_overwrite.items(): + normalized_disk_usage[key] = overwrite_usage # noqa: PERF403 + + @staticmethod + def _get_grouped_usage_to_folder_names( + normalized_disk_usage: dict[str, DiskUsage] + ) -> dict[DiskUsage, set[str]]: + """Groups all paths that have the same metrics together""" + usage_to_folder_names: dict[DiskUsage, set[str]] = {} + for folder_name, disk_usage in normalized_disk_usage.items(): + if disk_usage not in usage_to_folder_names: + usage_to_folder_names[disk_usage] = set() + + usage_to_folder_names[disk_usage].add(folder_name) + return usage_to_folder_names + + async def _publish_disk_usage(self, usage: dict[MountPathCategory, DiskUsage]): await publish_disk_usage( self.app, user_id=self.user_id, node_id=self.node_id, usage=usage ) async def _monitor(self) -> None: - disk_usages: list[DiskUsage] = await logged_gather( - *[get_usage(monitored_path) for monitored_path in self.monitored_paths] - ) + measured_disk_usage = await self._get_measured_disk_usage() + + local_disk_usage = self._get_local_disk_usage(measured_disk_usage) - usage: dict[Path, DiskUsage] = dict( - zip(self.monitored_paths, disk_usages, strict=True) + self._replace_incoming_usage(local_disk_usage) + + usage_to_folder_names = self._get_grouped_usage_to_folder_names( + local_disk_usage ) - # notify only when changed + # compute new version of DiskUsage for FE, only 1 label for each unique disk usage entry + usage: dict[MountPathCategory, DiskUsage] = {} + + normalized_paths = self._normalized_monitored_paths + + for disk_usage, folder_names in usage_to_folder_names.items(): + for category in [ + MountPathCategory.HOST, + MountPathCategory.STATES_VOLUMES, + MountPathCategory.INPUTS_VOLUMES, + MountPathCategory.OUTPUTS_VOLUMES, + ]: + if folder_names.intersection(normalized_paths[category]): + usage[category] = disk_usage + break + else: + msg = f"Could not assign {disk_usage=} for {folder_names=}" + raise RuntimeError(msg) + + detected_items = set(usage.keys()) + if not detected_items.issubset(_SUPPORTED_ITEMS): + msg = ( + f"Computed {usage=}, has unsupported items {detected_items=}. " + f"Currently only the following are supported: {_SUPPORTED_ITEMS}" + ) + raise RuntimeError(msg) + + # notify only when usage changes if self._last_usage != usage: await self._publish_disk_usage(usage) self._last_usage = usage @@ -65,13 +192,28 @@ async def shutdown(self) -> None: if self._monitor_task: await stop_periodic_task(self._monitor_task) + def set_disk_usage_for_path(self, overwrite_usage: dict[str, DiskUsage]) -> None: + """ + efs-guardian manages disk quotas since the underlying FS has no support for them. + the dynamic-sidecar will use this information to provide correct quotas for the + volumes managed by the efs-guardian + """ + self._usage_overwrite = overwrite_usage -def _get_monitored_paths(app: FastAPI) -> list[Path]: + +def _get_monitored_paths(app: FastAPI) -> dict[MountPathCategory, set[Path]]: mounted_volumes: MountedVolumes = app.state.mounted_volumes - return [ - Path("/"), # root file system and /tmp usage mainly - *list(mounted_volumes.all_disk_paths_iter()), - ] + return { + MountPathCategory.HOST: {_NODE_FILE_SYSTEM_PATH}, + MountPathCategory.INPUTS_VOLUMES: {mounted_volumes.disk_inputs_path}, + MountPathCategory.OUTPUTS_VOLUMES: {mounted_volumes.disk_outputs_path}, + MountPathCategory.STATES_VOLUMES: set(mounted_volumes.disk_state_paths_iter()), + } + + +def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor: + disk_usage_monitor: DiskUsageMonitor = app.state.disk_usage_monitor + return disk_usage_monitor def setup_disk_usage(app: FastAPI) -> None: @@ -85,6 +227,7 @@ async def on_startup() -> None: node_id=settings.DY_SIDECAR_NODE_ID, interval=settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL, monitored_paths=_get_monitored_paths(app), + dy_volumes_mount_dir=settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR, ) await disk_usage_monitor.setup() diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index 53b88ac1359..8b4760b26dd 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -41,10 +41,11 @@ "pytest_simcore.faker_users_data", "pytest_simcore.minio_service", "pytest_simcore.pytest_global_environs", - "pytest_simcore.socketio", "pytest_simcore.rabbit_service", + "pytest_simcore.redis_service", "pytest_simcore.repository_paths", "pytest_simcore.simcore_service_library_fixtures", + "pytest_simcore.socketio", ] CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent @@ -318,6 +319,21 @@ def mock_core_rabbitmq(mocker: MockerFixture) -> dict[str, AsyncMock]: return_value=None, autospec=True, ), + "rpc._rpc_initialize": mocker.patch( + "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQRPCClient._rpc_initialize", + return_value=None, + autospec=True, + ), + "rpc.close": mocker.patch( + "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQRPCClient.close", + return_value=None, + autospec=True, + ), + "rpc.register_router": mocker.patch( + "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQRPCClient.register_router", + return_value=None, + autospec=True, + ), } diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py similarity index 99% rename from services/dynamic-sidecar/tests/unit/test_api_containers.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_containers.py index 3a053d8e2fe..f16b883de15 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_containers.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py @@ -30,7 +30,7 @@ from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME from servicelib.fastapi.long_running_tasks.client import TaskId from simcore_service_dynamic_sidecar._meta import API_VTAG -from simcore_service_dynamic_sidecar.api.containers import _INACTIVE_FOR_LONG_TIME +from simcore_service_dynamic_sidecar.api.rest.containers import _INACTIVE_FOR_LONG_TIME from simcore_service_dynamic_sidecar.core.application import AppState from simcore_service_dynamic_sidecar.core.docker_compose_utils import ( docker_compose_create, @@ -772,7 +772,7 @@ def mock_inactive_since_command_response( activity_response: ActivityInfo, ) -> None: mocker.patch( - "simcore_service_dynamic_sidecar.api.containers.run_command_in_container", + "simcore_service_dynamic_sidecar.api.rest.containers.run_command_in_container", return_value=activity_response.json(), ) @@ -792,7 +792,7 @@ async def test_containers_activity_inactive_since( @pytest.fixture def mock_inactive_response_wrong_format(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_dynamic_sidecar.api.containers.run_command_in_container", + "simcore_service_dynamic_sidecar.api.rest.containers.run_command_in_container", return_value="This is an unparsable json response {}", ) diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py similarity index 99% rename from services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py index 7493c6a1b7b..9ea4de06dbb 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py @@ -36,7 +36,7 @@ from servicelib.fastapi.long_running_tasks.client import setup as client_setup from simcore_sdk.node_ports_common.exceptions import NodeNotFound from simcore_service_dynamic_sidecar._meta import API_VTAG -from simcore_service_dynamic_sidecar.api import containers_long_running_tasks +from simcore_service_dynamic_sidecar.api.rest import containers_long_running_tasks from simcore_service_dynamic_sidecar.core.validation import InvalidComposeSpecError from simcore_service_dynamic_sidecar.models.schemas.containers import ( ContainersComposeSpec, diff --git a/services/dynamic-sidecar/tests/unit/test_api_disk.py b/services/dynamic-sidecar/tests/unit/test_api_rest_disk.py similarity index 100% rename from services/dynamic-sidecar/tests/unit/test_api_disk.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_disk.py diff --git a/services/dynamic-sidecar/tests/unit/test_api_health.py b/services/dynamic-sidecar/tests/unit/test_api_rest_health.py similarity index 100% rename from services/dynamic-sidecar/tests/unit/test_api_health.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_health.py diff --git a/services/dynamic-sidecar/tests/unit/test_api_prometheus_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py similarity index 100% rename from services/dynamic-sidecar/tests/unit/test_api_prometheus_metrics.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_prometheus_metrics.py diff --git a/services/dynamic-sidecar/tests/unit/test_api_volumes.py b/services/dynamic-sidecar/tests/unit/test_api_rest_volumes.py similarity index 100% rename from services/dynamic-sidecar/tests/unit/test_api_volumes.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_volumes.py diff --git a/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py similarity index 100% rename from services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py rename to services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py diff --git a/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py b/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py new file mode 100644 index 00000000000..cda0964a3b3 --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py @@ -0,0 +1,76 @@ +# pylint:disable=protected-access +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from collections.abc import Awaitable, Callable +from typing import AsyncIterable +from unittest.mock import AsyncMock + +import pytest +from asgi_lifespan import LifespanManager +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from pydantic import ByteSize +from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar import disk_usage +from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings +from simcore_service_dynamic_sidecar.core.application import create_app +from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( + get_disk_usage_monitor, +) + +pytest_simcore_core_services_selection = [ + "redis", + "rabbit", +] + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, + rabbit_service: RabbitSettings, + redis_service: RedisSettings, + mock_environment: EnvVarsDict, + mock_registry_service: AsyncMock, +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": "true", + "RABBIT_HOST": rabbit_service.RABBIT_HOST, + "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(), + "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}", + "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}", + "RABBIT_USER": rabbit_service.RABBIT_USER, + }, + ) + + +@pytest.fixture +async def app(mock_environment: EnvVarsDict) -> AsyncIterable[FastAPI]: + app = create_app() + async with LifespanManager(app): + yield app + + +@pytest.fixture +async def rpc_client( + app: FastAPI, + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], +) -> RabbitMQRPCClient: + return await rabbitmq_rpc_client("client") + + +async def test_get_state(app: FastAPI, rpc_client: RabbitMQRPCClient): + usage = { + "some_path": DiskUsage( + total=ByteSize(0), used=ByteSize(0), free=ByteSize(0), used_percent=0 + ) + } + + result = await disk_usage.update_disk_usage(rpc_client, usage=usage) + assert result is None + + assert get_disk_usage_monitor(app)._usage_overwrite == usage # noqa: SLF001 diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py index c7f7e770166..eabd1114083 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -25,6 +25,7 @@ ) from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, + MountPathCategory, ServiceDiskUsage, ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr @@ -130,7 +131,7 @@ async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: return DiskUsage( - total=ByteSize(0), + total=ByteSize.validate(byte_size_str), used=ByteSize(0), free=ByteSize.validate(byte_size_str), used_percent=0, @@ -155,11 +156,13 @@ async def on_service_status(data): "usage", [ pytest.param({}, id="empty"), - pytest.param({Path("/"): _get_mocked_disk_usage("1kb")}, id="one_entry"), + pytest.param( + {MountPathCategory.HOST: _get_mocked_disk_usage("1kb")}, id="one_entry" + ), pytest.param( { - Path("/"): _get_mocked_disk_usage("1kb"), - Path("/tmp"): _get_mocked_disk_usage("2kb"), # noqa: S108 + MountPathCategory.HOST: _get_mocked_disk_usage("1kb"), + MountPathCategory.STATES_VOLUMES: _get_mocked_disk_usage("2kb"), }, id="two_entries", ), diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py index 2c4c3c6a125..e423d588480 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py @@ -1,3 +1,4 @@ +# pylint:disable=protected-access # pylint:disable=redefined-outer-name # pylint:disable=unused-argument @@ -9,18 +10,56 @@ import pytest from faker import Faker from fastapi import FastAPI -from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + MountPathCategory, +) from models_library.projects_nodes_io import NodeID +from models_library.services_types import RunID from models_library.users import UserID +from models_library.utils.json_serialization import json_dumps from psutil._common import sdiskusage from pydantic import ByteSize from pytest_mock import MockerFixture +from simcore_service_dynamic_sidecar.modules.mounted_fs import MountedVolumes from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( DiskUsageMonitor, + _get_monitored_paths, get_usage, ) +@pytest.fixture +def dy_volumes(tmp_path: Path) -> Path: + return tmp_path + + +@pytest.fixture +def get_monitored_paths( + dy_volumes: Path, node_id: NodeID +) -> Callable[[Path, Path, list[Path]], dict[MountPathCategory, set[Path]]]: + def _( + inputs: Path, outputs: Path, states: list[Path] + ) -> dict[MountPathCategory, set[Path]]: + mounted_volumes = MountedVolumes( + run_id=RunID.create(), + node_id=node_id, + inputs_path=dy_volumes / inputs, + outputs_path=dy_volumes / outputs, + user_preferences_path=None, + state_paths=[dy_volumes / x for x in states], + state_exclude=set(), + compose_namespace="", + dy_volumes=dy_volumes, + ) + app = Mock() + app.state = Mock() + app.state.mounted_volumes = mounted_volumes + return _get_monitored_paths(app) + + return _ + + @pytest.fixture def mock_disk_usage(mocker: MockerFixture) -> Callable[[dict[str, ByteSize]], None]: base_module = "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage" @@ -84,15 +123,22 @@ async def _assert_monitor_triggers( async def test_disk_usage_monitor( mock_disk_usage: Callable[[dict[str, ByteSize]], None], + get_monitored_paths: Callable[ + [Path, Path, list[Path]], dict[MountPathCategory, set[Path]] + ], + dy_volumes: Path, publish_disk_usage_spy: Mock, - faker: Faker, + node_id: NodeID, ) -> None: disk_usage_monitor = DiskUsageMonitor( app=AsyncMock(), user_id=1, - node_id=faker.uuid4(), + node_id=node_id, interval=timedelta(seconds=5), - monitored_paths=[Path("/"), Path("/tmp")], # noqa: S108 + monitored_paths=get_monitored_paths( + Path("/inputs"), Path("/outputs"), [Path("/workspace")] + ), + dy_volumes_mount_dir=dy_volumes, ) assert len(publish_disk_usage_spy.call_args_list) == 0 @@ -100,9 +146,9 @@ async def test_disk_usage_monitor( for i in range(1, 3): mock_disk_usage( { - "/": _get_byte_size(f"{i}kb"), - "/tmp": _get_byte_size(f"{i*2}kb"), # noqa: S108 - } + f"{p}": _get_byte_size(f"{i*2}kb") + for p in disk_usage_monitor._monitored_paths_set # noqa: SLF001 + }, ) await _assert_monitor_triggers( @@ -110,8 +156,7 @@ async def test_disk_usage_monitor( ) assert _get_entry(publish_disk_usage_spy, index=0) == { - Path("/"): _get_mocked_disk_usage(f"{i}kb"), - Path("/tmp"): _get_mocked_disk_usage(f"{i*2}kb"), # noqa: S108 + MountPathCategory.HOST: _get_mocked_disk_usage(f"{i*2}kb"), } # reset mock to test again @@ -119,7 +164,7 @@ async def test_disk_usage_monitor( def _random_tmp_file(tmp_path: Path, faker: Faker) -> None: - some_path: Path = tmp_path / faker.uuid4() + some_path: Path = tmp_path / f"{faker.uuid4()}" some_path.write_text("some text here") @@ -129,3 +174,78 @@ async def test_get_usage(tmp_path: Path, faker: Faker): usage_after = await get_usage(Path("/")) assert usage_after.free < usage_before.free + + +async def test_disk_usage_monitor_new_frontend_format( + mock_disk_usage: Callable[[dict[str, ByteSize]], None], + get_monitored_paths: Callable[ + [Path, Path, list[Path]], dict[MountPathCategory, set[Path]] + ], + publish_disk_usage_spy: Mock, + node_id: NodeID, + dy_volumes: Path, +) -> None: + disk_usage_monitor = DiskUsageMonitor( + app=AsyncMock(), + user_id=1, + node_id=node_id, + interval=timedelta(seconds=5), + monitored_paths=get_monitored_paths( + Path("/home/user/inputs"), + Path("/home/user/outputs"), + [Path("/home/user/workspace"), Path("/.data/assets")], + ), + dy_volumes_mount_dir=dy_volumes, + ) + + mock_disk_usage( + { + f"{p}": ByteSize(1294390525952) + for p in disk_usage_monitor._monitored_paths_set # noqa: SLF001 + }, + ) + + async def _wait_for_usage() -> dict[str, DiskUsage]: + publish_disk_usage_spy.reset_mock() + await disk_usage_monitor._monitor() # noqa: SLF001 + publish_disk_usage_spy.assert_called() + return publish_disk_usage_spy.call_args_list[0][0][0] + + # normally only 1 entry is found + frontend_usage = await _wait_for_usage() + print(json_dumps(frontend_usage, indent=2)) + assert len(frontend_usage) == 1 + assert MountPathCategory.HOST in frontend_usage + assert frontend_usage[MountPathCategory.HOST] == _get_mocked_disk_usage( + "1294390525952" + ) + + # emulate EFS sending metrics, 2 entries are found + + disk_usage_monitor.set_disk_usage_for_path( + { + ".data_assets": _get_mocked_disk_usage("1GB"), + "home_user_workspace": _get_mocked_disk_usage("1GB"), + } + ) + + frontend_usage = await _wait_for_usage() + print(json_dumps(frontend_usage, indent=2)) + assert len(frontend_usage) == 2 + assert MountPathCategory.HOST in frontend_usage + assert MountPathCategory.STATES_VOLUMES in frontend_usage + assert frontend_usage[MountPathCategory.HOST] == _get_mocked_disk_usage( + "1294390525952" + ) + assert frontend_usage[MountPathCategory.STATES_VOLUMES] == _get_mocked_disk_usage( + "1GB" + ) + + # emulate data could not be mapped + disk_usage_monitor.set_disk_usage_for_path( + { + "missing_path": _get_mocked_disk_usage("2GB"), + } + ) + with pytest.raises(RuntimeError): + frontend_usage = await _wait_for_usage() diff --git a/services/static-webserver/client/source/class/osparc/workbench/DiskUsageController.js b/services/static-webserver/client/source/class/osparc/workbench/DiskUsageController.js index ca031f65023..fd15b5cc688 100644 --- a/services/static-webserver/client/source/class/osparc/workbench/DiskUsageController.js +++ b/services/static-webserver/client/source/class/osparc/workbench/DiskUsageController.js @@ -87,7 +87,7 @@ qx.Class.define("osparc.workbench.DiskUsageController", { return; } - const diskUsage = data.usage["/"] + const diskUsage = data.usage["HOST"] function isMatchingNodeId({nodeId}) { return nodeId === id; } diff --git a/services/static-webserver/client/source/class/osparc/workbench/DiskUsageIndicator.js b/services/static-webserver/client/source/class/osparc/workbench/DiskUsageIndicator.js index c944d7fc9ed..d844cfcd5e9 100644 --- a/services/static-webserver/client/source/class/osparc/workbench/DiskUsageIndicator.js +++ b/services/static-webserver/client/source/class/osparc/workbench/DiskUsageIndicator.js @@ -156,7 +156,7 @@ qx.Class.define("osparc.workbench.DiskUsageIndicator", { return; } - const usage = diskUsage["usage"]["/"] + const usage = diskUsage["usage"]["HOST"] const color1 = this.__getIndicatorColor(usage.free); const progress = `${usage["used_percent"]}%`; const labelDiskSize = osparc.utils.Utils.bytesToSize(usage.free);