diff --git a/README.md b/README.md index 1cb462938a5..cdd64b4de70 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # osparc-simcore platform

+ +

@@ -184,7 +186,9 @@ This project is licensed under the terms of the [MIT license](LICENSE). ---

+ Made with love (and lots of hard work) at www.z43.swiss +

diff --git a/packages/models-library/requirements/_test.in b/packages/models-library/requirements/_test.in index 55fd8bd1403..3d83dade891 100644 --- a/packages/models-library/requirements/_test.in +++ b/packages/models-library/requirements/_test.in @@ -11,6 +11,7 @@ coverage faker pint +psutil pytest pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76 pytest-cov diff --git a/packages/models-library/requirements/_test.txt b/packages/models-library/requirements/_test.txt index 61487eab257..4c65677c216 100644 --- a/packages/models-library/requirements/_test.txt +++ b/packages/models-library/requirements/_test.txt @@ -52,6 +52,8 @@ pluggy==1.3.0 # via pytest pprintpp==0.4.0 # via pytest-icdiff +psutil==5.9.8 + # via -r requirements/_test.in pytest==7.4.3 # via # -r requirements/_test.in diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py new file mode 100644 index 00000000000..054b0834bc4 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py @@ -0,0 +1,3 @@ +from typing import Final + +SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage" diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py new file mode 100644 index 00000000000..22d151221d8 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/telemetry.py @@ -0,0 +1,58 @@ +from abc import abstractmethod +from pathlib import Path +from typing import Protocol + +from models_library.projects_nodes_io import NodeID +from pydantic import BaseModel, ByteSize, Field + + +class SDiskUsageProtocol(Protocol): + @property + @abstractmethod + def total(self) -> int: + ... + + @property + @abstractmethod + def used(self) -> int: + ... + + @property + @abstractmethod + def free(self) -> int: + ... + + @property + @abstractmethod + def percent(self) -> float: + ... + + +class DiskUsage(BaseModel): + used: ByteSize = Field(description="used space") + free: ByteSize = Field(description="remaining space") + + total: ByteSize = Field(description="total space = free + used") + used_percent: float = Field( + gte=0.00, + lte=100.00, + description="Percent of used space relative to the total space", + ) + + @classmethod + def from_ps_util_disk_usage( + cls, ps_util_disk_usage: SDiskUsageProtocol + ) -> "DiskUsage": + total = ps_util_disk_usage.free + ps_util_disk_usage.used + used_percent = round(ps_util_disk_usage.used * 100 / total, 2) + return cls( + used=ByteSize(ps_util_disk_usage.used), + free=ByteSize(ps_util_disk_usage.free), + total=ByteSize(total), + used_percent=used_percent, + ) + + +class ServiceDiskUsage(BaseModel): + node_id: NodeID + usage: dict[Path, DiskUsage] diff --git a/packages/models-library/src/models_library/api_schemas_webserver/socketio.py b/packages/models-library/src/models_library/api_schemas_webserver/socketio.py index 6221387aa11..05bd342a4c3 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/socketio.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/socketio.py @@ -1,17 +1,16 @@ +from ..basic_types import IDStr from ..users import GroupID, UserID -class SocketIORoom(str): - __slots__ = () - +class SocketIORoomStr(IDStr): @classmethod - def from_socket_id(cls, socket_id: str) -> "SocketIORoom": + def from_socket_id(cls, socket_id: str) -> "SocketIORoomStr": return cls(socket_id) @classmethod - def from_group_id(cls, group_id: GroupID) -> "SocketIORoom": + def from_group_id(cls, group_id: GroupID) -> "SocketIORoomStr": return cls(f"group:{group_id}") @classmethod - def from_user_id(cls, user_id: UserID) -> "SocketIORoom": + def from_user_id(cls, user_id: UserID) -> "SocketIORoomStr": return cls(f"user:{user_id}") diff --git a/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py b/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py new file mode 100644 index 00000000000..56ac6ed5c6f --- /dev/null +++ b/packages/models-library/tests/test_api_schemas_dynamic_sidecar_telemetry.py @@ -0,0 +1,10 @@ +import psutil +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage + + +def test_disk_usage(): + ps_util_disk_usage = psutil.disk_usage("/") + disk_usage = DiskUsage.from_ps_util_disk_usage(ps_util_disk_usage) + assert disk_usage.used == ps_util_disk_usage.used + assert disk_usage.free == ps_util_disk_usage.free + assert round(disk_usage.used_percent, 1) == round(ps_util_disk_usage.percent, 1) diff --git a/packages/models-library/tests/test_api_schemas_webserver_socketio.py b/packages/models-library/tests/test_api_schemas_webserver_socketio.py index 9136dfda8ae..e5dfdbf7eff 100644 --- a/packages/models-library/tests/test_api_schemas_webserver_socketio.py +++ b/packages/models-library/tests/test_api_schemas_webserver_socketio.py @@ -2,7 +2,7 @@ import pytest from faker import Faker -from models_library.api_schemas_webserver.socketio import SocketIORoom +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.users import GroupID, UserID @@ -22,6 +22,6 @@ def socket_id(faker: Faker) -> str: def test_socketio_room(user_id: UserID, group_id: GroupID, socket_id: str): - assert SocketIORoom.from_user_id(user_id) == f"user:{user_id}" - assert SocketIORoom.from_group_id(group_id) == f"group:{group_id}" - assert SocketIORoom.from_socket_id(socket_id) == socket_id + assert SocketIORoomStr.from_user_id(user_id) == f"user:{user_id}" + assert SocketIORoomStr.from_group_id(group_id) == f"group:{group_id}" + assert SocketIORoomStr.from_socket_id(socket_id) == socket_id diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/f20f4c9fca71_added_enable_telemetry_option_to_groups_.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f20f4c9fca71_added_enable_telemetry_option_to_groups_.py new file mode 100644 index 00000000000..18d66b72a2e --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/f20f4c9fca71_added_enable_telemetry_option_to_groups_.py @@ -0,0 +1,35 @@ +"""added enable telemetry option to groups extra properties + +Revision ID: f20f4c9fca71 +Revises: f9f9a650bf4b +Create Date: 2024-01-19 14:11:16.354169+00:00 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f20f4c9fca71" +down_revision = "f9f9a650bf4b" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "groups_extra_properties", + sa.Column( + "enable_telemetry", + sa.Boolean(), + server_default=sa.text("false"), + nullable=False, + ), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("groups_extra_properties", "enable_telemetry") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py b/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py index e4011561a27..e0d438d76c9 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py @@ -56,6 +56,13 @@ server_default=sa.sql.expression.false(), doc="If true, group will use on-demand clusters", ), + sa.Column( + "enable_telemetry", + sa.Boolean(), + nullable=False, + server_default=sa.sql.expression.false(), + doc="If true, will send telemetry for new style dynamic services to frontend", + ), sa.UniqueConstraint( "group_id", "product_name", name="group_id_product_name_uniqueness" ), diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py b/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py index 89029085667..e52fbb4791a 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py @@ -29,6 +29,7 @@ class GroupExtraProperties(FromRowMixin): internet_access: bool override_services_specifications: bool use_on_demand_clusters: bool + enable_telemetry: bool created: datetime.datetime modified: datetime.datetime diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_users.py b/packages/postgres-database/src/simcore_postgres_database/utils_users.py index 715b14a489f..0dc315d7ebf 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_users.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_users.py @@ -105,7 +105,7 @@ async def get_active_user_email(conn: SAConnection, user_id: int) -> str: (users.c.status == UserStatus.ACTIVE) & (users.c.id == user_id) ) ) - if value: + if value is not None: assert isinstance(value, str) # nosec return value diff --git a/packages/pytest-simcore/src/pytest_simcore/pytest_socketio.py b/packages/pytest-simcore/src/pytest_simcore/pytest_socketio.py new file mode 100644 index 00000000000..4932d789249 --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/pytest_socketio.py @@ -0,0 +1,143 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +import asyncio +from collections.abc import AsyncIterable, AsyncIterator, Callable +from contextlib import _AsyncGeneratorContextManager, asynccontextmanager +from unittest.mock import AsyncMock + +import pytest +import socketio +from aiohttp import web +from aiohttp.test_utils import TestServer +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.users import UserID +from pytest_mock import MockerFixture +from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager +from settings_library.rabbit import RabbitSettings +from socketio import AsyncAioPikaManager, AsyncServer +from yarl import URL + + +@pytest.fixture +async def socketio_server_factory() -> Callable[ + [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] +]: + @asynccontextmanager + async def _(rabbit_settings: RabbitSettings) -> AsyncIterator[AsyncServer]: + # Same configuration as simcore_service_webserver/socketio/server.py + server_manager = AsyncAioPikaManager(url=rabbit_settings.dsn) + + server = AsyncServer( + async_mode="aiohttp", engineio_logger=True, client_manager=server_manager + ) + + yield server + + await cleanup_socketio_async_pubsub_manager(server_manager) + + return _ + + +@pytest.fixture +async def socketio_server() -> AsyncIterable[AsyncServer]: + msg = "must be implemented in test" + raise NotImplementedError(msg) + + +@pytest.fixture +async def web_server( + socketio_server: AsyncServer, unused_tcp_port_factory: Callable[[], int] +) -> AsyncIterator[URL]: + """ + this emulates the webserver setup: socketio server with + an aiopika manager that attaches an aiohttp web app + """ + aiohttp_app = web.Application() + socketio_server.attach(aiohttp_app) + + async def _lifespan( + server: TestServer, started: asyncio.Event, teardown: asyncio.Event + ): + # NOTE: this is necessary to avoid blocking comms between client and this server + await server.start_server() + started.set() # notifies started + await teardown.wait() # keeps test0server until needs to close + await server.close() + + setup = asyncio.Event() + teardown = asyncio.Event() + + server = TestServer(aiohttp_app, port=unused_tcp_port_factory()) + t = asyncio.create_task(_lifespan(server, setup, teardown), name="server-lifespan") + + await setup.wait() + + yield URL(server.make_url("/")) + + assert t + teardown.set() + + +@pytest.fixture +async def server_url(web_server: URL) -> str: + return f'{web_server.with_path("/")}' + + +@pytest.fixture +def socketio_client_factory( + server_url: str, +) -> Callable[[], _AsyncGeneratorContextManager[socketio.AsyncClient]]: + @asynccontextmanager + async def _() -> AsyncIterator[socketio.AsyncClient]: + """This emulates a socketio client in the front-end""" + client = socketio.AsyncClient(logger=True, engineio_logger=True) + await client.connect(f"{server_url}", transports=["websocket"]) + + yield client + + await client.disconnect() + + return _ + + +@pytest.fixture +def room_name() -> SocketIORoomStr: + msg = "must be implemented in test" + raise NotImplementedError(msg) + + +@pytest.fixture +def socketio_server_events( + socketio_server: AsyncServer, + mocker: MockerFixture, + user_id: UserID, + room_name: SocketIORoomStr, +) -> dict[str, AsyncMock]: + # handlers + async def connect(sid: str, environ): + print("connecting", sid) + await socketio_server.enter_room(sid, room_name) + + async def on_check(sid, data): + print("check", sid, data) + + async def disconnect(sid: str): + print("disconnecting", sid) + await socketio_server.leave_room(sid, room_name) + + # spies + spy_connect = mocker.AsyncMock(wraps=connect) + socketio_server.on("connect", spy_connect) + + spy_on_check = mocker.AsyncMock(wraps=on_check) + socketio_server.on("check", spy_on_check) + + spy_disconnect = mocker.AsyncMock(wraps=disconnect) + socketio_server.on("disconnect", spy_disconnect) + + return { + connect.__name__: spy_connect, + disconnect.__name__: spy_disconnect, + on_check.__name__: spy_on_check, + } diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py index 41ea99e2fdc..8706a899d12 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py @@ -1,28 +1,33 @@ -import sqlalchemy as sa -from models_library.products import ProductName -from models_library.users import UserID +from simcore_postgres_database.utils_groups_extra_properties import ( + GroupExtraProperties, + GroupExtraPropertiesRepo, +) -from ..tables import groups_extra_properties, user_to_groups from ._base import BaseRepository class GroupsExtraPropertiesRepository(BaseRepository): - async def has_internet_access( - self, user_id: UserID, product_name: ProductName - ) -> bool: + async def _get_aggregated_properties_for_user( + self, + *, + user_id: int, + product_name: str, + ) -> GroupExtraProperties: async with self.db_engine.acquire() as conn: - # checks if one of the groups which the user is part of has internet access - select_stmt = sa.select( - [groups_extra_properties.c.internet_access] - ).select_from( - user_to_groups.join( - groups_extra_properties, - (groups_extra_properties.c.group_id == user_to_groups.c.gid) - & (user_to_groups.c.uid == user_id) - & (groups_extra_properties.c.internet_access == True) - & (groups_extra_properties.c.product_name == product_name), - ) + return await GroupExtraPropertiesRepo.get_aggregated_properties_for_user( + conn, user_id=user_id, product_name=product_name ) - user_with_access = await conn.scalar(select_stmt) - return user_with_access is not None + async def has_internet_access(self, *, user_id: int, product_name: str) -> bool: + group_extra_properties = await self._get_aggregated_properties_for_user( + user_id=user_id, product_name=product_name + ) + internet_access: bool = group_extra_properties.internet_access + return internet_access + + async def is_telemetry_enabled(self, *, user_id: int, product_name: str) -> bool: + group_extra_properties = await self._get_aggregated_properties_for_user( + user_id=user_id, product_name=product_name + ) + telemetry_enabled: bool = group_extra_properties.enable_telemetry + return telemetry_enabled diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index 1ccd656b006..3a66de81819 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -48,6 +48,7 @@ def _get_environment_variables( *, allow_internet_access: bool, metrics_collection_allowed: bool, + telemetry_enabled: bool, ) -> dict[str, str]: registry_settings = app_settings.DIRECTOR_V2_DOCKER_REGISTRY rabbit_settings = app_settings.DIRECTOR_V2_RABBITMQ @@ -79,6 +80,7 @@ def _get_environment_variables( "DY_SIDECAR_PROJECT_ID": f"{scheduler_data.project_id}", "DY_SIDECAR_RUN_ID": scheduler_data.run_id, "DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS": f"{allow_internet_access}", + "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": f"{telemetry_enabled}", "DY_SIDECAR_STATE_EXCLUDE": json_dumps(f"{x}" for x in state_exclude), "DY_SIDECAR_CALLBACKS_MAPPING": callbacks_mapping.json(), "DY_SIDECAR_STATE_PATHS": json_dumps( @@ -299,7 +301,7 @@ def _get_ports( return ports -def get_dynamic_sidecar_spec( +def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: PLR0913 scheduler_data: SchedulerData, dynamic_sidecar_settings: DynamicSidecarSettings, dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings, @@ -311,6 +313,7 @@ def get_dynamic_sidecar_spec( allow_internet_access: bool, hardware_info: HardwareInfo | None, metrics_collection_allowed: bool, + telemetry_enabled: bool, ) -> AioDockerServiceSpec: """ The dynamic-sidecar is responsible for managing the lifecycle @@ -408,6 +411,7 @@ def get_dynamic_sidecar_spec( app_settings, allow_internet_access=allow_internet_access, metrics_collection_allowed=metrics_collection_allowed, + telemetry_enabled=telemetry_enabled, ), "Hosts": [], "Image": dynamic_sidecar_settings.DYNAMIC_SIDECAR_IMAGE, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py new file mode 100644 index 00000000000..4f438209cf2 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -0,0 +1,309 @@ +# pylint: disable=relative-beyond-top-level + +import logging +from typing import Any, Final + +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.aiodocker_api import AioDockerServiceSpec +from models_library.projects import ProjectAtDB +from models_library.projects_nodes import Node +from models_library.projects_nodes_io import NodeIDStr +from models_library.rabbitmq_messages import ( + InstrumentationRabbitMessage, + ProgressRabbitMessageNode, + ProgressType, +) +from models_library.service_settings_labels import SimcoreServiceSettingsLabel +from models_library.services import RunID +from servicelib.json_serialization import json_dumps +from servicelib.rabbitmq import RabbitMQClient +from simcore_postgres_database.models.comp_tasks import NodeClass + +from .....core.dynamic_services_settings import DynamicServicesSettings +from .....core.dynamic_services_settings.proxy import DynamicSidecarProxySettings +from .....core.dynamic_services_settings.scheduler import ( + DynamicServicesSchedulerSettings, +) +from .....core.dynamic_services_settings.sidecar import ( + DynamicSidecarSettings, + PlacementSettings, +) +from .....models.dynamic_services_scheduler import NetworkId, SchedulerData +from .....utils.db import get_repository +from .....utils.dict_utils import nested_update +from ....catalog import CatalogClient +from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository +from ....db.repositories.projects import ProjectsRepository +from ....director_v0 import DirectorV0Client +from ...docker_api import ( + constrain_service_to_node, + create_network, + create_service_and_get_id, + get_dynamic_sidecar_placement, + get_swarm_network, + is_dynamic_sidecar_stack_missing, +) +from ...docker_service_specs import ( + extract_service_port_service_settings, + get_dynamic_proxy_spec, + get_dynamic_sidecar_spec, + merge_settings_before_use, +) +from ._abc import DynamicSchedulerEvent +from ._events_utils import get_allow_metrics_collection, get_director_v0_client + +_logger = logging.getLogger(__name__) + +_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS: Final[tuple[list[str], ...]] = ( + ["labels"], + ["task_template", "Resources", "Limits"], + ["task_template", "Resources", "Reservation", "MemoryBytes"], + ["task_template", "Resources", "Reservation", "NanoCPUs"], + ["task_template", "Placement", "Constraints"], + ["task_template", "ContainerSpec", "Env"], + ["task_template", "Resources", "Reservation", "GenericResources"], +) + + +async def _create_proxy_service( + app, + *, + scheduler_data: SchedulerData, + dynamic_sidecar_network_id: NetworkId, + swarm_network_id: NetworkId, + swarm_network_name: str, +): + proxy_settings: DynamicSidecarProxySettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR_PROXY_SETTINGS + ) + scheduler_data.proxy_admin_api_port = ( + proxy_settings.DYNAMIC_SIDECAR_CADDY_ADMIN_API_PORT + ) + + dynamic_services_settings: DynamicServicesSettings = ( + app.state.settings.DYNAMIC_SERVICES + ) + + dynamic_sidecar_proxy_create_service_params: dict[ + str, Any + ] = get_dynamic_proxy_spec( + scheduler_data=scheduler_data, + dynamic_services_settings=dynamic_services_settings, + dynamic_sidecar_network_id=dynamic_sidecar_network_id, + swarm_network_id=swarm_network_id, + swarm_network_name=swarm_network_name, + ) + _logger.debug( + "dynamic-sidecar-proxy create_service_params %s", + json_dumps(dynamic_sidecar_proxy_create_service_params), + ) + + await create_service_and_get_id(dynamic_sidecar_proxy_create_service_params) + + +class CreateSidecars(DynamicSchedulerEvent): + """Created the dynamic-sidecar and the proxy.""" + + @classmethod + async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + # the call to is_dynamic_sidecar_stack_missing is expensive + # if the dynamic sidecar was started skip + if scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started: + return False + + settings: DynamicServicesSchedulerSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER + ) + return await is_dynamic_sidecar_stack_missing( + node_uuid=scheduler_data.node_uuid, + swarm_stack_name=settings.SWARM_STACK_NAME, + ) + + @classmethod + async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: + # instrumentation + message = InstrumentationRabbitMessage( + metrics="service_started", + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + service_uuid=scheduler_data.node_uuid, + service_type=NodeClass.INTERACTIVE.value, + service_key=scheduler_data.key, + service_tag=scheduler_data.version, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + ) + rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client + await rabbitmq_client.publish(message.channel_name, message) + + dynamic_sidecar_settings: DynamicSidecarSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR + ) + dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER + ) + dynamic_services_placement_settings: PlacementSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_PLACEMENT_SETTINGS + ) + + # the dynamic-sidecar should merge all the settings, especially: + # resources and placement derived from all the images in + # the provided docker-compose spec + # also other encodes the env vars to target the proper container + director_v0_client: DirectorV0Client = get_director_v0_client(app) + # fetching project form DB and fetching user settings + projects_repository = get_repository(app, ProjectsRepository) + + project: ProjectAtDB = await projects_repository.get_project( + project_id=scheduler_data.project_id + ) + + node_uuid_str = NodeIDStr(scheduler_data.node_uuid) + node: Node | None = project.workbench.get(node_uuid_str) + boot_options = ( + node.boot_options + if node is not None and node.boot_options is not None + else {} + ) + _logger.info("%s", f"{boot_options=}") + + settings: SimcoreServiceSettingsLabel = await merge_settings_before_use( + director_v0_client=director_v0_client, + service_key=scheduler_data.key, + service_tag=scheduler_data.version, + service_user_selection_boot_options=boot_options, + service_resources=scheduler_data.service_resources, + placement_substitutions=dynamic_services_placement_settings.DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS, + ) + + groups_extra_properties = get_repository(app, GroupsExtraPropertiesRepository) + + assert scheduler_data.product_name is not None # nosec + allow_internet_access: bool = await groups_extra_properties.has_internet_access( + user_id=scheduler_data.user_id, product_name=scheduler_data.product_name + ) + + network_config = { + "Name": scheduler_data.dynamic_sidecar_network_name, + "Driver": "overlay", + "Labels": { + "io.simcore.zone": f"{dynamic_services_scheduler_settings.TRAEFIK_SIMCORE_ZONE}", + "com.simcore.description": f"interactive for node: {scheduler_data.node_uuid}", + "uuid": f"{scheduler_data.node_uuid}", # needed for removal when project is closed + }, + "Attachable": True, + "Internal": not allow_internet_access, + } + dynamic_sidecar_network_id = await create_network(network_config) + + # attach the service to the swarm network dedicated to services + swarm_network: dict[str, Any] = await get_swarm_network( + dynamic_services_scheduler_settings.SIMCORE_SERVICES_NETWORK_NAME + ) + swarm_network_id: NetworkId = swarm_network["Id"] + swarm_network_name: str = swarm_network["Name"] + + metrics_collection_allowed: bool = await get_allow_metrics_collection( + app, + user_id=scheduler_data.user_id, + product_name=scheduler_data.product_name, + ) + + # start dynamic-sidecar and run the proxy on the same node + + # Each time a new dynamic-sidecar service is created + # generate a new `run_id` to avoid resource collisions + scheduler_data.run_id = RunID.create() + + # telemetry configuration + is_telemetry_enabled = await groups_extra_properties.is_telemetry_enabled( + user_id=scheduler_data.user_id, product_name=scheduler_data.product_name + ) + + # WARNING: do NOT log, this structure has secrets in the open + # If you want to log, please use an obfuscator + dynamic_sidecar_service_spec_base: AioDockerServiceSpec = get_dynamic_sidecar_spec( + scheduler_data=scheduler_data, + dynamic_sidecar_settings=dynamic_sidecar_settings, + dynamic_services_scheduler_settings=dynamic_services_scheduler_settings, + swarm_network_id=swarm_network_id, + settings=settings, + app_settings=app.state.settings, + hardware_info=scheduler_data.hardware_info, + has_quota_support=dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, + allow_internet_access=allow_internet_access, + metrics_collection_allowed=metrics_collection_allowed, + telemetry_enabled=is_telemetry_enabled, + ) + + catalog_client = CatalogClient.instance(app) + user_specific_service_spec = ( + await catalog_client.get_service_specifications( + scheduler_data.user_id, scheduler_data.key, scheduler_data.version + ) + ).get("sidecar", {}) or {} + user_specific_service_spec = AioDockerServiceSpec.parse_obj( + user_specific_service_spec + ) + # NOTE: since user_specific_service_spec follows Docker Service Spec and not Aio + # we do not use aliases when exporting dynamic_sidecar_service_spec_base + dynamic_sidecar_service_final_spec = AioDockerServiceSpec.parse_obj( + nested_update( + jsonable_encoder(dynamic_sidecar_service_spec_base, exclude_unset=True), + jsonable_encoder(user_specific_service_spec, exclude_unset=True), + include=_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS, + ) + ) + rabbit_message = ProgressRabbitMessageNode( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + progress_type=ProgressType.SIDECARS_PULLING, + progress=0, + ) + await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) + dynamic_sidecar_id = await create_service_and_get_id( + dynamic_sidecar_service_final_spec + ) + # constrain service to the same node + scheduler_data.dynamic_sidecar.docker_node_id = ( + await get_dynamic_sidecar_placement( + dynamic_sidecar_id, dynamic_services_scheduler_settings + ) + ) + + rabbit_message = ProgressRabbitMessageNode( + user_id=scheduler_data.user_id, + project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + progress_type=ProgressType.SIDECARS_PULLING, + progress=1, + ) + await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) + + await constrain_service_to_node( + service_name=scheduler_data.service_name, + docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id, + ) + + # update service_port and assign it to the status + # needed by CreateUserServices action + scheduler_data.service_port = extract_service_port_service_settings(settings) + + await _create_proxy_service( + app, + scheduler_data=scheduler_data, + dynamic_sidecar_network_id=dynamic_sidecar_network_id, + swarm_network_id=swarm_network_id, + swarm_network_name=swarm_network_name, + ) + + # finally mark services created + scheduler_data.dynamic_sidecar.dynamic_sidecar_id = dynamic_sidecar_id + scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id = ( + dynamic_sidecar_network_id + ) + scheduler_data.dynamic_sidecar.swarm_network_id = swarm_network_id + scheduler_data.dynamic_sidecar.swarm_network_name = swarm_network_name + scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started = True diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py index 28bcaa97e8e..3e5de8cdb9b 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py @@ -1,72 +1,29 @@ # pylint: disable=relative-beyond-top-level import logging -from typing import Any, Final +from typing import Any from fastapi import FastAPI -from fastapi.encoders import jsonable_encoder -from models_library.aiodocker_api import AioDockerServiceSpec -from models_library.projects import ProjectAtDB -from models_library.projects_nodes import Node -from models_library.projects_nodes_io import NodeIDStr -from models_library.rabbitmq_messages import ( - InstrumentationRabbitMessage, - ProgressRabbitMessageNode, - ProgressType, -) -from models_library.service_settings_labels import SimcoreServiceSettingsLabel -from models_library.services import RunID from servicelib.fastapi.http_client_thin import BaseHttpClientError -from servicelib.json_serialization import json_dumps -from servicelib.rabbitmq import RabbitMQClient -from simcore_postgres_database.models.comp_tasks import NodeClass -from .....core.dynamic_services_settings import DynamicServicesSettings -from .....core.dynamic_services_settings.proxy import DynamicSidecarProxySettings from .....core.dynamic_services_settings.scheduler import ( DynamicServicesSchedulerSettings, ) -from .....core.dynamic_services_settings.sidecar import ( - DynamicSidecarSettings, - PlacementSettings, -) from .....models.dynamic_services_scheduler import ( DockerContainerInspect, DockerStatus, DynamicSidecarStatus, - NetworkId, SchedulerData, ) -from .....utils.db import get_repository -from .....utils.dict_utils import nested_update -from ....catalog import CatalogClient -from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository -from ....db.repositories.projects import ProjectsRepository -from ....director_v0 import DirectorV0Client from ...api_client import get_dynamic_sidecar_service_health, get_sidecars_client -from ...docker_api import ( - constrain_service_to_node, - create_network, - create_service_and_get_id, - get_dynamic_sidecar_placement, - get_swarm_network, - is_dynamic_sidecar_stack_missing, -) -from ...docker_service_specs import ( - extract_service_port_service_settings, - get_dynamic_proxy_spec, - get_dynamic_sidecar_spec, - merge_settings_before_use, -) from ...errors import UnexpectedContainerStatusError from ._abc import DynamicSchedulerEvent +from ._event_create_sidecars import CreateSidecars from ._events_user_services import create_user_services from ._events_utils import ( are_all_user_services_containers_running, attach_project_networks, attempt_pod_removal_and_data_saving, - get_allow_metrics_collection, - get_director_v0_client, parse_containers_inspect, prepare_services_environment, wait_for_sidecar_api, @@ -74,238 +31,10 @@ _logger = logging.getLogger(__name__) -_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS: Final[tuple[list[str], ...]] = ( - ["labels"], - ["task_template", "Resources", "Limits"], - ["task_template", "Resources", "Reservation", "MemoryBytes"], - ["task_template", "Resources", "Reservation", "NanoCPUs"], - ["task_template", "Placement", "Constraints"], - ["task_template", "ContainerSpec", "Env"], - ["task_template", "Resources", "Reservation", "GenericResources"], -) _EXPECTED_STATUSES: set[DockerStatus] = {DockerStatus.created, DockerStatus.running} -class CreateSidecars(DynamicSchedulerEvent): - """Created the dynamic-sidecar and the proxy.""" - - @classmethod - async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: - # the call to is_dynamic_sidecar_stack_missing is expensive - # if the dynamic sidecar was started skip - if scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started: - return False - - settings: DynamicServicesSchedulerSettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER - ) - return await is_dynamic_sidecar_stack_missing( - node_uuid=scheduler_data.node_uuid, - swarm_stack_name=settings.SWARM_STACK_NAME, - ) - - @classmethod - async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: - # instrumentation - message = InstrumentationRabbitMessage( - metrics="service_started", - user_id=scheduler_data.user_id, - project_id=scheduler_data.project_id, - node_id=scheduler_data.node_uuid, - service_uuid=scheduler_data.node_uuid, - service_type=NodeClass.INTERACTIVE.value, - service_key=scheduler_data.key, - service_tag=scheduler_data.version, - simcore_user_agent=scheduler_data.request_simcore_user_agent, - ) - rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client - await rabbitmq_client.publish(message.channel_name, message) - - dynamic_sidecar_settings: DynamicSidecarSettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR - ) - dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER - ) - dynamic_services_placement_settings: PlacementSettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_PLACEMENT_SETTINGS - ) - - # the dynamic-sidecar should merge all the settings, especially: - # resources and placement derived from all the images in - # the provided docker-compose spec - # also other encodes the env vars to target the proper container - director_v0_client: DirectorV0Client = get_director_v0_client(app) - # fetching project form DB and fetching user settings - projects_repository = get_repository(app, ProjectsRepository) - - project: ProjectAtDB = await projects_repository.get_project( - project_id=scheduler_data.project_id - ) - - node_uuid_str = NodeIDStr(scheduler_data.node_uuid) - node: Node | None = project.workbench.get(node_uuid_str) - boot_options = ( - node.boot_options - if node is not None and node.boot_options is not None - else {} - ) - _logger.info("%s", f"{boot_options=}") - - settings: SimcoreServiceSettingsLabel = await merge_settings_before_use( - director_v0_client=director_v0_client, - service_key=scheduler_data.key, - service_tag=scheduler_data.version, - service_user_selection_boot_options=boot_options, - service_resources=scheduler_data.service_resources, - placement_substitutions=dynamic_services_placement_settings.DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS, - ) - - groups_extra_properties = get_repository(app, GroupsExtraPropertiesRepository) - - assert scheduler_data.product_name is not None # nosec - allow_internet_access: bool = await groups_extra_properties.has_internet_access( - user_id=scheduler_data.user_id, product_name=scheduler_data.product_name - ) - - network_config = { - "Name": scheduler_data.dynamic_sidecar_network_name, - "Driver": "overlay", - "Labels": { - "io.simcore.zone": f"{dynamic_services_scheduler_settings.TRAEFIK_SIMCORE_ZONE}", - "com.simcore.description": f"interactive for node: {scheduler_data.node_uuid}", - "uuid": f"{scheduler_data.node_uuid}", # needed for removal when project is closed - }, - "Attachable": True, - "Internal": not allow_internet_access, - } - dynamic_sidecar_network_id = await create_network(network_config) - - # attach the service to the swarm network dedicated to services - swarm_network: dict[str, Any] = await get_swarm_network( - dynamic_services_scheduler_settings.SIMCORE_SERVICES_NETWORK_NAME - ) - swarm_network_id: NetworkId = swarm_network["Id"] - swarm_network_name: str = swarm_network["Name"] - - metrics_collection_allowed: bool = await get_allow_metrics_collection( - app, - user_id=scheduler_data.user_id, - product_name=scheduler_data.product_name, - ) - - # start dynamic-sidecar and run the proxy on the same node - - # Each time a new dynamic-sidecar service is created - # generate a new `run_id` to avoid resource collisions - scheduler_data.run_id = RunID.create() - - # WARNING: do NOT log, this structure has secrets in the open - # If you want to log, please use an obfuscator - dynamic_sidecar_service_spec_base: AioDockerServiceSpec = get_dynamic_sidecar_spec( - scheduler_data=scheduler_data, - dynamic_sidecar_settings=dynamic_sidecar_settings, - dynamic_services_scheduler_settings=dynamic_services_scheduler_settings, - swarm_network_id=swarm_network_id, - settings=settings, - app_settings=app.state.settings, - hardware_info=scheduler_data.hardware_info, - has_quota_support=dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, - allow_internet_access=allow_internet_access, - metrics_collection_allowed=metrics_collection_allowed, - ) - - catalog_client = CatalogClient.instance(app) - user_specific_service_spec = ( - await catalog_client.get_service_specifications( - scheduler_data.user_id, scheduler_data.key, scheduler_data.version - ) - ).get("sidecar", {}) or {} - user_specific_service_spec = AioDockerServiceSpec.parse_obj( - user_specific_service_spec - ) - # NOTE: since user_specific_service_spec follows Docker Service Spec and not Aio - # we do not use aliases when exporting dynamic_sidecar_service_spec_base - dynamic_sidecar_service_final_spec = AioDockerServiceSpec.parse_obj( - nested_update( - jsonable_encoder(dynamic_sidecar_service_spec_base, exclude_unset=True), - jsonable_encoder(user_specific_service_spec, exclude_unset=True), - include=_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS, - ) - ) - rabbit_message = ProgressRabbitMessageNode( - user_id=scheduler_data.user_id, - project_id=scheduler_data.project_id, - node_id=scheduler_data.node_uuid, - progress_type=ProgressType.SIDECARS_PULLING, - progress=0, - ) - await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) - dynamic_sidecar_id = await create_service_and_get_id( - dynamic_sidecar_service_final_spec - ) - # constrain service to the same node - scheduler_data.dynamic_sidecar.docker_node_id = ( - await get_dynamic_sidecar_placement( - dynamic_sidecar_id, dynamic_services_scheduler_settings - ) - ) - rabbit_message = ProgressRabbitMessageNode( - user_id=scheduler_data.user_id, - project_id=scheduler_data.project_id, - node_id=scheduler_data.node_uuid, - progress_type=ProgressType.SIDECARS_PULLING, - progress=1, - ) - await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) - - await constrain_service_to_node( - service_name=scheduler_data.service_name, - docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id, - ) - - # update service_port and assign it to the status - # needed by CreateUserServices action - scheduler_data.service_port = extract_service_port_service_settings(settings) - - proxy_settings: DynamicSidecarProxySettings = ( - app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR_PROXY_SETTINGS - ) - scheduler_data.proxy_admin_api_port = ( - proxy_settings.DYNAMIC_SIDECAR_CADDY_ADMIN_API_PORT - ) - - dynamic_services_settings: DynamicServicesSettings = ( - app.state.settings.DYNAMIC_SERVICES - ) - - dynamic_sidecar_proxy_create_service_params: dict[ - str, Any - ] = get_dynamic_proxy_spec( - scheduler_data=scheduler_data, - dynamic_services_settings=dynamic_services_settings, - dynamic_sidecar_network_id=dynamic_sidecar_network_id, - swarm_network_id=swarm_network_id, - swarm_network_name=swarm_network_name, - ) - _logger.debug( - "dynamic-sidecar-proxy create_service_params %s", - json_dumps(dynamic_sidecar_proxy_create_service_params), - ) - - await create_service_and_get_id(dynamic_sidecar_proxy_create_service_params) - - # finally mark services created - scheduler_data.dynamic_sidecar.dynamic_sidecar_id = dynamic_sidecar_id - scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id = ( - dynamic_sidecar_network_id - ) - scheduler_data.dynamic_sidecar.swarm_network_id = swarm_network_id - scheduler_data.dynamic_sidecar.swarm_network_name = swarm_network_name - scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started = True - - class WaitForSidecarAPI(DynamicSchedulerEvent): """ Waits for the sidecar to start and respond to API calls. diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py index 8117f0d9f60..f79d734101b 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py @@ -2,7 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -from typing import Any +from typing import Any, Final import pytest from simcore_service_director_v2.core.settings import AppSettings @@ -12,7 +12,7 @@ ) # PLEASE keep alphabetical to simplify debugging -EXPECTED_DYNAMIC_SIDECAR_ENV_VAR_NAMES = { +EXPECTED_DYNAMIC_SIDECAR_ENV_VAR_NAMES: Final[set[str]] = { "DY_SIDECAR_CALLBACKS_MAPPING", "DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED", "DY_SIDECAR_NODE_ID", @@ -25,11 +25,13 @@ "DY_SIDECAR_SERVICE_VERSION", "DY_SIDECAR_STATE_EXCLUDE", "DY_SIDECAR_STATE_PATHS", + "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE", "DY_SIDECAR_USER_ID", "DY_SIDECAR_USER_PREFERENCES_PATH", "DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS", "DYNAMIC_SIDECAR_COMPOSE_NAMESPACE", "DYNAMIC_SIDECAR_LOG_LEVEL", + "NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS", "POSTGRES_DB", "POSTGRES_ENDPOINT", "POSTGRES_HOST", @@ -54,7 +56,6 @@ "S3_ACCESS_KEY", "S3_BUCKET_NAME", "S3_ENDPOINT", - "NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS", "S3_SECRET_KEY", "S3_SECURE", "SC_BOOT_MODE", @@ -78,6 +79,7 @@ def test_dynamic_sidecar_env_vars( app_settings, allow_internet_access=False, metrics_collection_allowed=True, + telemetry_enabled=True, ) print("dynamic_sidecar_env_vars:", dynamic_sidecar_env_vars) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 70f36654737..45522ba326b 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -224,6 +224,7 @@ def expected_dynamic_sidecar_spec( ), "DY_SIDECAR_USER_ID": "234", "DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS": "False", + "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": "True", "FORWARD_ENV_DISPLAY": ":0", "NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS": "3", "DYNAMIC_SIDECAR_LOG_LEVEL": "DEBUG", @@ -437,6 +438,7 @@ def test_get_dynamic_proxy_spec( has_quota_support=False, allow_internet_access=False, metrics_collection_allowed=True, + telemetry_enabled=True, ) exclude_keys: Mapping[int | str, Any] = { @@ -530,6 +532,7 @@ async def test_merge_dynamic_sidecar_specs_with_user_specific_specs( has_quota_support=False, allow_internet_access=False, metrics_collection_allowed=True, + telemetry_enabled=True, ) assert dynamic_sidecar_spec dynamic_sidecar_spec_dict = dynamic_sidecar_spec.dict() diff --git a/services/dynamic-sidecar/requirements/_base.in b/services/dynamic-sidecar/requirements/_base.in index fccb80d6e06..251ca3cedfc 100644 --- a/services/dynamic-sidecar/requirements/_base.in +++ b/services/dynamic-sidecar/requirements/_base.in @@ -34,6 +34,7 @@ httpx psutil pydantic python-magic # file type identification library. See 'magic.from_file(...)' NOTE: requires `libmagic`` installed +python-socketio PyYAML u-msgpack-python uvicorn diff --git a/services/dynamic-sidecar/requirements/_base.txt b/services/dynamic-sidecar/requirements/_base.txt index 5cb350b7f1e..886e0f5a1e1 100644 --- a/services/dynamic-sidecar/requirements/_base.txt +++ b/services/dynamic-sidecar/requirements/_base.txt @@ -90,6 +90,8 @@ attrs==23.1.0 # aiohttp # jsonschema # referencing +bidict==0.22.1 + # via python-socketio certifi==2023.7.22 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -144,6 +146,7 @@ fastapi==0.99.1 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_fastapi.in # -r requirements/_base.in + # prometheus-fastapi-instrumentator frozenlist==1.4.0 # via # aiohttp @@ -154,6 +157,7 @@ h11==0.14.0 # via # httpcore # uvicorn + # wsproto httpcore==0.17.3 # via httpx httpx==0.24.1 @@ -235,6 +239,12 @@ pamqp==3.2.1 # via aiormq pint==0.22 # via -r requirements/../../../packages/simcore-sdk/requirements/_base.in +prometheus-client==0.19.0 + # via + # -r requirements/../../../packages/service-library/requirements/_fastapi.in + # prometheus-fastapi-instrumentator +prometheus-fastapi-instrumentator==6.1.0 + # via -r requirements/../../../packages/service-library/requirements/_fastapi.in psutil==5.9.5 # via -r requirements/_base.in psycopg2-binary==2.9.6 @@ -285,8 +295,12 @@ pyinstrument==4.5.1 # -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in python-dateutil==2.8.2 # via arrow +python-engineio==4.8.2 + # via python-socketio python-magic==0.4.27 # via -r requirements/_base.in +python-socketio==5.11.0 + # via -r requirements/_base.in pyyaml==6.0.1 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -350,6 +364,8 @@ rpds-py==0.9.2 # via # jsonschema # referencing +simple-websocket==1.0.0 + # via python-engineio six==1.16.0 # via python-dateutil sniffio==1.3.0 @@ -442,6 +458,8 @@ uvicorn==0.23.1 # -r requirements/_base.in watchdog==3.0.0 # via -r requirements/_base.in +wsproto==1.2.0 + # via simple-websocket yarl==1.9.2 # via # -r requirements/../../../packages/postgres-database/requirements/_base.in diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index 5bf7df66a7c..20bbc74e792 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -23,6 +23,7 @@ from ..modules.outputs import setup_outputs from ..modules.prometheus_metrics import setup_prometheus_metrics from ..modules.resource_tracking import setup_resource_tracking +from ..modules.system_monitor import setup_system_monitor from ..modules.user_services_preferences import setup_user_services_preferences from .docker_compose_utils import docker_compose_down from .docker_logs import setup_background_log_fetcher @@ -158,6 +159,7 @@ def create_app(): setup_rabbitmq(app) setup_background_log_fetcher(app) setup_resource_tracking(app) + setup_system_monitor(app) # also sets up mounted_volumes setup_mounted_fs(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py index 2411072ec0b..369b8d73fcd 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py @@ -30,6 +30,12 @@ class ResourceTrackingSettings(BaseCustomSettings): ) +class SystemMonitorSettings(BaseCustomSettings): + DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE: bool = Field( + default=False, description="enabled/disabled disk usage monitoring" + ) + + class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): SC_BOOT_MODE: BootModeEnum = Field( ..., @@ -85,6 +91,11 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): ), ) + DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL: timedelta = Field( + default=timedelta(seconds=5), + description="time between checks for disk usage", + ) + DEBUG: bool = Field( default=False, description="If set to True the application will boot into debug mode", @@ -138,6 +149,8 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): DY_SIDECAR_SERVICE_VERSION: ServiceVersion | None = None DY_SIDECAR_PRODUCT_NAME: ProductName | None = None + SYSTEM_MONITOR_SETTINGS: SystemMonitorSettings = Field(auto_default_from_env=True) + REGISTRY_SETTINGS: RegistrySettings = Field(auto_default_from_env=True) RABBIT_SETTINGS: RabbitSettings | None = Field(auto_default_from_env=True) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py new file mode 100644 index 00000000000..18382a30045 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/__init__.py @@ -0,0 +1,3 @@ +from ._setup import setup_system_monitor + +__all__: tuple[str, ...] = ("setup_system_monitor",) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py new file mode 100644 index 00000000000..1ecc04fdaea --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -0,0 +1,99 @@ +import asyncio +import logging +from dataclasses import dataclass, field +from datetime import timedelta +from pathlib import Path + +import psutil +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.logging_utils import log_context +from servicelib.utils import logged_gather + +from ...core.settings import ApplicationSettings +from ..mounted_fs import MountedVolumes +from ._notifier import publish_disk_usage + +_logger = logging.getLogger(__name__) + + +async def get_usage(path: Path) -> DiskUsage: + usage = await asyncio.get_event_loop().run_in_executor( + None, psutil.disk_usage, f"{path}" + ) + return DiskUsage.from_ps_util_disk_usage(usage) + + +@dataclass +class DiskUsageMonitor: + app: FastAPI + user_id: UserID + node_id: NodeID + interval: timedelta + monitored_paths: list[Path] + _monitor_task: asyncio.Task | None = None + _last_usage: dict[Path, DiskUsage] = field(default_factory=dict) + + async def _publish_disk_usage(self, usage: dict[Path, DiskUsage]): + await publish_disk_usage( + self.app, user_id=self.user_id, node_id=self.node_id, usage=usage + ) + + async def _monitor(self) -> None: + disk_usages: list[DiskUsage] = await logged_gather( + *[get_usage(monitored_path) for monitored_path in self.monitored_paths] + ) + + usage: dict[Path, DiskUsage] = dict( + zip(self.monitored_paths, disk_usages, strict=True) + ) + + # notify only when changed + if self._last_usage != usage: + await self._publish_disk_usage(usage) + self._last_usage = usage + + async def setup(self) -> None: + self._monitor_task = start_periodic_task( + self._monitor, interval=self.interval, task_name="monitor_disk_usage" + ) + + async def shutdown(self) -> None: + if self._monitor_task: + await stop_periodic_task(self._monitor_task) + + +def _get_monitored_paths(app: FastAPI) -> list[Path]: + mounted_volumes: MountedVolumes = app.state.mounted_volumes + return [ + Path("/"), # root file system and /tmp usage mainly + *list(mounted_volumes.all_disk_paths_iter()), + ] + + +def setup_disk_usage(app: FastAPI) -> None: + async def on_startup() -> None: + with log_context(_logger, logging.INFO, "setup disk monitor"): + settings: ApplicationSettings = app.state.settings + + app.state.disk_usage_monitor = disk_usage_monitor = DiskUsageMonitor( + app, + user_id=settings.DY_SIDECAR_USER_ID, + node_id=settings.DY_SIDECAR_NODE_ID, + interval=settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL, + monitored_paths=_get_monitored_paths(app), + ) + await disk_usage_monitor.setup() + + async def on_shutdown() -> None: + with log_context(_logger, logging.INFO, "shutdown disk monitor"): + if disk_usage_monitor := getattr( # noqa: B009 + app.state, "disk_usage_monitor" + ): + await disk_usage_monitor.shutdown() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py new file mode 100644 index 00000000000..3eb6c8fcc50 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py @@ -0,0 +1,60 @@ +import contextlib +from pathlib import Path + +import socketio +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.socketio import ( + SOCKET_IO_SERVICE_DISK_USAGE_EVENT, +) +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + ServiceDiskUsage, +) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from servicelib.fastapi.app_state import SingletonInAppStateMixin + + +class Notifier(SingletonInAppStateMixin): + app_state_name: str = "notifier" + + def __init__(self, sio_manager: socketio.AsyncAioPikaManager): + self._sio_manager = sio_manager + + async def notify_service_disk_usage( + self, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_SERVICE_DISK_USAGE_EVENT, + data=jsonable_encoder(ServiceDiskUsage(node_id=node_id, usage=usage)), + room=SocketIORoomStr.from_user_id(user_id), + ) + + +async def publish_disk_usage( + app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] +) -> None: + notifier: Notifier = Notifier.get_from_app_state(app) + await notifier.notify_service_disk_usage( + user_id=user_id, node_id=node_id, usage=usage + ) + + +def setup_notifier(app: FastAPI): + async def _on_startup() -> None: + assert app.state.external_socketio # nosec + + notifier = Notifier( + sio_manager=app.state.external_socketio, + ) + notifier.set_to_app_state(app) + assert Notifier.get_from_app_state(app) == notifier # nosec + + async def _on_shutdown() -> None: + with contextlib.suppress(AttributeError): + Notifier.pop_from_app_state(app) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py new file mode 100644 index 00000000000..e460f7a9ee3 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py @@ -0,0 +1,24 @@ +import logging + +from fastapi import FastAPI +from servicelib.logging_utils import log_context + +from ...core.settings import SystemMonitorSettings +from ._disk_usage import setup_disk_usage +from ._notifier import setup_notifier +from ._socketio import setup_socketio + +_logger = logging.getLogger(__name__) + + +def setup_system_monitor(app: FastAPI) -> None: + with log_context(_logger, logging.INFO, "setup system monitor"): + settings: SystemMonitorSettings = app.state.settings.SYSTEM_MONITOR_SETTINGS + + if not settings.DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE: + _logger.warning("system monitor disabled") + return + + setup_socketio(app) # required by notifier + setup_notifier(app) + setup_disk_usage(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py new file mode 100644 index 00000000000..0bb2f808059 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py @@ -0,0 +1,32 @@ +import logging + +import socketio +from fastapi import FastAPI +from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager + +from ...core.settings import ApplicationSettings + +_logger = logging.getLogger(__name__) + + +def setup_socketio(app: FastAPI): + settings: ApplicationSettings = app.state.settings + + async def _on_startup() -> None: + assert app.state.rabbitmq_client # nosec + + # Connect to the as an external process in write-only mode + # SEE https://python-socketio.readthedocs.io/en/stable/server.html#emitting-from-external-processes + assert settings.RABBIT_SETTINGS # nosec + app.state.external_socketio = socketio.AsyncAioPikaManager( + url=settings.RABBIT_SETTINGS.dsn, logger=_logger, write_only=True + ) + + async def _on_shutdown() -> None: + if external_socketio := getattr(app.state, "external_socketio"): # noqa: B009 + await cleanup_socketio_async_pubsub_manager( + server_manager=external_socketio + ) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index 1f42e9cd3c8..9b451ff4be0 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -38,6 +38,7 @@ "pytest_simcore.docker_swarm", "pytest_simcore.minio_service", "pytest_simcore.pytest_global_environs", + "pytest_simcore.pytest_socketio", "pytest_simcore.rabbit_service", "pytest_simcore.repository_paths", "pytest_simcore.simcore_service_library_fixtures", diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py new file mode 100644 index 00000000000..2c4c3c6a125 --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__disk_usage.py @@ -0,0 +1,131 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from collections.abc import Callable +from datetime import timedelta +from pathlib import Path +from unittest.mock import AsyncMock, Mock + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from psutil._common import sdiskusage +from pydantic import ByteSize +from pytest_mock import MockerFixture +from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( + DiskUsageMonitor, + get_usage, +) + + +@pytest.fixture +def mock_disk_usage(mocker: MockerFixture) -> Callable[[dict[str, ByteSize]], None]: + base_module = "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage" + + def _(free: dict[str, ByteSize]) -> None: + def _disk_usage(path: str) -> sdiskusage: + return sdiskusage(total=0, used=0, free=free[path], percent=0) + + mocker.patch(f"{base_module}.psutil.disk_usage", side_effect=_disk_usage) + + return _ + + +@pytest.fixture +def publish_disk_usage_spy(mocker: MockerFixture) -> Mock: + mock = Mock() + + def __publish_disk_usage( + app: FastAPI, + *, + user_id: UserID, + node_id: NodeID, + usage: dict[Path, DiskUsage], + ) -> None: + mock(usage) + + mocker.patch( + "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage.publish_disk_usage", + side_effect=__publish_disk_usage, + ) + return mock + + +def _get_entry(mock: Mock, *, index: int) -> dict[Path, DiskUsage]: + return mock.call_args_list[index].args[0] + + +def _get_byte_size(byte_size_as_str: str) -> ByteSize: + return ByteSize.validate(byte_size_as_str) + + +def _get_mocked_disk_usage(byte_size_as_str: str) -> DiskUsage: + bytes_size = _get_byte_size(byte_size_as_str) + return DiskUsage( + total=bytes_size, used=ByteSize(0), free=bytes_size, used_percent=0 + ) + + +async def _assert_monitor_triggers( + disk_usage_monitor: DiskUsageMonitor, + publish_disk_usage_spy: Mock, + *, + expected_events: int, + monitor_call_count: int = 10, +) -> None: + for _ in range(monitor_call_count): + # regardless of how many times it's called only generates 1 publish event + await disk_usage_monitor._monitor() # pylint:disable=protected-access # noqa: SLF001 + assert len(publish_disk_usage_spy.call_args_list) == expected_events + + +async def test_disk_usage_monitor( + mock_disk_usage: Callable[[dict[str, ByteSize]], None], + publish_disk_usage_spy: Mock, + faker: Faker, +) -> None: + disk_usage_monitor = DiskUsageMonitor( + app=AsyncMock(), + user_id=1, + node_id=faker.uuid4(), + interval=timedelta(seconds=5), + monitored_paths=[Path("/"), Path("/tmp")], # noqa: S108 + ) + + assert len(publish_disk_usage_spy.call_args_list) == 0 + + for i in range(1, 3): + mock_disk_usage( + { + "/": _get_byte_size(f"{i}kb"), + "/tmp": _get_byte_size(f"{i*2}kb"), # noqa: S108 + } + ) + + await _assert_monitor_triggers( + disk_usage_monitor, publish_disk_usage_spy, expected_events=1 + ) + + assert _get_entry(publish_disk_usage_spy, index=0) == { + Path("/"): _get_mocked_disk_usage(f"{i}kb"), + Path("/tmp"): _get_mocked_disk_usage(f"{i*2}kb"), # noqa: S108 + } + + # reset mock to test again + publish_disk_usage_spy.reset_mock() + + +def _random_tmp_file(tmp_path: Path, faker: Faker) -> None: + some_path: Path = tmp_path / faker.uuid4() + some_path.write_text("some text here") + + +async def test_get_usage(tmp_path: Path, faker: Faker): + usage_before = await get_usage(Path("/")) + _random_tmp_file(tmp_path, faker) + usage_after = await get_usage(Path("/")) + + assert usage_after.free < usage_before.free diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py new file mode 100644 index 00000000000..585b861f6bb --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py @@ -0,0 +1,188 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +from collections.abc import AsyncIterable, Callable +from contextlib import AsyncExitStack, _AsyncGeneratorContextManager +from pathlib import Path +from unittest.mock import AsyncMock + +import pytest +import socketio +from asgi_lifespan import LifespanManager +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.socketio import ( + SOCKET_IO_SERVICE_DISK_USAGE_EVENT, +) +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + ServiceDiskUsage, +) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from pydantic import ByteSize, NonNegativeInt, parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from servicelib.utils import logged_gather +from settings_library.rabbit import RabbitSettings +from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings +from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( + DiskUsageMonitor, +) +from simcore_service_dynamic_sidecar.modules.system_monitor._notifier import ( + publish_disk_usage, +) +from socketio import AsyncServer +from tenacity import AsyncRetrying +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, + mock_environment: EnvVarsDict, + rabbit_service: RabbitSettings, +) -> EnvVarsDict: + setenvs_from_dict( + monkeypatch, {"DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": "true"} + ) + return mock_environment + + +@pytest.fixture +async def app(mocker: MockerFixture, app: FastAPI) -> AsyncIterable[FastAPI]: + mocker.patch( + "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage._get_monitored_paths", + return_value=[], + ) + async with LifespanManager(app): + yield app + + +@pytest.fixture +async def disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor: + return app.state.disk_usage_monitor + + +@pytest.fixture +async def socketio_server( + app: FastAPI, + socketio_server_factory: Callable[ + [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] + ], +) -> AsyncIterable[AsyncServer]: + # Same configuration as simcore_service_webserver/socketio/server.py + settings: ApplicationSettings = app.state.settings + assert settings.RABBIT_SETTINGS + + async with socketio_server_factory(settings.RABBIT_SETTINGS) as server: + yield server + + +@pytest.fixture +def room_name(user_id: UserID) -> SocketIORoomStr: + return SocketIORoomStr.from_user_id(user_id) + + +def _get_on_service_disk_usage_event( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(dict[Path, DiskUsage], data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_SERVICE_DISK_USAGE_EVENT, on_event_spy) + + return on_event_spy + + +async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), stop=stop_after_attempt(5000), reraise=True + ): + with attempt: + assert mock.call_count == call_count + + +def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: + return DiskUsage( + total=ByteSize(0), + used=ByteSize(0), + free=ByteSize.validate(byte_size_str), + used_percent=0, + ) + + +@pytest.mark.parametrize( + "usage", + [ + pytest.param({}, id="empty"), + pytest.param({Path("/"): _get_mocked_disk_usage("1kb")}, id="one_entry"), + pytest.param( + { + Path("/"): _get_mocked_disk_usage("1kb"), + Path("/tmp"): _get_mocked_disk_usage("2kb"), # noqa: S108 + }, + id="two_entries", + ), + ], +) +async def test_notifier_publish_message( + disk_usage_monitor: DiskUsageMonitor, + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + usage: dict[Path, DiskUsage], + node_id: NodeID, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + number_of_clients: NonNegativeInt = 10 + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(number_of_clients) + ] + ) + await _assert_call_count(server_connect, call_count=number_of_clients) + + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=number_of_clients) + + # attach spy to client + on_service_disk_usage_events: list[AsyncMock] = [ + _get_on_service_disk_usage_event(c) for c in frontend_clients + ] + + # server publishes a message + await publish_disk_usage(app, user_id=user_id, node_id=node_id, usage=usage) + + # check that all clients received it + for on_service_disk_usage_event in on_service_disk_usage_events: + await _assert_call_count(on_service_disk_usage_event, call_count=1) + on_service_disk_usage_event.assert_awaited_once_with( + jsonable_encoder(ServiceDiskUsage(node_id=node_id, usage=usage)) + ) + + await _assert_call_count(server_disconnect, call_count=number_of_clients) diff --git a/services/payments/requirements/_test.in b/services/payments/requirements/_test.in index e67da6f24e9..32d6384e0d0 100644 --- a/services/payments/requirements/_test.in +++ b/services/payments/requirements/_test.in @@ -17,7 +17,6 @@ docker faker jsonref pytest -pytest-aiohttp pytest-asyncio pytest-cov pytest-icdiff diff --git a/services/payments/requirements/_test.txt b/services/payments/requirements/_test.txt index 9b76509ff98..a1847d847dd 100644 --- a/services/payments/requirements/_test.txt +++ b/services/payments/requirements/_test.txt @@ -8,7 +8,6 @@ aiohttp==3.8.6 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt - # pytest-aiohttp # python-socketio aiosignal==1.3.1 # via @@ -107,18 +106,13 @@ pprintpp==0.4.0 pytest==7.4.3 # via # -r requirements/_test.in - # pytest-aiohttp # pytest-asyncio # pytest-cov # pytest-icdiff # pytest-mock # pytest-sugar -pytest-aiohttp==1.0.5 - # via -r requirements/_test.in pytest-asyncio==0.21.1 - # via - # -r requirements/_test.in - # pytest-aiohttp + # via -r requirements/_test.in pytest-cov==4.1.0 # via -r requirements/_test.in pytest-icdiff==0.8 diff --git a/services/payments/src/simcore_service_payments/services/notifier.py b/services/payments/src/simcore_service_payments/services/notifier.py index b1c6b64fe38..96f47d96061 100644 --- a/services/payments/src/simcore_service_payments/services/notifier.py +++ b/services/payments/src/simcore_service_payments/services/notifier.py @@ -8,7 +8,7 @@ SOCKET_IO_PAYMENT_COMPLETED_EVENT, SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT, ) -from models_library.api_schemas_webserver.socketio import SocketIORoom +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.api_schemas_webserver.wallets import ( PaymentMethodTransaction, PaymentTransaction, @@ -49,7 +49,7 @@ async def notify_payment_completed( return await self._sio_manager.emit( SOCKET_IO_PAYMENT_COMPLETED_EVENT, data=jsonable_encoder(payment, by_alias=True), - room=SocketIORoom.from_group_id(user_primary_group_id), + room=SocketIORoomStr.from_group_id(user_primary_group_id), ) async def notify_payment_method_acked( @@ -62,7 +62,7 @@ async def notify_payment_method_acked( return await self._sio_manager.emit( SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT, data=jsonable_encoder(payment_method, by_alias=True), - room=SocketIORoom.from_group_id(user_primary_group_id), + room=SocketIORoomStr.from_group_id(user_primary_group_id), ) diff --git a/services/payments/tests/conftest.py b/services/payments/tests/conftest.py index 5c09c597b24..19f7c5a682c 100644 --- a/services/payments/tests/conftest.py +++ b/services/payments/tests/conftest.py @@ -27,6 +27,7 @@ "pytest_simcore.environment_configs", "pytest_simcore.httpbin_service", "pytest_simcore.postgres_service", + "pytest_simcore.pytest_socketio", "pytest_simcore.rabbit_service", "pytest_simcore.repository_paths", "pytest_simcore.tmp_path_extra", diff --git a/services/payments/tests/unit/test_services_notifier.py b/services/payments/tests/unit/test_services_notifier.py index 57b4e5b9432..fcf5f776dd5 100644 --- a/services/payments/tests/unit/test_services_notifier.py +++ b/services/payments/tests/unit/test_services_notifier.py @@ -6,19 +6,18 @@ import asyncio import threading -from collections.abc import AsyncIterable, AsyncIterator, Callable +from collections.abc import AsyncIterable, Callable +from contextlib import _AsyncGeneratorContextManager from unittest.mock import AsyncMock import arrow import pytest import socketio -from aiohttp import web -from aiohttp.test_utils import TestServer from fastapi import FastAPI from models_library.api_schemas_payments.socketio import ( SOCKET_IO_PAYMENT_COMPLETED_EVENT, ) -from models_library.api_schemas_webserver.socketio import SocketIORoom +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.api_schemas_webserver.wallets import PaymentTransaction from models_library.users import GroupID, UserID from pydantic import parse_obj_as @@ -26,22 +25,19 @@ from pytest_simcore.helpers.rawdata_fakers import random_payment_transaction from pytest_simcore.helpers.typing_env import EnvVarsDict from pytest_simcore.helpers.utils_envs import setenvs_from_dict -from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager from settings_library.rabbit import RabbitSettings from simcore_service_payments.models.db import PaymentsTransactionsDB from simcore_service_payments.models.db_to_api import to_payments_api_model from simcore_service_payments.services.notifier import Notifier from simcore_service_payments.services.rabbitmq import get_rabbitmq_settings -from socketio import AsyncAioPikaManager, AsyncServer +from socketio import AsyncServer from tenacity import AsyncRetrying from tenacity.stop import stop_after_attempt from tenacity.wait import wait_fixed -from yarl import URL pytest_simcore_core_services_selection = [ "rabbit", ] -pytest_simcore_ops_services_selection = [] @pytest.fixture @@ -74,114 +70,29 @@ def app_environment( @pytest.fixture -async def socketio_server(app: FastAPI) -> AsyncIterable[AsyncServer]: - # Same configuration as simcore_service_webserver/socketio/server.py - settings: RabbitSettings = get_rabbitmq_settings(app) - server_manager = AsyncAioPikaManager(url=settings.dsn) - - server = AsyncServer( - async_mode="aiohttp", - engineio_logger=True, - client_manager=server_manager, - ) - - yield server - - await cleanup_socketio_async_pubsub_manager(server_manager) - - -@pytest.fixture -def socketio_server_events( - socketio_server: AsyncServer, - mocker: MockerFixture, - user_primary_group_id: GroupID, -) -> dict[str, AsyncMock]: - room_name = SocketIORoom.from_group_id(user_primary_group_id) - - # handlers - async def connect(sid: str, environ): - print("connecting", sid) - await socketio_server.enter_room(sid, room_name) - - async def on_check(sid, data): - print("check", sid, data) - - async def on_payment(sid, data): - print("payment", sid, parse_obj_as(PaymentTransaction, data)) - - async def disconnect(sid: str): - print("disconnecting", sid) - await socketio_server.leave_room(sid, room_name) - - # spies - spy_connect = mocker.AsyncMock(wraps=connect) - socketio_server.on("connect", spy_connect) - - spy_on_payment = mocker.AsyncMock(wraps=on_payment) - socketio_server.on(SOCKET_IO_PAYMENT_COMPLETED_EVENT, spy_on_payment) - - spy_on_check = mocker.AsyncMock(wraps=on_check) - socketio_server.on("check", spy_on_check) - - spy_disconnect = mocker.AsyncMock(wraps=disconnect) - socketio_server.on("disconnect", spy_disconnect) - - return { - connect.__name__: spy_connect, - disconnect.__name__: spy_disconnect, - on_check.__name__: spy_on_check, - on_payment.__name__: spy_on_payment, - } +async def socketio_server( + app: FastAPI, + socketio_server_factory: Callable[ + [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] + ], +) -> AsyncIterable[AsyncServer]: + async with socketio_server_factory(get_rabbitmq_settings(app)) as server: + yield server @pytest.fixture -async def web_server( - socketio_server: AsyncServer, aiohttp_unused_port: Callable -) -> AsyncIterator[URL]: - """ - this emulates the webserver setup: socketio server with - an aiopika manager that attaches an aiohttp web app - """ - aiohttp_app = web.Application() - socketio_server.attach(aiohttp_app) - - async def _lifespan( - server: TestServer, started: asyncio.Event, teardown: asyncio.Event - ): - # NOTE: this is necessary to avoid blocking comms between client and this server - await server.start_server() - started.set() # notifies started - await teardown.wait() # keeps test0server until needs to close - await server.close() - - setup = asyncio.Event() - teardown = asyncio.Event() - - server = TestServer(aiohttp_app, port=aiohttp_unused_port()) - t = asyncio.create_task(_lifespan(server, setup, teardown), name="server-lifespan") - - await setup.wait() - - yield URL(server.make_url("/")) - - assert t - teardown.set() +def room_name(user_primary_group_id: GroupID) -> SocketIORoomStr: + return SocketIORoomStr.from_group_id(user_primary_group_id) @pytest.fixture -async def server_url(web_server: URL) -> str: - return f'{web_server.with_path("/")}' - - -@pytest.fixture -async def socketio_client(server_url: str) -> AsyncIterable[socketio.AsyncClient]: - """This emulates a socketio client in the front-end""" - client = socketio.AsyncClient(logger=True, engineio_logger=True) - await client.connect(f"{server_url}", transports=["websocket"]) - - yield client - - await client.disconnect() +async def socketio_client( + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], +) -> AsyncIterable[socketio.AsyncClient]: + async with socketio_client_factory() as client: + yield client @pytest.fixture @@ -237,7 +148,6 @@ async def test_emit_message_as_external_process_to_frontend_client( server_connect = socketio_server_events["connect"] server_disconnect = socketio_server_events["disconnect"] server_on_check = socketio_server_events["on_check"] - server_on_payment = socketio_server_events["on_payment"] # client spy events client_on_payment = socketio_client_events["on_payment"] @@ -266,4 +176,3 @@ def _(lp): async for attempt in AsyncRetrying(**context_switch_retry_kwargs): with attempt: assert client_on_payment.called - assert not server_on_payment.called diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py b/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py index 83f7138ca7d..926b865e0f2 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py @@ -86,6 +86,7 @@ async def start_computation(request: web.Request) -> web.Response: simcore_user_agent = request.headers.get( X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE ) + async with get_database_engine(request.app).acquire() as conn: group_properties = ( await GroupExtraPropertiesRepo.get_aggregated_properties_for_user( diff --git a/services/web/server/src/simcore_service_webserver/products/_events.py b/services/web/server/src/simcore_service_webserver/products/_events.py index 48c23dc1ec1..17ebfc8412b 100644 --- a/services/web/server/src/simcore_service_webserver/products/_events.py +++ b/services/web/server/src/simcore_service_webserver/products/_events.py @@ -83,6 +83,7 @@ async def load_products_on_startup(app: web.Application): payments = await get_product_payment_fields( connection, product_name=row.name ) + app_products[name] = Product( **dict(row.items()), is_payment_enabled=payments.enabled, diff --git a/services/web/server/src/simcore_service_webserver/products/_model.py b/services/web/server/src/simcore_service_webserver/products/_model.py index c67486d09d9..eb172a9786b 100644 --- a/services/web/server/src/simcore_service_webserver/products/_model.py +++ b/services/web/server/src/simcore_service_webserver/products/_model.py @@ -234,6 +234,7 @@ def to_statics(self) -> dict[str, Any]: "manuals", "support", "is_payment_enabled", + "is_dynamic_services_telemetry_enabled", }, exclude_none=True, exclude_unset=True, diff --git a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py index cbf5536899a..10fe058e1f3 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py @@ -8,7 +8,7 @@ from typing import Any from aiohttp import web -from models_library.api_schemas_webserver.socketio import SocketIORoom +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.socketio import SocketMessageDict from models_library.users import UserID from servicelib.aiohttp.observer import emit @@ -91,9 +91,9 @@ async def _set_user_in_group_rooms( sio = get_socket_server(app) for group in groups: # NOTE socketio need to be upgraded that's why enter_room is not an awaitable - sio.enter_room(socket_id, SocketIORoom.from_group_id(group["gid"])) + sio.enter_room(socket_id, SocketIORoomStr.from_group_id(group["gid"])) - sio.enter_room(socket_id, SocketIORoom.from_user_id(user_id)) + sio.enter_room(socket_id, SocketIORoomStr.from_user_id(user_id)) # diff --git a/services/web/server/src/simcore_service_webserver/socketio/messages.py b/services/web/server/src/simcore_service_webserver/socketio/messages.py index 8412870814a..b32f8ebfeaa 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/messages.py +++ b/services/web/server/src/simcore_service_webserver/socketio/messages.py @@ -7,7 +7,7 @@ from typing import Final from aiohttp.web import Application -from models_library.api_schemas_webserver.socketio import SocketIORoom +from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.socketio import SocketMessageDict from models_library.users import GroupID, UserID from servicelib.json_serialization import json_dumps @@ -47,7 +47,7 @@ async def send_messages( sio.emit( message["event_type"], json_dumps(message["data"]), - room=SocketIORoom.from_socket_id(sid), + room=SocketIORoomStr.from_socket_id(sid), ) for message in messages for sid in socket_ids @@ -66,7 +66,7 @@ async def send_group_messages( sio.emit( message["event_type"], json_dumps(message["data"]), - room=SocketIORoom.from_group_id(group_id), + room=SocketIORoomStr.from_group_id(group_id), ) for message in messages ] diff --git a/services/web/server/src/simcore_service_webserver/users/_preferences_api.py b/services/web/server/src/simcore_service_webserver/users/_preferences_api.py index fa9d0e6a226..083671a8ebb 100644 --- a/services/web/server/src/simcore_service_webserver/users/_preferences_api.py +++ b/services/web/server/src/simcore_service_webserver/users/_preferences_api.py @@ -15,11 +15,17 @@ from models_library.users import UserID from pydantic import NonNegativeInt, parse_obj_as from servicelib.utils import logged_gather +from simcore_postgres_database.utils_groups_extra_properties import ( + GroupExtraPropertiesRepo, +) +from ..db.plugin import get_database_engine from . import _preferences_db from ._preferences_models import ( ALL_FRONTEND_PREFERENCES, - get_preference_identifier_to_preference_name_map, + TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference, + get_preference_identifier, + get_preference_name, ) _MAX_PARALLEL_DB_QUERIES: Final[NonNegativeInt] = 2 @@ -74,11 +80,32 @@ async def get_frontend_user_preference( async def get_frontend_user_preferences_aggregation( app: web.Application, *, user_id: UserID, product_name: ProductName ) -> AggregatedPreferences: + async with get_database_engine(app).acquire() as conn: + group_extra_properties = ( + await GroupExtraPropertiesRepo.get_aggregated_properties_for_user( + conn, user_id=user_id, product_name=product_name + ) + ) + + is_telemetry_enabled: bool = group_extra_properties.enable_telemetry + + low_disk_warning_identifier = get_preference_identifier( + TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference.get_preference_name() + ) + + def include_preference(identifier: PreferenceIdentifier) -> bool: + # NOTE: some preferences are included or excluded based on + # the configuration specified in the backend + if identifier == low_disk_warning_identifier: + return is_telemetry_enabled + return True + aggregated_preferences: AggregatedPreferences = { p.preference_identifier: Preference.parse_obj( {"value": p.value, "default_value": p.get_default_value()} ) for p in await _get_frontend_user_preferences(app, user_id, product_name) + if include_preference(p.preference_identifier) } return aggregated_preferences @@ -92,10 +119,8 @@ async def set_frontend_user_preference( value: Any, ) -> None: try: - preference_name: PreferenceName = ( - get_preference_identifier_to_preference_name_map()[ - frontend_preference_identifier - ] + preference_name: PreferenceName = get_preference_name( + frontend_preference_identifier ) except KeyError as e: raise FrontendUserPreferenceIsNotDefinedError( diff --git a/services/web/server/src/simcore_service_webserver/users/_preferences_models.py b/services/web/server/src/simcore_service_webserver/users/_preferences_models.py index 1ed4a14eed9..70383358f00 100644 --- a/services/web/server/src/simcore_service_webserver/users/_preferences_models.py +++ b/services/web/server/src/simcore_service_webserver/users/_preferences_models.py @@ -14,80 +14,87 @@ class ConfirmationBackToDashboardFrontendUserPreference(FrontendUserPreference): - preference_identifier = "confirmBackToDashboard" + preference_identifier: PreferenceIdentifier = "confirmBackToDashboard" value: bool = True class ConfirmationDeleteStudyFrontendUserPreference(FrontendUserPreference): - preference_identifier = "confirmDeleteStudy" + preference_identifier: PreferenceIdentifier = "confirmDeleteStudy" value: bool = True class ConfirmationDeleteNodeFrontendUserPreference(FrontendUserPreference): - preference_identifier = "confirmDeleteNode" + preference_identifier: PreferenceIdentifier = "confirmDeleteNode" value: bool = True class ConfirmationStopNodeFrontendUserPreference(FrontendUserPreference): - preference_identifier = "confirmStopNode" + preference_identifier: PreferenceIdentifier = "confirmStopNode" value: bool = True class SnapNodeToGridFrontendUserPreference(FrontendUserPreference): - preference_identifier = "snapNodeToGrid" + preference_identifier: PreferenceIdentifier = "snapNodeToGrid" value: bool = True class ConnectPortsAutomaticallyFrontendUserPreference(FrontendUserPreference): - preference_identifier = "autoConnectPorts" + preference_identifier: PreferenceIdentifier = "autoConnectPorts" value: bool = True class DoNotShowAnnouncementsFrontendUserPreference(FrontendUserPreference): - preference_identifier = "dontShowAnnouncements" + preference_identifier: PreferenceIdentifier = "dontShowAnnouncements" value: list = Field(default_factory=list) class ServicesFrontendUserPreference(FrontendUserPreference): - preference_identifier = "services" + preference_identifier: PreferenceIdentifier = "services" value: dict = Field(default_factory=dict) class ThemeNameFrontendUserPreference(FrontendUserPreference): - preference_identifier = "themeName" + preference_identifier: PreferenceIdentifier = "themeName" value: str | None = None class LastVcsRefUIFrontendUserPreference(FrontendUserPreference): - preference_identifier = "lastVcsRefUI" + preference_identifier: PreferenceIdentifier = "lastVcsRefUI" value: str | None = None class PreferredWalletIdFrontendUserPreference(FrontendUserPreference): - preference_identifier = "preferredWalletId" + preference_identifier: PreferenceIdentifier = "preferredWalletId" value: int | None = None class CreditsWarningThresholdFrontendUserPreference(FrontendUserPreference): - preference_identifier = "creditsWarningThreshold" + preference_identifier: PreferenceIdentifier = "creditsWarningThreshold" value: int = 200 class WalletIndicatorVisibilityFrontendUserPreference(FrontendUserPreference): - preference_identifier = "walletIndicatorVisibility" + preference_identifier: PreferenceIdentifier = "walletIndicatorVisibility" value: str | None = "always" class UserInactivityThresholdFrontendUserPreference(FrontendUserPreference): - preference_identifier = "userInactivityThreshold" + preference_identifier: PreferenceIdentifier = "userInactivityThreshold" value: int = 30 * _MINUTE # in seconds class JobConcurrencyLimitFrontendUserPreference(FrontendUserPreference): - preference_identifier = "jobConcurrencyLimit" + preference_identifier: PreferenceIdentifier = "jobConcurrencyLimit" value: int | None = 1 +class TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference( + FrontendUserPreference +): + preference_identifier: PreferenceIdentifier = "lowDiskSpaceThreshold" + value: int = 5 # in gigabytes + + ALL_FRONTEND_PREFERENCES: list[type[FrontendUserPreference]] = [ ConfirmationBackToDashboardFrontendUserPreference, ConfirmationDeleteStudyFrontendUserPreference, @@ -105,15 +112,21 @@ class JobConcurrencyLimitFrontendUserPreference(FrontendUserPreference): UserInactivityThresholdFrontendUserPreference, JobConcurrencyLimitFrontendUserPreference, AllowMetricsCollectionFrontendUserPreference, + TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference, ] +_PREFERENCE_NAME_TO_IDENTIFIER_MAPPING: dict[PreferenceName, PreferenceIdentifier] = { + p.get_preference_name(): p.__fields__["preference_identifier"].default + for p in ALL_FRONTEND_PREFERENCES +} +_PREFERENCE_IDENTIFIER_TO_NAME_MAPPING: dict[PreferenceIdentifier, PreferenceName] = { + i: n for n, i in _PREFERENCE_NAME_TO_IDENTIFIER_MAPPING.items() +} + + +def get_preference_name(preference_identifier: PreferenceIdentifier) -> PreferenceName: + return _PREFERENCE_IDENTIFIER_TO_NAME_MAPPING[preference_identifier] -def get_preference_identifier_to_preference_name_map() -> ( - dict[PreferenceIdentifier, PreferenceName] -): - mapping: dict[PreferenceIdentifier, str] = {} - for preference in ALL_FRONTEND_PREFERENCES: - preference_identifier = preference.__fields__["preference_identifier"].default - mapping[preference_identifier] = preference.get_preference_name() - return mapping +def get_preference_identifier(preference_name: PreferenceName) -> PreferenceIdentifier: + return _PREFERENCE_NAME_TO_IDENTIFIER_MAPPING[preference_name] diff --git a/services/web/server/tests/unit/with_dbs/03/test_users__preferences_api.py b/services/web/server/tests/unit/with_dbs/03/test_users__preferences_api.py index 18caf84557a..488d6f9cf36 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_users__preferences_api.py +++ b/services/web/server/tests/unit/with_dbs/03/test_users__preferences_api.py @@ -5,6 +5,7 @@ from collections.abc import AsyncIterator from typing import Any +import aiopg.sa import pytest from aiohttp import web from aiohttp.test_utils import TestClient @@ -16,6 +17,9 @@ from pydantic.fields import ModelField from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict from pytest_simcore.helpers.utils_login import NewUser +from simcore_postgres_database.models.groups_extra_properties import ( + groups_extra_properties, +) from simcore_postgres_database.models.users import UserStatus from simcore_service_webserver.users._preferences_api import ( ALL_FRONTEND_PREFERENCES, @@ -111,8 +115,21 @@ async def test__get_frontend_user_preferences_list_defaults( assert preference.value == _get_default_field_value(preference.__class__) +@pytest.fixture +async def enable_all_frontend_preferences( + aiopg_engine: aiopg.sa.engine.Engine, product_name: ProductName +) -> None: + async with aiopg_engine.acquire() as conn: + await conn.execute( + groups_extra_properties.update() + .where(groups_extra_properties.c.product_name == product_name) + .values(enable_telemetry=True) + ) + + async def test_get_frontend_user_preferences_aggregation( app: web.Application, + enable_all_frontend_preferences: None, user_id: UserID, product_name: ProductName, drop_all_preferences: None, diff --git a/services/web/server/tests/unit/with_dbs/03/test_users__preferences_models.py b/services/web/server/tests/unit/with_dbs/03/test_users__preferences_models.py index 182998e2c8b..d8ce542e460 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_users__preferences_models.py +++ b/services/web/server/tests/unit/with_dbs/03/test_users__preferences_models.py @@ -1,7 +1,25 @@ +from models_library.user_preferences import PreferenceIdentifier, PreferenceName from simcore_service_webserver.users._preferences_models import ( - get_preference_identifier_to_preference_name_map, + TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference, + get_preference_identifier, + get_preference_name, ) -def test_get_preference_identifier_to_preference_name_map(): - assert get_preference_identifier_to_preference_name_map() +def test_get_preference_name_and_get_preference_identifier(): + preference_name: PreferenceName = ( + TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference.get_preference_name() + ) + assert ( + preference_name == "TelemetryLowDiskSpaceWarningThresholdFrontendUserPreference" + ) + preference_identifier: PreferenceIdentifier = get_preference_identifier( + preference_name + ) + assert preference_identifier != preference_name + assert preference_identifier == "lowDiskSpaceThreshold" + + preference_name_via_identifier: PreferenceName = get_preference_name( + preference_identifier + ) + assert preference_name_via_identifier == preference_name