diff --git a/packages/models-library/src/models_library/services_creation.py b/packages/models-library/src/models_library/services_creation.py new file mode 100644 index 00000000000..bd46273963c --- /dev/null +++ b/packages/models-library/src/models_library/services_creation.py @@ -0,0 +1,42 @@ +from typing import Any, ClassVar + +from pydantic import BaseModel + +from .services import ServiceKey, ServiceVersion +from .services_resources import ServiceResourcesDict +from .wallets import WalletID + + +class CreateServiceMetricsAdditionalParams(BaseModel): + wallet_id: WalletID | None + wallet_name: str | None + pricing_plan_id: int | None + pricing_detail_id: int | None + product_name: str + simcore_user_agent: str + user_email: str + project_name: str + node_name: str + service_key: ServiceKey + service_version: ServiceVersion + service_resources: ServiceResourcesDict + service_additional_metadata: dict[str, Any] + + class Config: + schema_extra: ClassVar[dict[str, Any]] = { + "example": { + "wallet_id": 1, + "wallet_name": "a private wallet for me", + "pricing_plan_id": 1, + "pricing_detail_id": 1, + "product_name": "osparc", + "simcore_user_agent": "undefined", + "user_email": "test@test.com", + "project_name": "_!New Study", + "node_name": "the service of a lifetime _ *!", + "service_key": ServiceKey("simcore/services/dynamic/test"), + "service_version": ServiceVersion("0.0.1"), + "service_resources": {}, + "service_additional_metadata": {}, + } + } diff --git a/packages/settings-library/src/settings_library/resource_usage_tracker.py b/packages/settings-library/src/settings_library/resource_usage_tracker.py index 999f8674e5d..dc696fab76c 100644 --- a/packages/settings-library/src/settings_library/resource_usage_tracker.py +++ b/packages/settings-library/src/settings_library/resource_usage_tracker.py @@ -1,3 +1,4 @@ +from datetime import timedelta from functools import cached_property from pydantic import parse_obj_as @@ -9,6 +10,8 @@ URLPart, ) 
+DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL: timedelta = timedelta(seconds=60) + class ResourceUsageTrackerSettings(BaseCustomSettings, MixinServiceSettings): RESOURCE_USAGE_TRACKER_HOST: str = "resource-usage-tracker" diff --git a/scripts/pydeps.bash b/scripts/pydeps.bash index 0916fc29d7a..18fcce0a469 100755 --- a/scripts/pydeps.bash +++ b/scripts/pydeps.bash @@ -1,5 +1,7 @@ #!/bin/bash # http://redsymbol.net/articles/unofficial-bash-strict-mode/ +# NOTE: used for circular dependency detection + set -o errexit set -o nounset set -o pipefail diff --git a/services/director-v2/src/simcore_service_director_v2/constants.py b/services/director-v2/src/simcore_service_director_v2/constants.py index a826f410274..424ac151acb 100644 --- a/services/director-v2/src/simcore_service_director_v2/constants.py +++ b/services/director-v2/src/simcore_service_director_v2/constants.py @@ -21,5 +21,5 @@ REGEX_DY_SERVICE_SIDECAR = rf"^{DYNAMIC_SIDECAR_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" REGEX_DY_SERVICE_PROXY = rf"^{DYNAMIC_PROXY_SERVICE_PREFIX}_[a-zA-Z0-9-_]*" -UNDEFINED_METADATA = "undefined-metadata" +UNDEFINED_STR_METADATA = "undefined-metadata" UNDEFINED_DOCKER_LABEL = "undefined-label" diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 1ab26b35889..d77bb963af8 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -44,6 +44,9 @@ from settings_library.r_clone import RCloneSettings as SettingsLibraryRCloneSettings from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings +from settings_library.resource_usage_tracker import ( + DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL, +) from settings_library.utils_logging import MixinLoggingSettings from settings_library.utils_service import DEFAULT_FASTAPI_PORT from 
simcore_postgres_database.models.clusters import ClusterType @@ -534,7 +537,7 @@ class AppSettings(BaseCustomSettings, MixinLoggingSettings): PUBLISHED_HOSTS_NAME: str = Field("", env="PUBLISHED_HOSTS_NAME") SWARM_STACK_NAME: str = Field("undefined-please-check", env="SWARM_STACK_NAME") SERVICE_TRACKING_HEARTBEAT: datetime.timedelta = Field( - default=datetime.timedelta(seconds=60), + default=DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL, description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)", ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index 1cef0cc206a..61474b9d0bb 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -31,7 +31,7 @@ from servicelib.rabbitmq import RabbitMQClient from servicelib.utils import logged_gather -from ...constants import UNDEFINED_METADATA +from ...constants import UNDEFINED_STR_METADATA from ...core.errors import ( ComputationalBackendNotConnectedError, ComputationalSchedulerChangedError, @@ -363,17 +363,21 @@ async def _process_started_tasks( wallet_name=run_metadata.get("wallet_name"), pricing_plan_id=run_metadata.get("pricing_plan_id"), pricing_detail_id=run_metadata.get("pricing_detail_id"), - product_name=run_metadata.get("product_name", UNDEFINED_METADATA), + product_name=run_metadata.get( + "product_name", UNDEFINED_STR_METADATA + ), simcore_user_agent=run_metadata.get( "simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE ), user_id=user_id, - user_email=run_metadata.get("user_email", UNDEFINED_METADATA), + user_email=run_metadata.get("user_email", UNDEFINED_STR_METADATA), project_id=t.project_id, - project_name=run_metadata.get("project_name", UNDEFINED_METADATA), + 
project_name=run_metadata.get( + "project_name", UNDEFINED_STR_METADATA + ), node_id=t.node_id, node_name=run_metadata.get("node_id_names_map", {}).get( - t.node_id, UNDEFINED_METADATA + t.node_id, UNDEFINED_STR_METADATA ), service_key=ServiceKey(t.image.name), service_version=ServiceVersion(t.image.tag), diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py index a1add06f3d5..b8202f4d6ea 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py @@ -1,7 +1,8 @@ import logging from collections import deque +from collections.abc import Coroutine from functools import cached_property -from typing import Any, Coroutine, Final +from typing import Any, Final from fastapi import FastAPI, status from httpx import AsyncClient @@ -9,6 +10,7 @@ from models_library.projects import ProjectID from models_library.projects_networks import DockerNetworkAlias from models_library.projects_nodes_io import NodeID +from models_library.services_creation import CreateServiceMetricsAdditionalParams from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl, PositiveFloat from servicelib.fastapi.long_running_tasks.client import ( @@ -29,15 +31,15 @@ from ._errors import BaseClientHTTPError, UnexpectedStatusError from ._thin import ThinSidecarsClient -STATUS_POLL_INTERVAL: Final[PositiveFloat] = 1 +_logger = logging.getLogger(__name__) -logger = logging.getLogger(__name__) +_STATUS_POLL_INTERVAL: Final[PositiveFloat] = 1 async def _debug_progress_callback( message: ProgressMessage, percent: ProgressPercent, task_id: TaskId ) -> None: - logger.debug("%s: %.2f %s", task_id, percent, message) + _logger.debug("%s: %.2f %s", task_id, 
percent, message) class SidecarsClient: @@ -92,7 +94,7 @@ async def containers_inspect( result: dict[str, Any] = response.json() return result - @log_decorator(logger=logger) + @log_decorator(logger=_logger) async def containers_docker_status( self, dynamic_sidecar_endpoint: AnyHttpUrl ) -> dict[str, dict[str, str]]: @@ -105,7 +107,7 @@ async def containers_docker_status( except UnexpectedStatusError: return {} - @log_decorator(logger=logger) + @log_decorator(logger=_logger) async def disable_service_ports_io( self, dynamic_sidecar_endpoint: AnyHttpUrl ) -> None: @@ -113,7 +115,7 @@ async def disable_service_ports_io( dynamic_sidecar_endpoint, is_enabled=False ) - @log_decorator(logger=logger) + @log_decorator(logger=_logger) async def enable_service_ports_io( self, dynamic_sidecar_endpoint: AnyHttpUrl ) -> None: @@ -121,7 +123,7 @@ async def enable_service_ports_io( dynamic_sidecar_endpoint, is_enabled=True ) - @log_decorator(logger=logger) + @log_decorator(logger=_logger) async def service_outputs_create_dirs( self, dynamic_sidecar_endpoint: AnyHttpUrl, outputs_labels: dict[str, Any] ) -> None: @@ -129,7 +131,7 @@ async def service_outputs_create_dirs( dynamic_sidecar_endpoint, outputs_labels=outputs_labels ) - @log_decorator(logger=logger) + @log_decorator(logger=_logger) async def get_entrypoint_container_name( self, dynamic_sidecar_endpoint: AnyHttpUrl, dynamic_sidecar_network_name: str ) -> str: @@ -280,19 +282,22 @@ async def _await_for_result( task_id, task_timeout=task_timeout, progress_callback=progress_callback, - status_poll_interval=STATUS_POLL_INTERVAL, + status_poll_interval=_STATUS_POLL_INTERVAL, ) as result: - logger.debug("Task %s finished", task_id) + _logger.debug("Task %s finished", task_id) return result async def create_containers( self, dynamic_sidecar_endpoint: AnyHttpUrl, compose_spec: str, + metrics_params: CreateServiceMetricsAdditionalParams, progress_callback: ProgressCallback | None = None, ) -> None: response = await 
self._thin_client.post_containers_tasks( - dynamic_sidecar_endpoint, compose_spec=compose_spec + dynamic_sidecar_endpoint, + compose_spec=compose_spec, + metrics_params=metrics_params, ) task_id: TaskId = response.json() @@ -472,12 +477,12 @@ def _get_proxy_configuration( async def setup(app: FastAPI) -> None: - with log_context(logger, logging.DEBUG, "dynamic-sidecar api client setup"): + with log_context(_logger, logging.DEBUG, "dynamic-sidecar api client setup"): app.state.sidecars_api_clients = {} async def shutdown(app: FastAPI) -> None: - with log_context(logger, logging.DEBUG, "dynamic-sidecar api client closing..."): + with log_context(_logger, logging.DEBUG, "dynamic-sidecar api client closing..."): await logged_gather( *( x._thin_client.close() # pylint: disable=protected-access diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py index 8eec5da37cf..8ef0492ea73 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py @@ -1,9 +1,9 @@ import json -import logging from typing import Any from fastapi import FastAPI, status from httpx import Response, Timeout +from models_library.services_creation import CreateServiceMetricsAdditionalParams from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME @@ -11,8 +11,6 @@ from ....core.settings import DynamicSidecarSettings from ._base import BaseThinClient, expect_status, retry_on_errors -logger = logging.getLogger(__name__) - class ThinSidecarsClient(BaseThinClient): """ @@ -159,11 +157,21 @@ async def post_containers_networks_detach( @retry_on_errors @expect_status(status.HTTP_202_ACCEPTED) async def 
post_containers_tasks( - self, dynamic_sidecar_endpoint: AnyHttpUrl, *, compose_spec: str + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + *, + compose_spec: str, + metrics_params: CreateServiceMetricsAdditionalParams, ) -> Response: url = self._get_url(dynamic_sidecar_endpoint, "/containers") # change introduce in OAS version==1.1.0 - return await self.client.post(url, json={"docker_compose_yaml": compose_spec}) + return await self.client.post( + url, + json={ + "docker_compose_yaml": compose_spec, + "metrics_params": metrics_params.dict(), + }, + ) @retry_on_errors @expect_status(status.HTTP_202_ACCEPTED) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py index c5056a437ff..85d4ff1b690 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py @@ -14,20 +14,11 @@ ProgressRabbitMessageNode, ProgressType, ) -from models_library.service_settings_labels import ( - SimcoreServiceLabels, - SimcoreServiceSettingsLabel, -) -from models_library.services import RunID, ServiceKeyVersion -from pydantic import PositiveFloat -from servicelib.fastapi.long_running_tasks.client import TaskId +from models_library.service_settings_labels import SimcoreServiceSettingsLabel +from models_library.services import RunID from servicelib.json_serialization import json_dumps from servicelib.rabbitmq import RabbitMQClient from simcore_postgres_database.models.comp_tasks import NodeClass -from tenacity._asyncio import AsyncRetrying -from tenacity.before_sleep import before_sleep_log -from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed from .....core.settings import DynamicSidecarProxySettings, DynamicSidecarSettings from 
.....models.dynamic_services_scheduler import ( @@ -56,15 +47,15 @@ get_swarm_network, is_dynamic_sidecar_stack_missing, ) -from ...docker_compose_specs import assemble_spec from ...docker_service_specs import ( extract_service_port_service_settings, get_dynamic_proxy_spec, get_dynamic_sidecar_spec, merge_settings_before_use, ) -from ...errors import EntrypointContainerNotFoundError, UnexpectedContainerStatusError +from ...errors import UnexpectedContainerStatusError from ._abc import DynamicSchedulerEvent +from ._events_user_services import create_user_services from ._events_utils import ( are_all_user_services_containers_running, attach_project_networks, @@ -75,9 +66,9 @@ wait_for_sidecar_api, ) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) -DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS: Final[tuple[list[str], ...]] = ( +_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS: Final[tuple[list[str], ...]] = ( ["labels"], ["task_template", "Resources", "Limits"], ["task_template", "Resources", "Reservation", "MemoryBytes"], @@ -144,7 +135,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: if node is not None and node.boot_options is not None else {} ) - logger.info("%s", f"{boot_options=}") + _logger.info("%s", f"{boot_options=}") settings: SimcoreServiceSettingsLabel = await merge_settings_before_use( director_v0_client=director_v0_client, @@ -214,7 +205,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: nested_update( jsonable_encoder(dynamic_sidecar_service_spec_base, exclude_unset=True), jsonable_encoder(user_specific_service_spec, exclude_unset=True), - include=DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS, + include=_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS, ) ) rabbit_message = ProgressRabbitMessageNode( @@ -268,7 +259,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: swarm_network_id=swarm_network_id, swarm_network_name=swarm_network_name, ) - logger.debug( + 
_logger.debug( "dynamic-sidecar-proxy create_service_params %s", json_dumps(dynamic_sidecar_proxy_create_service_params), ) @@ -292,6 +283,7 @@ class WaitForSidecarAPI(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return ( scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started and not scheduler_data.dynamic_sidecar.is_healthy @@ -309,6 +301,7 @@ class UpdateHealth(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return scheduler_data.dynamic_sidecar.was_dynamic_sidecar_started @classmethod @@ -328,6 +321,7 @@ class GetStatus(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return ( scheduler_data.dynamic_sidecar.status.current == DynamicSidecarStatus.OK and scheduler_data.dynamic_sidecar.is_ready @@ -397,6 +391,7 @@ class PrepareServicesEnvironment(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return ( scheduler_data.dynamic_sidecar.status.current == DynamicSidecarStatus.OK and scheduler_data.dynamic_sidecar.is_ready @@ -417,6 +412,7 @@ class CreateUserServices(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return ( scheduler_data.dynamic_sidecar.is_service_environment_ready and not scheduler_data.dynamic_sidecar.compose_spec_submitted @@ -424,135 +420,7 @@ async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool @classmethod async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: - logger.debug( - "Getting docker compose spec for service %s", scheduler_data.service_name - ) - - dynamic_sidecar_settings: DynamicSidecarSettings = ( - 
app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR - ) - sidecars_client = get_sidecars_client(app, scheduler_data.node_uuid) - dynamic_sidecar_endpoint = scheduler_data.endpoint - - # check values have been set by previous step - if ( - scheduler_data.dynamic_sidecar.dynamic_sidecar_id is None - or scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id is None - or scheduler_data.dynamic_sidecar.swarm_network_id is None - or scheduler_data.dynamic_sidecar.swarm_network_name is None - or scheduler_data.proxy_admin_api_port is None - ): - msg = ( - "Did not expect None for any of the following: " - f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_id=} " - f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id=} " - f"{scheduler_data.dynamic_sidecar.swarm_network_id=} " - f"{scheduler_data.dynamic_sidecar.swarm_network_name=} " - f"{scheduler_data.proxy_admin_api_port=}" - ) - raise ValueError(msg) - - # Starts dynamic SIDECAR ------------------------------------- - # creates a docker compose spec given the service key and tag - # fetching project form DB and fetching user settings - - director_v0_client: DirectorV0Client = get_director_v0_client(app) - simcore_service_labels: SimcoreServiceLabels = ( - await director_v0_client.get_service_labels( - service=ServiceKeyVersion( - key=scheduler_data.key, version=scheduler_data.version - ) - ) - ) - - groups_extra_properties = get_repository(app, GroupsExtraPropertiesRepository) - assert scheduler_data.product_name is not None # nosec - allow_internet_access: bool = await groups_extra_properties.has_internet_access( - user_id=scheduler_data.user_id, product_name=scheduler_data.product_name - ) - - compose_spec: str = assemble_spec( - app=app, - service_key=scheduler_data.key, - service_version=scheduler_data.version, - paths_mapping=scheduler_data.paths_mapping, - compose_spec=scheduler_data.compose_spec, - container_http_entry=scheduler_data.container_http_entry, - 
dynamic_sidecar_network_name=scheduler_data.dynamic_sidecar_network_name, - swarm_network_name=scheduler_data.dynamic_sidecar.swarm_network_name, - service_resources=scheduler_data.service_resources, - has_quota_support=dynamic_sidecar_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, - simcore_service_labels=simcore_service_labels, - allow_internet_access=allow_internet_access, - product_name=scheduler_data.product_name, - user_id=scheduler_data.user_id, - project_id=scheduler_data.project_id, - node_id=scheduler_data.node_uuid, - simcore_user_agent=scheduler_data.request_simcore_user_agent, - swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, - ) - - logger.debug( - "Starting containers %s with compose-specs:\n%s", - scheduler_data.service_name, - compose_spec, - ) - - async def progress_create_containers( - message: str, percent: PositiveFloat, task_id: TaskId - ) -> None: - # TODO: detect when images are pulling and change the status - # of the service to pulling - logger.debug("%s: %.2f %s", task_id, percent, message) - - await sidecars_client.create_containers( - dynamic_sidecar_endpoint, compose_spec, progress_create_containers - ) - - # NOTE: when in READ ONLY mode disable the outputs watcher - if scheduler_data.dynamic_sidecar.service_removal_state.can_save: - await sidecars_client.enable_service_ports_io(dynamic_sidecar_endpoint) - else: - await sidecars_client.disable_service_ports_io(dynamic_sidecar_endpoint) - - # Starts PROXY ----------------------------------------------- - # The entrypoint container name was now computed - # continue starting the proxy - - async for attempt in AsyncRetrying( - stop=stop_after_delay( - dynamic_sidecar_settings.DYNAMIC_SIDECAR_WAIT_FOR_CONTAINERS_TO_START - ), - wait=wait_fixed(1), - retry_error_cls=EntrypointContainerNotFoundError, - before_sleep=before_sleep_log(logger, logging.WARNING), - ): - with attempt: - if scheduler_data.dynamic_sidecar.service_removal_state.was_removed: - # the service was removed 
while waiting for the operation to finish - logger.warning( - "Stopping `get_entrypoint_container_name` operation. " - "Will no try to start the service." - ) - return - - entrypoint_container = await sidecars_client.get_entrypoint_container_name( - dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, - dynamic_sidecar_network_name=scheduler_data.dynamic_sidecar_network_name, - ) - logger.info( - "Fetched container entrypoint name %s", entrypoint_container - ) - - await sidecars_client.configure_proxy( - proxy_endpoint=scheduler_data.get_proxy_endpoint, - entrypoint_container_name=entrypoint_container, - service_port=scheduler_data.service_port, - ) - - scheduler_data.dynamic_sidecar.were_containers_created = True - - scheduler_data.dynamic_sidecar.was_compose_spec_submitted = True + await create_user_services(app, scheduler_data) class AttachProjectsNetworks(DynamicSchedulerEvent): @@ -565,6 +433,7 @@ class AttachProjectsNetworks(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return ( scheduler_data.dynamic_sidecar.were_containers_created and not scheduler_data.dynamic_sidecar.is_project_network_attached @@ -593,6 +462,7 @@ class RemoveUserCreatedServices(DynamicSchedulerEvent): @classmethod async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool: + assert app # nosec return scheduler_data.dynamic_sidecar.service_removal_state.can_remove @classmethod diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py new file mode 100644 index 00000000000..55c5eb7609c --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py @@ -0,0 +1,188 @@ +import logging + +from fastapi import FastAPI +from 
models_library.projects import ProjectAtDB +from models_library.projects_nodes_io import NodeIDStr +from models_library.service_settings_labels import SimcoreServiceLabels +from models_library.services import ServiceKeyVersion, ServiceVersion +from models_library.services_creation import CreateServiceMetricsAdditionalParams +from pydantic import PositiveFloat, parse_obj_as +from servicelib.fastapi.long_running_tasks.client import TaskId +from tenacity._asyncio import AsyncRetrying +from tenacity.before_sleep import before_sleep_log +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +from .....core.settings import DynamicSidecarSettings +from .....models.dynamic_services_scheduler import SchedulerData +from .....utils.db import get_repository +from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository +from ....db.repositories.projects import ProjectsRepository +from ....db.repositories.users import UsersRepository +from ....director_v0 import DirectorV0Client +from ...api_client import get_sidecars_client +from ...docker_compose_specs import assemble_spec +from ...errors import EntrypointContainerNotFoundError +from ._events_utils import get_director_v0_client + +_logger = logging.getLogger(__name__) + + +async def create_user_services(app: FastAPI, scheduler_data: SchedulerData): + _logger.debug( + "Getting docker compose spec for service %s", scheduler_data.service_name + ) + + dynamic_sidecar_settings: DynamicSidecarSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR + ) + sidecars_client = get_sidecars_client(app, scheduler_data.node_uuid) + dynamic_sidecar_endpoint = scheduler_data.endpoint + + # check values have been set by previous step + if ( + scheduler_data.dynamic_sidecar.dynamic_sidecar_id is None + or scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id is None + or scheduler_data.dynamic_sidecar.swarm_network_id is None + or 
scheduler_data.dynamic_sidecar.swarm_network_name is None + or scheduler_data.proxy_admin_api_port is None + ): + msg = ( + "Did not expect None for any of the following: " + f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_id=} " + f"{scheduler_data.dynamic_sidecar.dynamic_sidecar_network_id=} " + f"{scheduler_data.dynamic_sidecar.swarm_network_id=} " + f"{scheduler_data.dynamic_sidecar.swarm_network_name=} " + f"{scheduler_data.proxy_admin_api_port=}" + ) + raise ValueError(msg) + + # Starts dynamic SIDECAR ------------------------------------- + # creates a docker compose spec given the service key and tag + # fetching project form DB and fetching user settings + + director_v0_client: DirectorV0Client = get_director_v0_client(app) + simcore_service_labels: SimcoreServiceLabels = ( + await director_v0_client.get_service_labels( + service=ServiceKeyVersion( + key=scheduler_data.key, version=scheduler_data.version + ) + ) + ) + + groups_extra_properties = get_repository(app, GroupsExtraPropertiesRepository) + assert scheduler_data.product_name is not None # nosec + allow_internet_access: bool = await groups_extra_properties.has_internet_access( + user_id=scheduler_data.user_id, product_name=scheduler_data.product_name + ) + + compose_spec: str = assemble_spec( + app=app, + service_key=scheduler_data.key, + service_version=scheduler_data.version, + paths_mapping=scheduler_data.paths_mapping, + compose_spec=scheduler_data.compose_spec, + container_http_entry=scheduler_data.container_http_entry, + dynamic_sidecar_network_name=scheduler_data.dynamic_sidecar_network_name, + swarm_network_name=scheduler_data.dynamic_sidecar.swarm_network_name, + service_resources=scheduler_data.service_resources, + has_quota_support=dynamic_sidecar_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, + simcore_service_labels=simcore_service_labels, + allow_internet_access=allow_internet_access, + product_name=scheduler_data.product_name, + user_id=scheduler_data.user_id, + 
project_id=scheduler_data.project_id, + node_id=scheduler_data.node_uuid, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + swarm_stack_name=dynamic_sidecar_settings.SWARM_STACK_NAME, + ) + + _logger.debug( + "Starting containers %s with compose-specs:\n%s", + scheduler_data.service_name, + compose_spec, + ) + + async def progress_create_containers( + message: str, percent: PositiveFloat, task_id: TaskId + ) -> None: + # TODO: detect when images are pulling and change the status + # of the service to pulling + _logger.debug("%s: %.2f %s", task_id, percent, message) + + # data from project + projects_repository = get_repository(app, ProjectsRepository) + project: ProjectAtDB = await projects_repository.get_project( + project_id=scheduler_data.project_id + ) + project_name = project.name + node_name = project.workbench[NodeIDStr(scheduler_data.node_uuid)].label + + # data from user + users_repository = get_repository(app, UsersRepository) + user_email = await users_repository.get_user_email(scheduler_data.user_id) + + metrics_params = CreateServiceMetricsAdditionalParams( + wallet_id=None, + wallet_name=None, + pricing_plan_id=None, + pricing_detail_id=None, + product_name=scheduler_data.product_name, + simcore_user_agent=scheduler_data.request_simcore_user_agent, + user_email=user_email, + project_name=project_name, + node_name=node_name, + service_key=scheduler_data.key, + service_version=parse_obj_as(ServiceVersion, scheduler_data.version), + service_resources=scheduler_data.service_resources, + service_additional_metadata={}, + ) + await sidecars_client.create_containers( + dynamic_sidecar_endpoint, + compose_spec, + metrics_params, + progress_create_containers, + ) + + # NOTE: when in READ ONLY mode disable the outputs watcher + if scheduler_data.dynamic_sidecar.service_removal_state.can_save: + await sidecars_client.enable_service_ports_io(dynamic_sidecar_endpoint) + else: + await 
sidecars_client.disable_service_ports_io(dynamic_sidecar_endpoint) + + # Starts PROXY ----------------------------------------------- + # The entrypoint container name was now computed + # continue starting the proxy + + async for attempt in AsyncRetrying( + stop=stop_after_delay( + dynamic_sidecar_settings.DYNAMIC_SIDECAR_WAIT_FOR_CONTAINERS_TO_START + ), + wait=wait_fixed(1), + retry_error_cls=EntrypointContainerNotFoundError, + before_sleep=before_sleep_log(_logger, logging.WARNING), + ): + with attempt: + if scheduler_data.dynamic_sidecar.service_removal_state.was_removed: + # the service was removed while waiting for the operation to finish + _logger.warning( + "Stopping `get_entrypoint_container_name` operation. " + "Will no try to start the service." + ) + return + + entrypoint_container = await sidecars_client.get_entrypoint_container_name( + dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, + dynamic_sidecar_network_name=scheduler_data.dynamic_sidecar_network_name, + ) + _logger.info("Fetched container entrypoint name %s", entrypoint_container) + + await sidecars_client.configure_proxy( + proxy_endpoint=scheduler_data.get_proxy_endpoint, + entrypoint_container_name=entrypoint_container, + service_port=scheduler_data.service_port, + ) + + scheduler_data.dynamic_sidecar.were_containers_created = True + + scheduler_data.dynamic_sidecar.was_compose_spec_submitted = True diff --git a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py index c5a9a99b0e7..f8b727f6cdb 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_services_routes.py +++ b/services/director-v2/tests/integration/02/test_dynamic_services_routes.py @@ -4,9 +4,10 @@ import asyncio import json import logging +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable from contextlib import asynccontextmanager -from typing import Any, AsyncIterable, AsyncIterator, 
Awaitable, Callable -from unittest.mock import Mock +from typing import Any +from unittest.mock import MagicMock import aiodocker import pytest @@ -111,17 +112,19 @@ def start_request_data( ensure_swarm_and_networks: None, osparc_product_name: str, ) -> dict[str, Any]: - return dict( - user_id=user_id, - project_id=project_id, - product_name=osparc_product_name, - service_uuid=node_uuid, - service_key=dy_static_file_server_dynamic_sidecar_service["image"]["name"], - service_version=dy_static_file_server_dynamic_sidecar_service["image"]["tag"], - request_scheme="http", - request_dns="localhost:50000", - can_save=True, - settings=[ + return { + "user_id": user_id, + "project_id": project_id, + "product_name": osparc_product_name, + "service_uuid": node_uuid, + "service_key": dy_static_file_server_dynamic_sidecar_service["image"]["name"], + "service_version": dy_static_file_server_dynamic_sidecar_service["image"][ + "tag" + ], + "request_scheme": "http", + "request_dns": "localhost:50000", + "can_save": True, + "settings": [ { "name": "resources", "type": "Resources", @@ -134,11 +137,11 @@ def start_request_data( "value": ["node.platform.os == linux"], }, ], - paths_mapping={"outputs_path": "/tmp/outputs", "inputs_path": "/tmp/inputs"}, - service_resources=ServiceResourcesDictHelpers.create_jsonable( + "paths_mapping": {"outputs_path": "/tmp/outputs", "inputs_path": "/tmp/inputs"}, + "service_resources": ServiceResourcesDictHelpers.create_jsonable( service_resources ), - ) + } @pytest.fixture @@ -209,9 +212,18 @@ async def ensure_services_stopped( @pytest.fixture def mock_project_repository(mocker: MockerFixture) -> None: + class ExtendedMagicMock(MagicMock): + @property + def name(self) -> str: + return "test_name" + + @property + def label(self) -> str: + return "test_label" + mocker.patch( f"{DIRECTOR_V2_MODULES}.db.repositories.projects.ProjectsRepository.get_project", - side_effect=lambda *args, **kwargs: Mock(), + side_effect=lambda *args, **kwargs: 
ExtendedMagicMock(), ) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py index 980aadbc7f3..d1df9083312 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py @@ -2,14 +2,15 @@ # pylint:disable=redefined-outer-name import json -from typing import Any, Callable, Optional +from collections.abc import Callable +from typing import Any import pytest from fastapi import FastAPI, status from httpx import Response +from models_library.services_creation import CreateServiceMetricsAdditionalParams from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl, parse_obj_as -from pytest import MonkeyPatch from pytest_simcore.helpers.typing_env import EnvVarsDict from respx import MockRouter, Route from respx.types import SideEffectTypes @@ -19,10 +20,7 @@ ThinSidecarsClient, ) -# NOTE: typing and callables cannot -MockRequestType = Callable[ - [str, str, Optional[Response], Optional[SideEffectTypes]], Route -] +MockRequestType = Callable[[str, str, Response | None, SideEffectTypes | None], Route] # UTILS @@ -36,7 +34,7 @@ def assert_responses(mocked: Response, result: Response | None) -> None: @pytest.fixture -def mocked_app(monkeypatch: MonkeyPatch, mock_env: EnvVarsDict) -> FastAPI: +def mocked_app(monkeypatch: pytest.MonkeyPatch, mock_env: EnvVarsDict) -> FastAPI: monkeypatch.setenv("S3_ENDPOINT", "") monkeypatch.setenv("S3_ACCESS_KEY", "") monkeypatch.setenv("S3_SECRET_KEY", "") @@ -176,10 +174,10 @@ async def test_get_containers_name( mock_response = Response(status.HTTP_200_OK) encoded_filters = json.dumps( - dict( - network=dynamic_sidecar_network_name, - exclude=SUFFIX_EGRESS_PROXY_NAME, - ) + { + "network": dynamic_sidecar_network_name, + "exclude": SUFFIX_EGRESS_PROXY_NAME, + } ) 
mock_request( "GET", @@ -274,7 +272,13 @@ async def test_put_volumes( pytest.param( "post_containers_tasks", "/containers", - dict(compose_spec="some_fake_compose_as_str"), + { + "compose_spec": "some_fake_compose_as_str", + "metrics_params": parse_obj_as( + CreateServiceMetricsAdditionalParams, + CreateServiceMetricsAdditionalParams.Config.schema_extra["example"], + ), + }, id="post_containers_tasks", ), pytest.param( diff --git a/services/dynamic-sidecar/openapi.json b/services/dynamic-sidecar/openapi.json index aab02c4460a..0e6b7a062f4 100644 --- a/services/dynamic-sidecar/openapi.json +++ b/services/dynamic-sidecar/openapi.json @@ -767,16 +767,30 @@ ], "title": "AttachContainerToNetworkItem" }, + "BootMode": { + "type": "string", + "enum": [ + "CPU", + "GPU", + "MPI" + ], + "title": "BootMode", + "description": "An enumeration." + }, "ContainersCreate": { "properties": { "docker_compose_yaml": { "type": "string", "title": "Docker Compose Yaml" + }, + "metrics_params": { + "$ref": "#/components/schemas/CreateServiceMetricsAdditionalParams" } }, "type": "object", "required": [ - "docker_compose_yaml" + "docker_compose_yaml", + "metrics_params" ], "title": "ContainersCreate" }, @@ -796,6 +810,97 @@ ], "title": "CreateDirsRequestItem" }, + "CreateServiceMetricsAdditionalParams": { + "properties": { + "wallet_id": { + "type": "integer", + "exclusiveMinimum": true, + "title": "Wallet Id", + "minimum": 0 + }, + "wallet_name": { + "type": "string", + "title": "Wallet Name" + }, + "pricing_plan_id": { + "type": "integer", + "title": "Pricing Plan Id" + }, + "pricing_detail_id": { + "type": "integer", + "title": "Pricing Detail Id" + }, + "product_name": { + "type": "string", + "title": "Product Name" + }, + "simcore_user_agent": { + "type": "string", + "title": "Simcore User Agent" + }, + "user_email": { + "type": "string", + "title": "User Email" + }, + "project_name": { + "type": "string", + "title": "Project Name" + }, + "node_name": { + "type": "string", + "title": 
"Node Name" + }, + "service_key": { + "type": "string", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", + "title": "Service Key" + }, + "service_version": { + "type": "string", + "pattern": "^(0|[1-9]\\d*)(\\.(0|[1-9]\\d*)){2}(-(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*)(\\.(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*))*)?(\\+[-\\da-zA-Z]+(\\.[-\\da-zA-Z-]+)*)?$", + "title": "Service Version" + }, + "service_resources": { + "additionalProperties": { + "$ref": "#/components/schemas/ImageResources" + }, + "type": "object", + "title": "Service Resources" + }, + "service_additional_metadata": { + "type": "object", + "title": "Service Additional Metadata" + } + }, + "type": "object", + "required": [ + "product_name", + "simcore_user_agent", + "user_email", + "project_name", + "node_name", + "service_key", + "service_version", + "service_resources", + "service_additional_metadata" + ], + "title": "CreateServiceMetricsAdditionalParams", + "example": { + "wallet_id": 1, + "wallet_name": "a private wallet for me", + "pricing_plan_id": 1, + "pricing_detail_id": 1, + "product_name": "osparc", + "simcore_user_agent": "undefined", + "user_email": "test@test.com", + "project_name": "_!New Study", + "node_name": "the service of a lifetime _ *!", + "service_key": "simcore/services/dynamic/test", + "service_version": "0.0.1", + "service_resources": {}, + "service_additional_metadata": {} + } + }, "DetachContainerFromNetworkItem": { "properties": { "network_id": { @@ -822,6 +927,64 @@ "type": "object", "title": "HTTPValidationError" }, + "ImageResources": { + "properties": { + "image": { + "type": "string", + "pattern": "^(?:([a-z0-9-]+(?:\\.[a-z0-9-]+)+(?::\\d+)?|[a-z0-9-]+:\\d+)/)?((?:[a-z0-9][a-z0-9_.-]*/)*[a-z0-9-_]+[a-z0-9])(?::([\\w][\\w.-]{0,127}))?(\\@sha256:[a-fA-F0-9]{32,64})?$", + "title": "Image", + "description": "Used by the frontend to provide a context for the users.Services with a docker-compose spec will have 
multiple entries.Using the `image:version` instead of the docker-compose spec is more helpful for the end user." + }, + "resources": { + "additionalProperties": { + "$ref": "#/components/schemas/ResourceValue" + }, + "type": "object", + "title": "Resources" + }, + "boot_modes": { + "items": { + "$ref": "#/components/schemas/BootMode" + }, + "type": "array", + "description": "describe how a service shall be booted, using CPU, MPI, openMP or GPU", + "default": [ + "CPU" + ] + } + }, + "type": "object", + "required": [ + "image", + "resources" + ], + "title": "ImageResources", + "example": { + "image": "simcore/service/dynamic/pretty-intense:1.0.0", + "resources": { + "CPU": { + "limit": 4, + "reservation": 0.1 + }, + "RAM": { + "limit": 103079215104, + "reservation": 536870912 + }, + "VRAM": { + "limit": 1, + "reservation": 1 + }, + "AIRAM": { + "limit": 1, + "reservation": 1 + }, + "ANY_resource": { + "limit": "some_value", + "reservation": "some_value" + } + } + } + }, "PatchPortsIOItem": { "properties": { "is_enabled": { @@ -847,6 +1010,44 @@ ], "title": "PutVolumeItem" }, + "ResourceValue": { + "properties": { + "limit": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ], + "title": "Limit" + }, + "reservation": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ], + "title": "Reservation" + } + }, + "type": "object", + "required": [ + "limit", + "reservation" + ], + "title": "ResourceValue" + }, "SelectBox": { "properties": { "structure": { diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py index 6eccbe09a76..0e22f3ede1b 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py @@ -14,8 
+14,8 @@ from ..modules.mounted_fs import MountedVolumes from ..modules.outputs import ( OutputsContext, - disable_outputs_watcher, - enable_outputs_watcher, + disable_event_propagation, + enable_event_propagation, ) from ._dependencies import get_application, get_mounted_volumes, get_outputs_context @@ -63,10 +63,10 @@ async def toggle_ports_io( - inputs ports from pulling data """ if patch_ports_io_item.is_enabled: - await enable_outputs_watcher(app) + await enable_event_propagation(app) enable_inputs_state_pulling(app) else: - await disable_outputs_watcher(app) + await disable_event_propagation(app) disable_inputs_state_pulling(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index 737c4bba8bd..5a0a9796773 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -1,5 +1,6 @@ import logging from asyncio import Lock +from typing import Any, ClassVar from fastapi import FastAPI from models_library.basic_types import BootModeEnum @@ -20,6 +21,7 @@ from ..modules.inputs import setup_inputs from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs from ..modules.outputs import setup_outputs +from ..modules.resource_tracking import setup_resource_tracking from .docker_compose_utils import docker_compose_down from .docker_logs import setup_background_log_fetcher from .error_handlers import http_error_handler, node_not_found_error_handler @@ -58,7 +60,7 @@ class AppState: of the different app.state fields during the app's lifespan """ - _STATES = { + _STATES: ClassVar[dict[str, Any]] = { "settings": ApplicationSettings, "mounted_volumes": MountedVolumes, "shared_store": SharedStore, @@ -72,9 +74,8 @@ def __init__(self, initialized_app: FastAPI): if not isinstance(getattr(initialized_app.state, name, None), 
type_) ] if errors: - raise ValueError( - f"These app states were not properly initialized: {errors}" - ) + msg = f"These app states were not properly initialized: {errors}" + raise ValueError(msg) self._app = initialized_app @@ -150,6 +151,7 @@ def create_app(): if app.state.settings.RABBIT_SETTINGS: setup_rabbitmq(app) setup_background_log_fetcher(app) + setup_resource_tracking(app) # also sets up mounted_volumes setup_mounted_fs(app) @@ -171,7 +173,7 @@ async def _on_startup() -> None: await login_registry(app_state.settings.REGISTRY_SETTINGS) await volumes_fix_permissions(app_state.mounted_volumes) # STARTED - print(APP_STARTED_BANNER_MSG, flush=True) + print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201 async def _on_shutdown() -> None: app_state = AppState(app) @@ -189,7 +191,7 @@ async def _on_shutdown() -> None: await cancel_sequential_workers() # FINISHED - print(APP_FINISHED_BANNER_MSG, flush=True) + print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201 app.add_event_handler("startup", _on_startup) app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py index 74363b1927f..1033ec53f37 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py @@ -11,6 +11,7 @@ from contextlib import suppress from typing import Any, AsyncGenerator, Callable, Coroutine, cast +from aiodocker import DockerError from fastapi import FastAPI from servicelib.logging_utils import guess_message_log_level @@ -35,11 +36,19 @@ async def _logs_fetcher_worker( image_name = container_inspect["Config"]["Image"].split("/")[-1] logger.debug("Streaming logs from %s, image %s", container_name, image_name) - async for line in cast( - AsyncGenerator[str, None], - container.log(stdout=True, stderr=True, 
follow=True), - ): - await dispatch_log(image_name=image_name, message=line) + try: + async for line in cast( + AsyncGenerator[str, None], + container.log(stdout=True, stderr=True, follow=True), + ): + await dispatch_log(image_name=image_name, message=line) + except DockerError as e: + logger.warning( + "Cannot stream logs from %s, image %s, because: %s", + container_name, + image_name, + e, + ) class BackgroundLogFetcher: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py index 914d3bdf882..502b7ca3342 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py @@ -1,21 +1,33 @@ import asyncio import logging +from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable from contextlib import asynccontextmanager from enum import Enum -from typing import Any, AsyncGenerator, Awaitable, Callable, Final, TypedDict +from typing import Any, Final, TypedDict import aiodocker import yaml +from aiodocker.containers import DockerContainer from aiodocker.utils import clean_filters from models_library.basic_regex import DOCKER_GENERIC_TAG_KEY_RE +from models_library.generated_models.docker_rest_api import ContainerState +from models_library.generated_models.docker_rest_api import Status2 as ContainerStatus from models_library.services import RunID from pydantic import PositiveInt, parse_obj_as from servicelib.logging_utils import log_catch +from servicelib.utils import logged_gather from settings_library.docker_registry import RegistrySettings +from starlette import status as http_status from .errors import UnexpectedDockerError, VolumeNotFoundError -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) + + +_ACCEPTED_CONTAINER_STATUSES: set[str] = { + ContainerStatus.created, + 
ContainerStatus.running, +} @asynccontextmanager @@ -24,7 +36,7 @@ async def docker_client() -> AsyncGenerator[aiodocker.Docker, None]: try: yield docker except aiodocker.exceptions.DockerError as error: - logger.debug("An unexpected Docker error occurred", exc_info=True) + _logger.debug("An unexpected Docker error occurred", exc_info=True) raise UnexpectedDockerError( message=error.message, status=error.status ) from error @@ -36,29 +48,81 @@ async def get_volume_by_label(label: str, run_id: RunID) -> dict[str, Any]: async with docker_client() as docker: filters = {"label": [f"source={label}", f"run_id={run_id}"]} params = {"filters": clean_filters(filters)} - data = await docker._query_json( # pylint: disable=protected-access + data = await docker._query_json( # pylint: disable=protected-access # noqa: SLF001 "volumes", method="GET", params=params ) volumes = data["Volumes"] - logger.debug( # pylint: disable=logging-fstring-interpolation - f"volumes query for {label=} {volumes=}" - ) + _logger.debug("volumes query for label=%s volumes=%s", label, volumes) if len(volumes) != 1: raise VolumeNotFoundError(label, run_id, volumes) - volume_details = volumes[0] - return volume_details # type: ignore + volume_details: dict[str, Any] = volumes[0] + return volume_details -async def get_running_containers_count_from_names( +async def _get_container( + docker: aiodocker.Docker, container_name: str +) -> DockerContainer | None: + try: + return await docker.containers.get(container_name) + except aiodocker.DockerError as e: + if e.status == http_status.HTTP_404_NOT_FOUND: + return None + raise + + +async def _get_containers_inspect_from_names( container_names: list[str], -) -> PositiveInt: +) -> dict[str, DockerContainer | None]: + # NOTE: returned objects have their associated Docker client session closed if len(container_names) == 0: - return 0 + return {} + + containers_inspect: dict[str, DockerContainer | None] = { + x: None for x in container_names + } async with 
docker_client() as docker: - filters = clean_filters({"name": container_names}) - containers = await docker.containers.list(all=True, filters=filters) - return len(containers) + docker_containers: list[DockerContainer | None] = await logged_gather( + *( + _get_container(docker, container_name) + for container_name in container_names + ) + ) + for docker_container in docker_containers: + if docker_container is None: + continue + + stripped_name = docker_container["Name"].lstrip("/") + if stripped_name in containers_inspect: + containers_inspect[stripped_name] = docker_container + + return containers_inspect + + +async def get_container_states( + container_names: list[str], +) -> dict[str, ContainerState | None]: + """if a container is not found it's status is None""" + containers_inspect = await _get_containers_inspect_from_names(container_names) + return { + k: None if v is None else ContainerState(**v["State"]) + for k, v in containers_inspect.items() + } + + +def are_all_containers_in_expected_states( + states: Iterable[ContainerState | None], +) -> bool: + return all( + s is not None and s.Status in _ACCEPTED_CONTAINER_STATUSES for s in states + ) + + +async def get_containers_count_from_names( + container_names: list[str], +) -> PositiveInt: + # this one could handle the error + return len(await _get_containers_inspect_from_names(container_names)) def get_docker_service_images(compose_spec_yaml: str) -> set[str]: @@ -108,7 +172,7 @@ async def pull_images( _ImagesInfoDict = dict[ImageName, _LayersInfoDict] -class _ProgressDetailDict(TypedDict, total=True): +class _ProgressDetailDict(TypedDict, total=False): current: int total: int @@ -151,6 +215,8 @@ def _parse_docker_pull_progress( if status == _TargetPullStatus.DOWNLOADING: # writes + assert "current" in docker_pull_progress["progressDetail"] # nosec + assert "total" in docker_pull_progress["progressDetail"] # nosec image_pulling_data[layer_id] = ( round( _DOWNLOAD_RATIO * 
docker_pull_progress["progressDetail"]["current"] @@ -170,6 +236,7 @@ def _parse_docker_pull_progress( _, layer_total_size = image_pulling_data[layer_id] # writes + assert "current" in docker_pull_progress["progressDetail"] # nosec image_pulling_data[layer_id] = ( round( _DOWNLOAD_RATIO * layer_total_size @@ -217,7 +284,7 @@ async def _pull_image_with_progress( if match: registry_host = match.group("registry_host") else: - logger.error( + _logger.error( "%s does not match typical docker image pattern, please check! Image pulling will still be attempted but may fail.", f"{image_name=}", ) @@ -234,7 +301,7 @@ async def _pull_image_with_progress( if registry_host else None, ): - with log_catch(logger, reraise=False): + with log_catch(_logger, reraise=False): if _parse_docker_pull_progress( parse_obj_as(_DockerProgressDict, pull_progress), all_image_pulling_data[image_name], diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index b124f644174..3b0c7048b1d 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -10,6 +10,7 @@ ProgressType, RabbitEventMessageType, RabbitMessageBase, + RabbitResourceTrackingMessages, ) from pydantic import NonNegativeFloat from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context @@ -25,6 +26,12 @@ async def _post_rabbit_message(app: FastAPI, message: RabbitMessageBase) -> None await get_rabbitmq_client(app).publish(message.channel_name, message) +async def post_resource_tracking_message( + app: FastAPI, message: RabbitResourceTrackingMessages +): + await _post_rabbit_message(app, message) + + async def post_log_message( app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt ) -> None: diff --git 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py index 06be167af9d..241ce3cfc40 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py @@ -1,4 +1,5 @@ import warnings +from datetime import timedelta from functools import lru_cache from pathlib import Path from typing import cast @@ -13,9 +14,19 @@ from settings_library.docker_registry import RegistrySettings from settings_library.r_clone import RCloneSettings from settings_library.rabbit import RabbitSettings +from settings_library.resource_usage_tracker import ( + DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL, +) from settings_library.utils_logging import MixinLoggingSettings +class ResourceTrackingSettings(BaseCustomSettings): + RESOURCE_TRACKING_HEARTBEAT_INTERVAL: timedelta = Field( + default=DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL, + description="each time the status of the service is propagated", + ) + + class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): SC_BOOT_MODE: BootModeEnum = Field( ..., @@ -108,6 +119,8 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): RABBIT_SETTINGS: RabbitSettings | None = Field(auto_default_from_env=True) DY_SIDECAR_R_CLONE_SETTINGS: RCloneSettings = Field(auto_default_from_env=True) + RESOURCE_TRACKING: ResourceTrackingSettings = Field(auto_default_from_env=True) + @validator("LOG_LEVEL") @classmethod def _check_log_level(cls, value): diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/schemas/containers.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/schemas/containers.py index 60d9606fefc..3a9606394ef 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/schemas/containers.py +++ 
b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/schemas/containers.py @@ -1,5 +1,7 @@ +from models_library.services_creation import CreateServiceMetricsAdditionalParams from pydantic import BaseModel class ContainersCreate(BaseModel): docker_compose_yaml: str + metrics_params: CreateServiceMetricsAdditionalParams diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 3c74ac623fa..6625305be75 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -1,10 +1,14 @@ import functools import logging +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager from pathlib import Path from typing import Final from fastapi import FastAPI -from models_library.rabbitmq_messages import ProgressType +from models_library.generated_models.docker_rest_api import ContainerState +from models_library.rabbitmq_messages import ProgressType, SimcorePlatformStatus +from pydantic import PositiveInt from servicelib.fastapi.long_running_tasks.server import TaskProgress from servicelib.progress_bar import ProgressBarData from servicelib.utils import logged_gather @@ -24,7 +28,11 @@ docker_compose_start, ) from ..core.docker_logs import start_log_fetching, stop_log_fetching -from ..core.docker_utils import get_running_containers_count_from_names +from ..core.docker_utils import ( + are_all_containers_in_expected_states, + get_container_states, + get_containers_count_from_names, +) from ..core.rabbitmq import ( post_event_reload_iframe, post_progress_message, @@ -38,7 +46,8 @@ from ..models.shared_store import SharedStore from ..modules import nodeports from ..modules.mounted_fs import MountedVolumes -from ..modules.outputs import OutputsManager, 
outputs_watcher_disabled +from ..modules.outputs import OutputsManager, event_propagation_disabled +from .resource_tracking import send_service_started, send_service_stopped _logger = logging.getLogger(__name__) @@ -108,13 +117,24 @@ async def _retry_docker_compose_create( container_names = list(compose_spec_dict["services"].keys()) expected_num_containers = len(container_names) - actual_num_containers = await get_running_containers_count_from_names( - container_names - ) + actual_num_containers = await get_containers_count_from_names(container_names) return expected_num_containers == actual_num_containers +@asynccontextmanager +async def _reset_on_error( + shared_store: SharedStore, +) -> AsyncGenerator[None, None]: + try: + yield None + except Exception: + async with shared_store: + shared_store.compose_spec = None + shared_store.container_names = [] + raise + + async def task_create_service_containers( progress: TaskProgress, settings: ApplicationSettings, @@ -140,7 +160,7 @@ async def task_create_service_containers( assert shared_store.compose_spec # nosec - async with outputs_watcher_disabled(app): + async with event_propagation_disabled(app), _reset_on_error(shared_store): # removes previous pending containers progress.update(message="cleanup previous used resources") result = await docker_compose_rm(shared_store.compose_spec, settings) @@ -164,13 +184,19 @@ async def task_create_service_containers( await _retry_docker_compose_create(shared_store.compose_spec, settings) progress.update(message="ensure containers are started", percent=0.95) - r = await _retry_docker_compose_start(shared_store.compose_spec, settings) + compose_start_result = await _retry_docker_compose_start( + shared_store.compose_spec, settings + ) + + await send_service_started(app, metrics_params=containers_create.metrics_params) - message = f"Finished docker compose start with output\n{r.message}" + message = ( + f"Finished docker-compose start with output\n{compose_start_result.message}" 
+ ) - if r.success: + if compose_start_result.success: await post_sidecar_log_message( - app, "service containers started", log_level=logging.INFO + app, "user services started", log_level=logging.INFO ) _logger.debug(message) for container_name in shared_store.container_names: @@ -182,7 +208,7 @@ async def task_create_service_containers( "Marked sidecar as unhealthy, see below for details\n:%s", message ) await post_sidecar_log_message( - app, "could not start service containers", log_level=logging.ERROR + app, "could not start user services", log_level=logging.ERROR ) return shared_store.container_names @@ -198,17 +224,58 @@ async def task_runs_docker_compose_down( _logger.warning("No compose-spec was found") return - progress.update(message="running docker compose down", percent=0.1) - result = await _retry_docker_compose_down(shared_store.compose_spec, settings) - _raise_for_errors(result, "down") + container_states: dict[str, ContainerState | None] = await get_container_states( + shared_store.container_names + ) + containers_were_ok = are_all_containers_in_expected_states( + container_states.values() + ) + + container_count_before_down: PositiveInt = await get_containers_count_from_names( + shared_store.container_names + ) + + async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus): + # NOTE: avoids sending a stop message without a start or any heartbeats, + # which makes no sense for the purpose of billing + if container_count_before_down > 0: + # if containers were not OK, we need to check their status + # the run is reported as BAD unless a container was OOM-killed + simcore_platform_status = platform_status + if not containers_were_ok: + any_container_oom_killed = any( + c.OOMKilled is True + for c in container_states.values() + if c is not None + ) + # if it's not an OOM killer (the user killed it) we set it as bad + # since the platform failed the container + if any_container_oom_killed: + _logger.warning( + "Containers killed to to OOMKiller: %s", 
container_states + ) + else: + simcore_platform_status = SimcorePlatformStatus.BAD + + await send_service_stopped(app, simcore_platform_status) + + try: + progress.update(message="running docker-compose-down", percent=0.1) + result = await _retry_docker_compose_down(shared_store.compose_spec, settings) + _raise_for_errors(result, "down") + + progress.update(message="stopping logs", percent=0.9) + for container_name in shared_store.container_names: + await stop_log_fetching(app, container_name) - progress.update(message="stopping logs", percent=0.9) - for container_name in shared_store.container_names: - await stop_log_fetching(app, container_name) + progress.update(message="removing pending resources", percent=0.95) + result = await docker_compose_rm(shared_store.compose_spec, settings) + _raise_for_errors(result, "rm") + except Exception: + await _send_resource_tracking_stop(SimcorePlatformStatus.BAD) + raise - progress.update(message="removing pending resources", percent=0.95) - result = await docker_compose_rm(shared_store.compose_spec, settings) - _raise_for_errors(result, "rm") + await _send_resource_tracking_stop(SimcorePlatformStatus.OK) # removing compose-file spec async with shared_store: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py index 68a48b7b0fe..2b890ef48dd 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py @@ -1,7 +1,7 @@ import os +from collections.abc import AsyncGenerator, Generator, Iterator from functools import cached_property from pathlib import Path -from typing import AsyncGenerator, Generator, Iterator from fastapi import FastAPI from models_library.projects_nodes import NodeID @@ -125,7 +125,7 @@ async def iter_state_paths_to_docker_volumes( self, run_id: RunID ) -> 
AsyncGenerator[str, None]: for volume_state_path, state_path in zip( - self.volume_name_state_paths(), self.state_paths + self.volume_name_state_paths(), self.state_paths, strict=True ): bind_path: Path = await self._get_bind_path_from_label( volume_state_path, run_id @@ -148,6 +148,3 @@ def setup_mounted_fs(app: FastAPI) -> MountedVolumes: ) return app.state.mounted_volumes - - -__all__: tuple[str, ...] = ("MountedVolumes",) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/__init__.py index bc82628c30d..0311ff0462e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/__init__.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/__init__.py @@ -3,9 +3,9 @@ from ._context import OutputsContext, setup_outputs_context from ._manager import OutputsManager, setup_outputs_manager from ._watcher import ( - disable_outputs_watcher, - enable_outputs_watcher, - outputs_watcher_disabled, + disable_event_propagation, + enable_event_propagation, + event_propagation_disabled, setup_outputs_watcher, ) @@ -17,9 +17,9 @@ def setup_outputs(app: FastAPI) -> None: __all__: tuple[str, ...] 
= ( - "disable_outputs_watcher", - "enable_outputs_watcher", - "outputs_watcher_disabled", + "disable_event_propagation", + "enable_event_propagation", + "event_propagation_disabled", "OutputsContext", "OutputsManager", "setup_outputs", diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watcher.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watcher.py index 19e88c2174f..da0cdea251c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watcher.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watcher.py @@ -79,7 +79,7 @@ async def on_startup() -> None: outputs_context=outputs_context, ) await app.state.outputs_watcher.start() - await disable_outputs_watcher(app) + await disable_event_propagation(app) async def on_shutdown() -> None: outputs_watcher: OutputsWatcher | None = app.state.outputs_watcher @@ -90,22 +90,22 @@ async def on_shutdown() -> None: app.add_event_handler("shutdown", on_shutdown) -async def disable_outputs_watcher(app: FastAPI) -> None: +async def disable_event_propagation(app: FastAPI) -> None: outputs_watcher: OutputsWatcher | None = app.state.outputs_watcher if outputs_watcher is not None: await outputs_watcher.disable_event_propagation() -async def enable_outputs_watcher(app: FastAPI) -> None: +async def enable_event_propagation(app: FastAPI) -> None: outputs_watcher: OutputsWatcher | None = app.state.outputs_watcher if outputs_watcher is not None: await outputs_watcher.enable_event_propagation() @asynccontextmanager -async def outputs_watcher_disabled(app: FastAPI) -> AsyncGenerator[None, None]: +async def event_propagation_disabled(app: FastAPI) -> AsyncGenerator[None, None]: try: - await disable_outputs_watcher(app) + await disable_event_propagation(app) yield None finally: - await enable_outputs_watcher(app) + await enable_event_propagation(app) diff --git 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/__init__.py new file mode 100644 index 00000000000..1c74097a4be --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/__init__.py @@ -0,0 +1,8 @@ +from ._core import send_service_started, send_service_stopped +from ._setup import setup_resource_tracking + +__all__: tuple[str, ...] = ( + "send_service_started", + "send_service_stopped", + "setup_resource_tracking", +) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py new file mode 100644 index 00000000000..40bcebf0921 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -0,0 +1,122 @@ +import logging +from typing import Final + +from fastapi import FastAPI +from models_library.generated_models.docker_rest_api import ContainerState +from models_library.rabbitmq_messages import ( + RabbitResourceTrackingHeartbeatMessage, + RabbitResourceTrackingStartedMessage, + RabbitResourceTrackingStoppedMessage, + SimcorePlatformStatus, +) +from models_library.services import ServiceType +from models_library.services_creation import CreateServiceMetricsAdditionalParams +from pydantic import NonNegativeFloat +from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.logging_utils import log_context + +from ...core.docker_utils import ( + are_all_containers_in_expected_states, + get_container_states, +) +from ...core.rabbitmq import post_resource_tracking_message +from ...core.settings import ApplicationSettings, ResourceTrackingSettings +from ...models.shared_store import SharedStore +from ._models import ResourceTrackingState + 
+_STOP_WORKER_TIMEOUT_S: Final[NonNegativeFloat] = 1.0 + +_logger = logging.getLogger(__name__) + + +def _get_settings(app: FastAPI) -> ApplicationSettings: + settings: ApplicationSettings = app.state.settings + return settings + + +async def _start_heart_beat_task(app: FastAPI) -> None: + settings: ApplicationSettings = _get_settings(app) + resource_tracking_settings: ResourceTrackingSettings = settings.RESOURCE_TRACKING + resource_tracking: ResourceTrackingState = app.state.resource_tracking + + if resource_tracking.heart_beat_task is not None: + msg = f"Unexpected task={resource_tracking.heart_beat_task} already running!" + raise RuntimeError(msg) + + with log_context(_logger, logging.DEBUG, "starting heart beat task"): + resource_tracking.heart_beat_task = start_periodic_task( + _heart_beat_task, + app=app, + interval=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL, + task_name="resource_tracking_heart_beat", + ) + + +async def stop_heart_beat_task(app: FastAPI) -> None: + resource_tracking: ResourceTrackingState = app.state.resource_tracking + if resource_tracking.heart_beat_task: + await stop_periodic_task( + resource_tracking.heart_beat_task, timeout=_STOP_WORKER_TIMEOUT_S + ) + + +async def _heart_beat_task(app: FastAPI): + settings: ApplicationSettings = _get_settings(app) + shared_store: SharedStore = app.state.shared_store + + container_states: dict[str, ContainerState | None] = await get_container_states( + shared_store.container_names + ) + + if are_all_containers_in_expected_states(container_states.values()): + message = RabbitResourceTrackingHeartbeatMessage( + service_run_id=settings.DY_SIDECAR_RUN_ID + ) + await post_resource_tracking_message(app, message) + else: + _logger.info( + "heart beat message skipped: container_states=%s", container_states + ) + + +async def send_service_stopped( + app: FastAPI, simcore_platform_status: SimcorePlatformStatus +) -> None: + await stop_heart_beat_task(app) + + settings: ApplicationSettings = 
_get_settings(app) + message = RabbitResourceTrackingStoppedMessage( + service_run_id=settings.DY_SIDECAR_RUN_ID, + simcore_platform_status=simcore_platform_status, + ) + await post_resource_tracking_message(app, message) + + +async def send_service_started( + app: FastAPI, *, metrics_params: CreateServiceMetricsAdditionalParams +) -> None: + settings: ApplicationSettings = _get_settings(app) + + message = RabbitResourceTrackingStartedMessage( + service_run_id=settings.DY_SIDECAR_RUN_ID, + wallet_id=metrics_params.wallet_id, + wallet_name=metrics_params.wallet_name, + product_name=metrics_params.product_name, + simcore_user_agent=metrics_params.simcore_user_agent, + user_id=settings.DY_SIDECAR_USER_ID, + user_email=metrics_params.user_email, + project_id=settings.DY_SIDECAR_PROJECT_ID, + project_name=metrics_params.project_name, + node_id=settings.DY_SIDECAR_NODE_ID, + node_name=metrics_params.node_name, + service_key=metrics_params.service_key, + service_version=metrics_params.service_version, + service_type=ServiceType.DYNAMIC, + service_resources=metrics_params.service_resources, + service_additional_metadata=metrics_params.service_additional_metadata, + pricing_plan_id=metrics_params.pricing_plan_id, + pricing_detail_id=metrics_params.pricing_detail_id, + ) + await post_resource_tracking_message(app, message) + + await _start_heart_beat_task(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_models.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_models.py new file mode 100644 index 00000000000..4708e9c291f --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_models.py @@ -0,0 +1,10 @@ +from asyncio import Task + +from pydantic import BaseModel + + +class ResourceTrackingState(BaseModel): + heart_beat_task: Task | None = None + + class Config: + arbitrary_types_allowed = True diff --git 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_setup.py new file mode 100644 index 00000000000..22469fad6d1 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_setup.py @@ -0,0 +1,15 @@ +from fastapi import FastAPI + +from ._core import stop_heart_beat_task +from ._models import ResourceTrackingState + + +def setup_resource_tracking(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.resource_tracking = ResourceTrackingState() + + async def on_shutdown() -> None: + await stop_heart_beat_task(app) + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index 27b8eff5cb4..ed670d67799 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -6,8 +6,8 @@ import logging import sys +from collections.abc import Iterable, Iterator from pathlib import Path -from typing import Iterable, Iterator from unittest.mock import AsyncMock import pytest @@ -16,8 +16,9 @@ from models_library.projects import ProjectID from models_library.projects_nodes import NodeID from models_library.services import RunID +from models_library.services_creation import CreateServiceMetricsAdditionalParams from models_library.users import UserID -from pytest import LogCaptureFixture, MonkeyPatch +from pydantic import parse_obj_as from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.utils_envs import ( EnvVarsDict, @@ -176,7 +177,7 @@ def base_mock_envs( @pytest.fixture def mock_environment( - monkeypatch: MonkeyPatch, + monkeypatch: pytest.MonkeyPatch, base_mock_envs: EnvVarsDict, user_id: UserID, project_id: ProjectID, @@ -234,7 +235,7 @@ def mock_environment( @pytest.fixture def 
mock_environment_with_envdevel( - monkeypatch: MonkeyPatch, project_slug_dir: Path + monkeypatch: pytest.MonkeyPatch, project_slug_dir: Path ) -> EnvVarsDict: """Alternative environment loaded fron .env-devel. @@ -245,7 +246,9 @@ def mock_environment_with_envdevel( @pytest.fixture() -def caplog_info_debug(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]: +def caplog_info_debug( + caplog: pytest.LogCaptureFixture, +) -> Iterable[pytest.LogCaptureFixture]: with caplog.at_level(logging.DEBUG): yield caplog @@ -267,7 +270,7 @@ def mock_core_rabbitmq(mocker: MockerFixture) -> dict[str, AsyncMock]: return_value=None, autospec=True, ), - "post_log_message": mocker.patch( + "post_rabbit_message": mocker.patch( "simcore_service_dynamic_sidecar.core.rabbitmq._post_rabbit_message", return_value=None, autospec=True, @@ -278,3 +281,19 @@ def mock_core_rabbitmq(mocker: MockerFixture) -> dict[str, AsyncMock]: autospec=True, ), } + + +@pytest.fixture +def mock_stop_heart_beat_task(mocker: MockerFixture) -> AsyncMock: + return mocker.patch( + "simcore_service_dynamic_sidecar.modules.resource_tracking._core.stop_heart_beat_task", + return_value=None, + ) + + +@pytest.fixture +def mock_metrics_params(faker: Faker) -> CreateServiceMetricsAdditionalParams: + return parse_obj_as( + CreateServiceMetricsAdditionalParams, + CreateServiceMetricsAdditionalParams.Config.schema_extra["example"], + ) diff --git a/services/dynamic-sidecar/tests/unit/conftest.py b/services/dynamic-sidecar/tests/unit/conftest.py index ddf31c6f18d..6df34b37c63 100644 --- a/services/dynamic-sidecar/tests/unit/conftest.py +++ b/services/dynamic-sidecar/tests/unit/conftest.py @@ -11,7 +11,7 @@ from aiodocker.volumes import DockerVolume from async_asgi_testclient import TestClient from fastapi import FastAPI -from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict from simcore_service_dynamic_sidecar.core.application import 
AppState, create_app from simcore_service_dynamic_sidecar.core.docker_compose_utils import ( docker_compose_down, @@ -124,3 +124,21 @@ async def cleanup_containers(app: FastAPI) -> AsyncIterator[None]: return await docker_compose_down(app_state.compose_spec, app_state.settings) + + +@pytest.fixture +def mock_rabbitmq_envs( + mock_core_rabbitmq: dict[str, AsyncMock], + monkeypatch: pytest.MonkeyPatch, + mock_environment: EnvVarsDict, +) -> EnvVarsDict: + setenvs_from_dict( + monkeypatch, + { + "RABBIT_HOST": "mocked_host", + "RABBIT_SECURE": "false", + "RABBIT_USER": "mocked_user", + "RABBIT_PASSWORD": "mocked_password", + }, + ) + return mock_environment diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers.py b/services/dynamic-sidecar/tests/unit/test_api_containers.py index 22b354c7c03..c10297e1754 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_containers.py +++ b/services/dynamic-sidecar/tests/unit/test_api_containers.py @@ -23,7 +23,9 @@ from faker import Faker from fastapi import FastAPI, status from models_library.services import ServiceOutput +from models_library.services_creation import CreateServiceMetricsAdditionalParams from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME from servicelib.fastapi.long_running_tasks.client import TaskId from simcore_service_dynamic_sidecar._meta import API_VTAG @@ -82,10 +84,18 @@ async def _assert_disable_ports_io(test_client: TestClient) -> None: assert response.text == "" -async def _start_containers(test_client: TestClient, compose_spec: str) -> list[str]: +async def _start_containers( + test_client: TestClient, + compose_spec: str, + mock_metrics_params: CreateServiceMetricsAdditionalParams, +) -> list[str]: # start containers response = await test_client.post( - f"/{API_VTAG}/containers", json={"docker_compose_yaml": compose_spec} + f"/{API_VTAG}/containers", + json={ + 
"docker_compose_yaml": compose_spec, + "metrics_params": mock_metrics_params.dict(), + }, ) assert response.status_code == status.HTTP_202_ACCEPTED, response.text task_id: TaskId = response.json() @@ -144,6 +154,11 @@ async def _assert_compose_spec_pulled(compose_spec: str, settings: ApplicationSe assert len(started_containers) == expected_services_count +@pytest.fixture +def mock_environment(mock_rabbitmq_envs: EnvVarsDict) -> EnvVarsDict: + return mock_rabbitmq_envs + + @pytest.fixture def app(app: FastAPI) -> FastAPI: app.state.shared_store = SharedStore() # emulate on_startup event @@ -224,11 +239,15 @@ def selected_spec(request, compose_spec: str, compose_spec_single_service: str) @pytest.fixture -async def started_containers(test_client: TestClient, compose_spec: str) -> list[str]: +async def started_containers( + test_client: TestClient, + compose_spec: str, + mock_metrics_params: CreateServiceMetricsAdditionalParams, +) -> list[str]: settings: ApplicationSettings = test_client.application.state.settings await _assert_compose_spec_pulled(compose_spec, settings) - return await _start_containers(test_client, compose_spec) + return await _start_containers(test_client, compose_spec, mock_metrics_params) @pytest.fixture @@ -642,8 +661,11 @@ async def test_attach_detach_container_to_network( test_client: TestClient, selected_spec: str, attachable_networks_and_ids: dict[str, str], + mock_metrics_params: CreateServiceMetricsAdditionalParams, ): - container_names = await _start_containers(test_client, selected_spec) + container_names = await _start_containers( + test_client, selected_spec, mock_metrics_params + ) async with aiodocker.Docker() as docker: for container_name in container_names: diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py index 2aaa6a6406f..3eda0788220 100644 --- 
a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py @@ -9,6 +9,7 @@ from inspect import getmembers, isfunction from pathlib import Path from typing import Any, Final +from unittest.mock import AsyncMock import aiodocker import faker @@ -19,8 +20,10 @@ from fastapi import FastAPI from fastapi.routing import APIRoute from httpx import AsyncClient +from models_library.services_creation import CreateServiceMetricsAdditionalParams from pydantic import AnyHttpUrl, parse_obj_as from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict from servicelib.fastapi.long_running_tasks.client import ( Client, TaskClientResultError, @@ -31,6 +34,7 @@ from simcore_sdk.node_ports_common.exceptions import NodeNotFound from simcore_service_dynamic_sidecar._meta import API_VTAG from simcore_service_dynamic_sidecar.api import containers_long_running_tasks +from simcore_service_dynamic_sidecar.models.schemas.containers import ContainersCreate from simcore_service_dynamic_sidecar.models.shared_store import SharedStore from simcore_service_dynamic_sidecar.modules.inputs import enable_inputs_state_pulling from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext @@ -112,19 +116,25 @@ def dynamic_sidecar_network_name() -> str: "version": "3", "services": { "first-box": { - "image": "busybox:latest", + "image": "alpine:latest", "networks": { _get_dynamic_sidecar_network_name(): None, }, }, - "second-box": {"image": "busybox:latest"}, + "second-box": { + "image": "alpine:latest", + "command": ["sh", "-c", "sleep 100000"], + }, }, "networks": {_get_dynamic_sidecar_network_name(): None}, }, { "version": "3", "services": { - "solo-box": {"image": "busybox:latest"}, + "solo-box": { + "image": "alpine:latest", + "command": ["sh", "-c", "sleep 100000"], + }, }, }, ] @@ -139,6 +149,11 @@ def backend_url() -> AnyHttpUrl: 
return parse_obj_as(AnyHttpUrl, "http://backgroud.testserver.io") +@pytest.fixture +def mock_environment(mock_rabbitmq_envs: EnvVarsDict) -> EnvVarsDict: + return mock_rabbitmq_envs + + @pytest.fixture async def app(app: FastAPI) -> AsyncIterable[FastAPI]: # add the client setup to the same application @@ -246,10 +261,17 @@ async def _mocked(*args, **kwargs) -> None: async def _get_task_id_create_service_containers( - httpx_async_client: AsyncClient, compose_spec: str, *args, **kwargs + httpx_async_client: AsyncClient, + compose_spec: str, + mock_metrics_params: CreateServiceMetricsAdditionalParams, + *args, + **kwargs, ) -> TaskId: + containers_create = ContainersCreate( + docker_compose_yaml=compose_spec, metrics_params=mock_metrics_params + ) response = await httpx_async_client.post( - f"/{API_VTAG}/containers", json={"docker_compose_yaml": compose_spec} + f"/{API_VTAG}/containers", json=containers_create.dict() ) task_id: TaskId = response.json() assert isinstance(task_id, str) @@ -336,6 +358,8 @@ async def test_create_containers_task( httpx_async_client: AsyncClient, client: Client, compose_spec: str, + mock_stop_heart_beat_task: AsyncMock, + mock_metrics_params: CreateServiceMetricsAdditionalParams, shared_store: SharedStore, ) -> None: last_progress_message: tuple[str, float] | None = None @@ -348,7 +372,7 @@ async def create_progress(message: str, percent: float, _: TaskId) -> None: async with periodic_task_result( client=client, task_id=await _get_task_id_create_service_containers( - httpx_async_client, compose_spec + httpx_async_client, compose_spec, mock_metrics_params ), task_timeout=CREATE_SERVICE_CONTAINERS_TIMEOUT, status_poll_interval=FAST_STATUS_POLL, @@ -360,13 +384,16 @@ async def create_progress(message: str, percent: float, _: TaskId) -> None: async def test_create_containers_task_invalid_yaml_spec( - httpx_async_client: AsyncClient, client: Client + httpx_async_client: AsyncClient, + client: Client, + mock_stop_heart_beat_task: AsyncMock, + 
mock_metrics_params: CreateServiceMetricsAdditionalParams, ): with pytest.raises(TaskClientResultError) as exec_info: async with periodic_task_result( client=client, task_id=await _get_task_id_create_service_containers( - httpx_async_client, "" + httpx_async_client, "", mock_metrics_params ), task_timeout=CREATE_SERVICE_CONTAINERS_TIMEOUT, status_poll_interval=FAST_STATUS_POLL, @@ -394,11 +421,14 @@ async def test_same_task_id_is_returned_if_task_exists( client: Client, mocker: MockerFixture, get_task_id_callable: Callable[..., Awaitable], + mock_stop_heart_beat_task: AsyncMock, + mock_metrics_params: CreateServiceMetricsAdditionalParams, ) -> None: def _get_awaitable() -> Awaitable: return get_task_id_callable( httpx_async_client=httpx_async_client, compose_spec="", + mock_metrics_params=mock_metrics_params, port_keys=None, command_timeout=0, ) @@ -420,13 +450,17 @@ async def test_containers_down_after_starting( httpx_async_client: AsyncClient, client: Client, compose_spec: str, + mock_stop_heart_beat_task: AsyncMock, + mock_metrics_params: CreateServiceMetricsAdditionalParams, shared_store: SharedStore, + mock_core_rabbitmq: dict[str, AsyncMock], + mocker: MockerFixture, ): # start containers async with periodic_task_result( client=client, task_id=await _get_task_id_create_service_containers( - httpx_async_client, compose_spec + httpx_async_client, compose_spec, mock_metrics_params ), task_timeout=CREATE_SERVICE_CONTAINERS_TIMEOUT, status_poll_interval=FAST_STATUS_POLL, @@ -582,12 +616,14 @@ async def test_containers_restart( httpx_async_client: AsyncClient, client: Client, compose_spec: str, + mock_stop_heart_beat_task: AsyncMock, + mock_metrics_params: CreateServiceMetricsAdditionalParams, shared_store: SharedStore, ): async with periodic_task_result( client=client, task_id=await _get_task_id_create_service_containers( - httpx_async_client, compose_spec + httpx_async_client, compose_spec, mock_metrics_params ), task_timeout=CREATE_SERVICE_CONTAINERS_TIMEOUT, 
status_poll_interval=FAST_STATUS_POLL, diff --git a/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py new file mode 100644 index 00000000000..6347f3a96b4 --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py @@ -0,0 +1,446 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=no-member + + +import asyncio +import json +from collections.abc import AsyncIterable, Callable +from pathlib import Path +from typing import Any, Final +from unittest.mock import AsyncMock + +import aiodocker +import pytest +from aiodocker.containers import DockerContainer +from aiodocker.utils import clean_filters +from aiodocker.volumes import DockerVolume +from asgi_lifespan import LifespanManager +from fastapi import FastAPI +from httpx import AsyncClient +from models_library.generated_models.docker_rest_api import ContainerState +from models_library.generated_models.docker_rest_api import Status2 as ContainerStatus +from models_library.rabbitmq_messages import ( + RabbitResourceTrackingHeartbeatMessage, + RabbitResourceTrackingMessages, + RabbitResourceTrackingStartedMessage, + RabbitResourceTrackingStoppedMessage, + SimcorePlatformStatus, +) +from models_library.services_creation import CreateServiceMetricsAdditionalParams +from pydantic import AnyHttpUrl, parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from servicelib.fastapi.long_running_tasks.client import ( + Client, + TaskClientResultError, + TaskId, + periodic_task_result, +) +from servicelib.fastapi.long_running_tasks.client import setup as client_setup +from simcore_service_dynamic_sidecar._meta import API_VTAG +from simcore_service_dynamic_sidecar.core.docker_utils import get_container_states +from simcore_service_dynamic_sidecar.models.schemas.containers import 
ContainersCreate +from simcore_service_dynamic_sidecar.models.shared_store import SharedStore +from tenacity import AsyncRetrying, TryAgain +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +_FAST_STATUS_POLL: Final[float] = 0.1 +_CREATE_SERVICE_CONTAINERS_TIMEOUT: Final[float] = 60 +_BASE_HEART_BEAT_INTERVAL: Final[float] = 0.1 + + +@pytest.fixture(params=[1, 2]) +def container_names(request: pytest.FixtureRequest) -> list[str]: + return [f"service-{i}" for i in range(request.param)] + + +@pytest.fixture +def raw_compose_spec(container_names: list[str]) -> dict[str, Any]: + base_spec: dict[str, Any] = {"version": "3", "services": {}} + + for container_name in container_names: + base_spec["services"][container_name] = { + "image": "alpine:latest", + "command": ["sh", "-c", "sleep 100000"], + } + + return base_spec + + +@pytest.fixture +def compose_spec(raw_compose_spec: dict[str, Any]) -> str: + return json.dumps(raw_compose_spec) + + +@pytest.fixture +def backend_url() -> AnyHttpUrl: + return parse_obj_as(AnyHttpUrl, "http://backgroud.testserver.io") + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, mock_rabbitmq_envs: EnvVarsDict +) -> EnvVarsDict: + setenvs_from_dict( + monkeypatch, + {"RESOURCE_TRACKING_HEARTBEAT_INTERVAL": f"{_BASE_HEART_BEAT_INTERVAL}"}, + ) + return mock_rabbitmq_envs + + +@pytest.fixture +async def app(app: FastAPI) -> AsyncIterable[FastAPI]: + # add the client setup to the same application + # this is only required for testing, in reality + # this will be in a different process + client_setup(app) + async with LifespanManager(app): + yield app + + +@pytest.fixture +async def httpx_async_client( + app: FastAPI, + backend_url: AnyHttpUrl, + ensure_external_volumes: tuple[DockerVolume], + cleanup_containers: None, + ensure_shared_store_dir: Path, +) -> AsyncIterable[AsyncClient]: + # create dir here + async with AsyncClient( + app=app, base_url=backend_url, headers={"Content-Type": 
"application/json"} + ) as client: + yield client + + +@pytest.fixture +def client( + app: FastAPI, httpx_async_client: AsyncClient, backend_url: AnyHttpUrl +) -> Client: + return Client(app=app, async_client=httpx_async_client, base_url=backend_url) + + +@pytest.fixture +def mock_user_services_fail_to_start(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_dynamic_sidecar.modules.long_running_tasks._retry_docker_compose_create", + side_effect=RuntimeError(""), + ) + + +@pytest.fixture +def mock_user_services_fail_to_stop(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_dynamic_sidecar.modules.long_running_tasks._retry_docker_compose_down", + side_effect=RuntimeError(""), + ) + + +async def _get_task_id_create_service_containers( + httpx_async_client: AsyncClient, + compose_spec: str, + mock_metrics_params: CreateServiceMetricsAdditionalParams, +) -> TaskId: + containers_create = ContainersCreate( + docker_compose_yaml=compose_spec, metrics_params=mock_metrics_params + ) + response = await httpx_async_client.post( + f"/{API_VTAG}/containers", json=containers_create.dict() + ) + task_id: TaskId = response.json() + assert isinstance(task_id, str) + return task_id + + +async def _get_task_id_docker_compose_down(httpx_async_client: AsyncClient) -> TaskId: + response = await httpx_async_client.post(f"/{API_VTAG}/containers:down") + task_id: TaskId = response.json() + assert isinstance(task_id, str) + return task_id + + +def _get_resource_tracking_messages( + mock_core_rabbitmq: dict[str, AsyncMock] +) -> list[RabbitResourceTrackingMessages]: + return [ + x[0][1] + for x in mock_core_rabbitmq["post_rabbit_message"].call_args_list + if isinstance(x[0][1], RabbitResourceTrackingMessages) + ] + + +async def _wait_for_containers_to_be_running(app: FastAPI) -> None: + shared_store: SharedStore = app.state.shared_store + async for attempt in AsyncRetrying(wait=wait_fixed(0.1), stop=stop_after_delay(4)): + with attempt: + containers_statuses = 
await get_container_states( + shared_store.container_names + ) + + running_container_statuses = [ + x + for x in containers_statuses.values() + if x is not None and x.Status == ContainerStatus.running + ] + + if len(running_container_statuses) != len(shared_store.container_names): + raise TryAgain + + +async def test_service_starts_and_closes_as_expected( + mock_core_rabbitmq: dict[str, AsyncMock], + app: FastAPI, + httpx_async_client: AsyncClient, + client: Client, + compose_spec: str, + container_names: list[str], + mock_metrics_params: CreateServiceMetricsAdditionalParams, +): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_create_service_containers( + httpx_async_client, compose_spec, mock_metrics_params + ), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert isinstance(result, list) + assert len(result) == len(container_names) + + await _wait_for_containers_to_be_running(app) + + async with periodic_task_result( + client=client, + task_id=await _get_task_id_docker_compose_down(httpx_async_client), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert result is None + + # NOTE: task was not properly cancelled and events were still + # generated. This is here to catch regressions. 
+ await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 10) + + # Ensure messages arrive in the expected order + resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + assert len(resource_tracking_messages) >= 3 + + start_message = resource_tracking_messages[0] + heart_beat_messages = resource_tracking_messages[1:-1] + assert len(heart_beat_messages) > 0 + stop_message = resource_tracking_messages[-1] + + assert isinstance(start_message, RabbitResourceTrackingStartedMessage) + for heart_beat_message in heart_beat_messages: + assert isinstance(heart_beat_message, RabbitResourceTrackingHeartbeatMessage) + assert isinstance(stop_message, RabbitResourceTrackingStoppedMessage) + assert stop_message.simcore_platform_status == SimcorePlatformStatus.OK + + +@pytest.mark.parametrize("with_compose_down", [True, False]) +async def test_user_services_fail_to_start( + mock_core_rabbitmq: dict[str, AsyncMock], + app: FastAPI, + httpx_async_client: AsyncClient, + client: Client, + compose_spec: str, + mock_metrics_params: CreateServiceMetricsAdditionalParams, + with_compose_down: bool, + mock_user_services_fail_to_start: None, +): + with pytest.raises(TaskClientResultError): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_create_service_containers( + httpx_async_client, compose_spec, mock_metrics_params + ), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ): + ... 
+ shared_store: SharedStore = app.state.shared_store + assert len(shared_store.container_names) == 0 + + if with_compose_down: + async with periodic_task_result( + client=client, + task_id=await _get_task_id_docker_compose_down(httpx_async_client), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert result is None + + # no messages were sent + resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + assert len(resource_tracking_messages) == 0 + + +async def test_user_services_fail_to_stop_or_save_data( + mock_core_rabbitmq: dict[str, AsyncMock], + app: FastAPI, + httpx_async_client: AsyncClient, + client: Client, + compose_spec: str, + container_names: list[str], + mock_metrics_params: CreateServiceMetricsAdditionalParams, + mock_user_services_fail_to_stop: None, +): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_create_service_containers( + httpx_async_client, compose_spec, mock_metrics_params + ), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert isinstance(result, list) + assert len(result) == len(container_names) + + await _wait_for_containers_to_be_running(app) + + # in case of manual intervention multiple stops will be sent + _EXPECTED_STOP_MESSAGES = 4 + for _ in range(_EXPECTED_STOP_MESSAGES): + with pytest.raises(TaskClientResultError): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_docker_compose_down(httpx_async_client), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ): + ... 
+ + # Ensure messages arrive in the expected order + resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + assert len(resource_tracking_messages) >= 3 + + start_message = resource_tracking_messages[0] + heart_beat_messages = resource_tracking_messages[1:-_EXPECTED_STOP_MESSAGES] + assert len(heart_beat_messages) > 0 + stop_messages = resource_tracking_messages[-_EXPECTED_STOP_MESSAGES:] + # NOTE: this is a situation where multiple stop events are sent out + # since the stopping fails and the operation is repeated + assert len(stop_messages) == _EXPECTED_STOP_MESSAGES + + assert isinstance(start_message, RabbitResourceTrackingStartedMessage) + for heart_beat_message in heart_beat_messages: + assert isinstance(heart_beat_message, RabbitResourceTrackingHeartbeatMessage) + for stop_message in stop_messages: + assert isinstance(stop_message, RabbitResourceTrackingStoppedMessage) + assert stop_message.simcore_platform_status == SimcorePlatformStatus.BAD + + +async def _simulate_container_crash(container_names: list[str]) -> None: + async with aiodocker.Docker() as docker: + filters = clean_filters({"name": container_names}) + containers: list[DockerContainer] = await docker.containers.list( + all=True, filters=filters + ) + for container in containers: + await container.kill() + + +@pytest.fixture +def mock_one_container_oom_killed(mocker: MockerFixture) -> Callable[[], None]: + def _mock() -> None: + async def _mocked_get_container_states( + container_names: list[str], + ) -> dict[str, ContainerState | None]: + results = await get_container_states(container_names) + for result in results.values(): + if result: + result.OOMKilled = True + result.Status = ContainerStatus.exited + break + return results + + mocker.patch( + "simcore_service_dynamic_sidecar.modules.long_running_tasks.get_container_states", + side_effect=_mocked_get_container_states, + ) + mocker.patch( + 
"simcore_service_dynamic_sidecar.modules.resource_tracking._core.get_container_states", + side_effect=_mocked_get_container_states, + ) + + return _mock + + +@pytest.mark.parametrize("expected_platform_state", SimcorePlatformStatus) +async def test_user_services_crash_when_running( + mock_core_rabbitmq: dict[str, AsyncMock], + app: FastAPI, + httpx_async_client: AsyncClient, + client: Client, + compose_spec: str, + container_names: list[str], + mock_metrics_params: CreateServiceMetricsAdditionalParams, + mock_one_container_oom_killed: Callable[[], None], + expected_platform_state: SimcorePlatformStatus, +): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_create_service_containers( + httpx_async_client, compose_spec, mock_metrics_params + ), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert isinstance(result, list) + assert len(result) == len(container_names) + + await _wait_for_containers_to_be_running(app) + + # let a few heartbeats pass + await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 2) + + # crash the user services + if expected_platform_state == SimcorePlatformStatus.OK: + # was it oom killed, not our fault + mock_one_container_oom_killed() + else: + # something else happened our fault and is bad + await _simulate_container_crash(container_names) + + # check only start and heartbeats are present + resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + assert len(resource_tracking_messages) >= 2 + + start_message = resource_tracking_messages[0] + heart_beat_messages = resource_tracking_messages[1:] + + assert isinstance(start_message, RabbitResourceTrackingStartedMessage) + for heart_beat_message in heart_beat_messages: + assert isinstance(heart_beat_message, RabbitResourceTrackingHeartbeatMessage) + + # reset mock + await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 2) + mock_core_rabbitmq["post_rabbit_message"].reset_mock() + + # wait 
a bit more and check no further heartbeats are sent + await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 2) + new_resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + assert len(new_resource_tracking_messages) == 0 + + # sending stop events, and since there was an issue multiple stops + # will be sent due to manual intervention + _EXPECTED_STOP_MESSAGES = 4 + for _ in range(_EXPECTED_STOP_MESSAGES): + async with periodic_task_result( + client=client, + task_id=await _get_task_id_docker_compose_down(httpx_async_client), + task_timeout=_CREATE_SERVICE_CONTAINERS_TIMEOUT, + status_poll_interval=_FAST_STATUS_POLL, + ) as result: + assert result is None + + resource_tracking_messages = _get_resource_tracking_messages(mock_core_rabbitmq) + # NOTE: only 1 stop event arrives here since the stopping of the containers + # was successful + assert len(resource_tracking_messages) == 1 + + for stop_message in resource_tracking_messages: + assert isinstance(stop_message, RabbitResourceTrackingStoppedMessage) + assert stop_message.simcore_platform_status == expected_platform_state diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py b/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py index af67bd538d8..e7d780db9dd 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py @@ -2,14 +2,13 @@ # pylint: disable=unused-argument import asyncio -from typing import AsyncIterable +from collections.abc import AsyncIterable from unittest.mock import AsyncMock import aiodocker import pytest from async_asgi_testclient import TestClient from fastapi import FastAPI -from pytest import MonkeyPatch from simcore_service_dynamic_sidecar.core.docker_logs import ( _get_background_log_fetcher, start_log_fetching, @@ -19,7 +18,7 @@ @pytest.fixture def mock_environment( - monkeypatch: MonkeyPatch, + monkeypatch: pytest.MonkeyPatch, mock_environment: None, ) -> None: 
monkeypatch.setenv("DYNAMIC_SIDECAR_COMPOSE_NAMESPACE", "test-space") @@ -56,6 +55,6 @@ async def test_background_log_fetcher( await start_log_fetching(app=app, container_name=container_name) # wait for background log fetcher await asyncio.sleep(1) - assert mock_core_rabbitmq["post_log_message"].call_count == 1 + assert mock_core_rabbitmq["post_rabbit_message"].call_count == 1 await stop_log_fetching(app=app, container_name=container_name) diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py index 4f2670ef43a..e810edbe177 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py @@ -1,22 +1,34 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable -from typing import AsyncIterable, AsyncIterator +from collections.abc import AsyncIterable, AsyncIterator +from contextlib import suppress +from typing import Any import aiodocker import pytest import yaml +from aiodocker.containers import DockerContainer +from faker import Faker +from models_library.generated_models.docker_rest_api import ContainerState +from models_library.generated_models.docker_rest_api import Status2 as ContainerStatus from models_library.services import RunID -from pydantic import PositiveInt, SecretStr -from pytest import FixtureRequest +from pydantic import PositiveInt, SecretStr, parse_obj_as from settings_library.docker_registry import RegistrySettings from simcore_service_dynamic_sidecar.core.docker_utils import ( + _DockerProgressDict, + _get_containers_inspect_from_names, + _parse_docker_pull_progress, + get_container_states, + get_containers_count_from_names, get_docker_service_images, - get_running_containers_count_from_names, get_volume_by_label, pull_images, ) from simcore_service_dynamic_sidecar.core.errors import VolumeNotFoundError +from tenacity import 
AsyncRetrying, TryAgain +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed @pytest.fixture(scope="session") @@ -45,7 +57,7 @@ async def volume_with_label(volume_name: str, run_id: RunID) -> AsyncIterable[No @pytest.fixture(params=[0, 1, 2, 3]) -def container_count(request: FixtureRequest) -> PositiveInt: +def container_count(request: pytest.FixtureRequest) -> PositiveInt: return request.param @@ -59,8 +71,8 @@ async def started_services(container_names: list[str]) -> AsyncIterator[None]: async with aiodocker.Docker() as docker_client: started_containers = [] for container_name in container_names: - container = await docker_client.containers.create( - config={"Image": "busybox:latest"}, + container = await docker_client.containers.run( + config={"Image": "alpine:latest", "Cmd": ["sh", "-c", "sleep 10000"]}, name=container_name, ) started_containers.append(container) @@ -68,7 +80,8 @@ async def started_services(container_names: list[str]) -> AsyncIterator[None]: yield for container in started_containers: - await container.stop() + with suppress(aiodocker.DockerError): + await container.kill() await container.delete() @@ -87,10 +100,59 @@ async def test_volume_label_missing(run_id: RunID) -> None: assert "not_exist" in error_msg +async def _wait_for_containers_to_be_running(container_names: list[str]) -> None: + async for attempt in AsyncRetrying(wait=wait_fixed(0.1), stop=stop_after_delay(4)): + with attempt: + containers_statuses = await get_container_states(container_names) + + running_container_statuses = [ + x + for x in containers_statuses.values() + if x is not None and x.Status == ContainerStatus.running + ] + + if len(running_container_statuses) != len(container_names): + raise TryAgain + + +async def test__get_containers_inspect_from_names( + started_services: None, container_names: list[str], faker: Faker +): + MISSING_CONTAINER_NAME = f"missing-container-{faker.uuid4()}" + container_details: dict[ + str, DockerContainer | 
None + ] = await _get_containers_inspect_from_names( + [*container_names, MISSING_CONTAINER_NAME] + ) + # containers which do not exist always return None + assert MISSING_CONTAINER_NAME in container_details + assert container_details.pop(MISSING_CONTAINER_NAME) is None + + assert set(container_details.keys()) == set(container_names) + for docker_container in container_details.values(): + assert docker_container is not None + + +async def test_get_container_statuses( + started_services: None, container_names: list[str], faker: Faker +): + MISSING_CONTAINER_NAME = f"missing-container-{faker.uuid4()}" + container_states: dict[str, ContainerState | None] = await get_container_states( + [*container_names, MISSING_CONTAINER_NAME] + ) + # containers which do not exist always have a None status + assert MISSING_CONTAINER_NAME in container_states + assert container_states.pop(MISSING_CONTAINER_NAME) is None + + assert set(container_states.keys()) == set(container_names) + for docker_status in container_states.values(): + assert docker_status is not None + + async def test_get_running_containers_count_from_names( started_services: None, container_names: list[str], container_count: PositiveInt ): - found_containers = await get_running_containers_count_from_names(container_names) + found_containers = await get_containers_count_from_names(container_names) assert found_containers == container_count @@ -188,3 +250,53 @@ async def _print_log(msg, log_level): progress_cb=_print_progress, log_cb=_print_log, ) + + +PULL_PROGRESS_SEQUENCE: list[dict[str, Any]] = [ + {"status": "Pulling from library/busybox", "id": "latest"}, + {"status": "Pulling fs layer", "progressDetail": {}, "id": "3f4d90098f5b"}, + { + "status": "Downloading", + "progressDetail": {"current": 22621, "total": 2219949}, + "progress": "[> ] 22.62kB/2.22MB", + "id": "3f4d90098f5b", + }, + {"status": "Download complete", "progressDetail": {}, "id": "3f4d90098f5b"}, + { + "status": "Extracting", + "progressDetail": 
{"current": 32768, "total": 2219949}, + "progress": "[> ] 32.77kB/2.22MB", + "id": "3f4d90098f5b", + }, + { + "status": "Extracting", + "progressDetail": {"current": 884736, "total": 2219949}, + "progress": "[===================> ] 884.7kB/2.22MB", + "id": "3f4d90098f5b", + }, + { + "status": "Extracting", + "progressDetail": {"current": 2219949, "total": 2219949}, + "progress": "[==================================================>] 2.22MB/2.22MB", + "id": "3f4d90098f5b", + }, + {"status": "Pull complete", "progressDetail": {}, "id": "3f4d90098f5b"}, + { + "status": "Digest: sha256:3fbc632167424a6d997e74f52b878d7cc478225cffac6bc977eedfe51c7f4e79" + }, + { + "status": "Digest: sha256:3fbc632167424a6d997e74f52b878d7cc478225cffac6bc977eedfe51c7f4e79" + }, + {"status": "Status: Downloaded newer image for busybox:latest"}, +] + + +@pytest.mark.parametrize("pull_progress", PULL_PROGRESS_SEQUENCE) +def test_docker_progress_dict(pull_progress: dict[str, Any]): + assert parse_obj_as(_DockerProgressDict, pull_progress) + + +def test__parse_docker_pull_progress(): + some_data = {} + for entry in PULL_PROGRESS_SEQUENCE: + _parse_docker_pull_progress(parse_obj_as(_DockerProgressDict, entry), some_data) diff --git a/simcore_service_dynamic_sidecar.svg b/simcore_service_dynamic_sidecar.svg new file mode 100644 index 00000000000..129c613398d --- /dev/null +++ b/simcore_service_dynamic_sidecar.svg @@ -0,0 +1,1378 @@ + + + + + + +G + + + +simcore_service_dynamic_sidecar__meta + +simcore_service_dynamic_sidecar. +_meta + + + +simcore_service_dynamic_sidecar_api__routing + +simcore_service_dynamic_sidecar. +api. +_routing + + + +simcore_service_dynamic_sidecar__meta->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_cli + +simcore_service_dynamic_sidecar. +cli + + + +simcore_service_dynamic_sidecar__meta->simcore_service_dynamic_sidecar_cli + + + + +simcore_service_dynamic_sidecar_core_application + +simcore_service_dynamic_sidecar. +core. 
+application + + + +simcore_service_dynamic_sidecar__meta->simcore_service_dynamic_sidecar_core_application + + + + + + +simcore_service_dynamic_sidecar_api + +simcore_service_dynamic_sidecar. +api + + + +simcore_service_dynamic_sidecar_api->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_api__dependencies + +simcore_service_dynamic_sidecar. +api. +_dependencies + + + +simcore_service_dynamic_sidecar_api_containers + +simcore_service_dynamic_sidecar. +api. +containers + + + +simcore_service_dynamic_sidecar_api__dependencies->simcore_service_dynamic_sidecar_api_containers + + + + + +simcore_service_dynamic_sidecar_api_containers_extension + +simcore_service_dynamic_sidecar. +api. +containers_extension + + + +simcore_service_dynamic_sidecar_api__dependencies->simcore_service_dynamic_sidecar_api_containers_extension + + + + + +simcore_service_dynamic_sidecar_api_containers_long_running_tasks + +simcore_service_dynamic_sidecar. +api. +containers_long_running_tasks + + + +simcore_service_dynamic_sidecar_api__dependencies->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_api_health + +simcore_service_dynamic_sidecar. +api. +health + + + +simcore_service_dynamic_sidecar_api__dependencies->simcore_service_dynamic_sidecar_api_health + + + + + +simcore_service_dynamic_sidecar_api_volumes + +simcore_service_dynamic_sidecar. +api. 
+volumes + + + +simcore_service_dynamic_sidecar_api__dependencies->simcore_service_dynamic_sidecar_api_volumes + + + + + +simcore_service_dynamic_sidecar_api__routing->simcore_service_dynamic_sidecar_api + + + + + +simcore_service_dynamic_sidecar_api_containers->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_api_containers_extension->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_api_containers_long_running_tasks->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_api_health->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_api_volumes->simcore_service_dynamic_sidecar_api__routing + + + + + +simcore_service_dynamic_sidecar_core + +simcore_service_dynamic_sidecar. +core + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_api__dependencies + + + + + + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_api_containers + + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_api_containers_extension + + + + + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_cli + + + + + + +simcore_service_dynamic_sidecar_main + +simcore_service_dynamic_sidecar. +main + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_main + + + + + +simcore_service_dynamic_sidecar_models_shared_store + +simcore_service_dynamic_sidecar. +models. +shared_store + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_models_shared_store + + + + + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor + +simcore_service_dynamic_sidecar. +modules. 
+attribute_monitor + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_attribute_monitor + + + + + +simcore_service_dynamic_sidecar_modules_long_running_tasks + +simcore_service_dynamic_sidecar. +modules. +long_running_tasks + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs + +simcore_service_dynamic_sidecar. +modules. +mounted_fs + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_mounted_fs + + + + + +simcore_service_dynamic_sidecar_modules_nodeports + +simcore_service_dynamic_sidecar. +modules. +nodeports + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_nodeports + + + + + + + +simcore_service_dynamic_sidecar_modules_outputs__manager + +simcore_service_dynamic_sidecar. +modules. +outputs. +_manager + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_outputs__manager + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_core + +simcore_service_dynamic_sidecar. +modules. +resource_tracking. +core + + + +simcore_service_dynamic_sidecar_core->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + +simcore_service_dynamic_sidecar_core_application->simcore_service_dynamic_sidecar_cli + + + + +simcore_service_dynamic_sidecar_core_application->simcore_service_dynamic_sidecar_main + + + + + +simcore_service_dynamic_sidecar_core_docker_compose_utils + +simcore_service_dynamic_sidecar. +core. +docker_compose_utils + + + +simcore_service_dynamic_sidecar_core_docker_compose_utils->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_core_validation + +simcore_service_dynamic_sidecar. +core. 
+validation + + + +simcore_service_dynamic_sidecar_core_docker_compose_utils->simcore_service_dynamic_sidecar_core_validation + + + + + +simcore_service_dynamic_sidecar_core_docker_compose_utils->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_core_docker_logs + +simcore_service_dynamic_sidecar. +core. +docker_logs + + + +simcore_service_dynamic_sidecar_core_docker_logs->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_core_docker_logs->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_core_docker_utils + +simcore_service_dynamic_sidecar. +core. +docker_utils + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_api_containers + + + + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_api_containers_extension + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_core_docker_compose_utils + + + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_core_docker_logs + + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_modules_mounted_fs + + + + + +simcore_service_dynamic_sidecar_core_docker_utils->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + +simcore_service_dynamic_sidecar_core_error_handlers + +simcore_service_dynamic_sidecar. +core. +error_handlers + + + +simcore_service_dynamic_sidecar_core_error_handlers->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_core_errors + +simcore_service_dynamic_sidecar. +core. 
+errors + + + +simcore_service_dynamic_sidecar_core_errors->simcore_service_dynamic_sidecar_core_application + + + + + + + + +simcore_service_dynamic_sidecar_core_errors->simcore_service_dynamic_sidecar_core_docker_utils + + + + + +simcore_service_dynamic_sidecar_core_errors->simcore_service_dynamic_sidecar_core_error_handlers + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq + +simcore_service_dynamic_sidecar. +core. +rabbitmq + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_core_application + + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_core_docker_compose_utils + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_core_docker_logs + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_modules_outputs__manager + + + + + +simcore_service_dynamic_sidecar_core_rabbitmq->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + + + +simcore_service_dynamic_sidecar_core_remote_debug + +simcore_service_dynamic_sidecar. +core. +remote_debug + + + +simcore_service_dynamic_sidecar_core_remote_debug->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_core_settings + +simcore_service_dynamic_sidecar. +core. 
+settings + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_api__dependencies + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_cli + + + + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_core_application + + + + + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_core_docker_compose_utils + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_core_rabbitmq + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_core_remote_debug + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_core_validation + + + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_models_shared_store + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_attribute_monitor + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_mounted_fs + + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_nodeports + + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_outputs__manager + + + + +simcore_service_dynamic_sidecar_core_settings->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + +simcore_service_dynamic_sidecar_core_utils + +simcore_service_dynamic_sidecar. +core. 
+utils + + + +simcore_service_dynamic_sidecar_core_utils->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_core_utils->simcore_service_dynamic_sidecar_core_docker_compose_utils + + + + + +simcore_service_dynamic_sidecar_core_utils->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_core_validation->simcore_service_dynamic_sidecar_api_containers + + + + +simcore_service_dynamic_sidecar_core_validation->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_models + +simcore_service_dynamic_sidecar. +models + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_api__dependencies + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_api_containers + + + + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_api_health + + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_api_volumes + + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_models->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + +simcore_service_dynamic_sidecar_models_schemas + +simcore_service_dynamic_sidecar. +models. 
+schemas + + + +simcore_service_dynamic_sidecar_models_schemas->simcore_service_dynamic_sidecar_api__dependencies + + + + +simcore_service_dynamic_sidecar_models_schemas->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + +simcore_service_dynamic_sidecar_models_schemas->simcore_service_dynamic_sidecar_api_health + + + + + + +simcore_service_dynamic_sidecar_models_schemas->simcore_service_dynamic_sidecar_core_application + + + + + + +simcore_service_dynamic_sidecar_models_schemas->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_models_schemas_application_health + +simcore_service_dynamic_sidecar. +models. +schemas. +application_health + + + +simcore_service_dynamic_sidecar_models_schemas_application_health->simcore_service_dynamic_sidecar_api__dependencies + + + + + +simcore_service_dynamic_sidecar_models_schemas_application_health->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_models_schemas_application_health->simcore_service_dynamic_sidecar_api_health + + + + +simcore_service_dynamic_sidecar_models_schemas_application_health->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_models_schemas_application_health->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_models_schemas_containers + +simcore_service_dynamic_sidecar. +models. +schemas. 
+containers + + + +simcore_service_dynamic_sidecar_models_schemas_containers->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_models_schemas_containers->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_api__dependencies + + + + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_api_containers + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_api_volumes + + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_core_application + + + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_models_shared_store->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + +simcore_service_dynamic_sidecar_modules + +simcore_service_dynamic_sidecar. 
+modules + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_api__dependencies + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_api_containers_extension + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_cli + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_core_settings + + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_core_utils + + + + + + +simcore_service_dynamic_sidecar_modules->simcore_service_dynamic_sidecar_core_validation + + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor__logging_event_handler + +simcore_service_dynamic_sidecar. +modules. +attribute_monitor. +_logging_event_handler + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor__logging_event_handler->simcore_service_dynamic_sidecar_modules_attribute_monitor + + + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor__watchdog_extensions + +simcore_service_dynamic_sidecar. +modules. +attribute_monitor. +_watchdog_extensions + + + +simcore_service_dynamic_sidecar_modules_attribute_monitor__watchdog_extensions->simcore_service_dynamic_sidecar_modules_attribute_monitor__logging_event_handler + + + + + +simcore_service_dynamic_sidecar_modules_inputs + +simcore_service_dynamic_sidecar. +modules. 
+inputs + + + +simcore_service_dynamic_sidecar_modules_inputs->simcore_service_dynamic_sidecar_api__dependencies + + + + + +simcore_service_dynamic_sidecar_modules_inputs->simcore_service_dynamic_sidecar_api_containers_extension + + + + + + +simcore_service_dynamic_sidecar_modules_inputs->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + +simcore_service_dynamic_sidecar_modules_inputs->simcore_service_dynamic_sidecar_core_application + + + + + + +simcore_service_dynamic_sidecar_modules_long_running_tasks->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_modules_long_running_tasks->simcore_service_dynamic_sidecar_cli + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_api__dependencies + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_api_containers_extension + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_cli + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_core_utils + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_core_validation + + + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_modules_outputs__context + +simcore_service_dynamic_sidecar. +modules. +outputs. 
+_context + + + +simcore_service_dynamic_sidecar_modules_mounted_fs->simcore_service_dynamic_sidecar_modules_outputs__context + + + + + +simcore_service_dynamic_sidecar_modules_nodeports->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_modules_nodeports->simcore_service_dynamic_sidecar_modules_outputs__manager + + + + + + +simcore_service_dynamic_sidecar_modules_outputs + +simcore_service_dynamic_sidecar. +modules. +outputs + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_api__dependencies + + + + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_api_containers_extension + + + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_api_containers_long_running_tasks + + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_cli + + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_core_application + + + + + +simcore_service_dynamic_sidecar_modules_outputs->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_modules_outputs__context->simcore_service_dynamic_sidecar_modules_outputs + + + + + + + +simcore_service_dynamic_sidecar_modules_outputs__event_handler + +simcore_service_dynamic_sidecar. +modules. +outputs. +_event_handler + + + +simcore_service_dynamic_sidecar_modules_outputs__context->simcore_service_dynamic_sidecar_modules_outputs__event_handler + + + + + +simcore_service_dynamic_sidecar_modules_outputs__context->simcore_service_dynamic_sidecar_modules_outputs__manager + + + + + +simcore_service_dynamic_sidecar_modules_outputs__watcher + +simcore_service_dynamic_sidecar. +modules. +outputs. 
+_watcher + + + +simcore_service_dynamic_sidecar_modules_outputs__context->simcore_service_dynamic_sidecar_modules_outputs__watcher + + + + + +simcore_service_dynamic_sidecar_modules_outputs__directory_utils + +simcore_service_dynamic_sidecar. +modules. +outputs. +_directory_utils + + + +simcore_service_dynamic_sidecar_modules_outputs__event_filter + +simcore_service_dynamic_sidecar. +modules. +outputs. +_event_filter + + + +simcore_service_dynamic_sidecar_modules_outputs__directory_utils->simcore_service_dynamic_sidecar_modules_outputs__event_filter + + + + + +simcore_service_dynamic_sidecar_modules_outputs__event_filter->simcore_service_dynamic_sidecar_modules_outputs__watcher + + + + + +simcore_service_dynamic_sidecar_modules_outputs__event_handler->simcore_service_dynamic_sidecar_modules_outputs__watcher + + + + + +simcore_service_dynamic_sidecar_modules_outputs__manager->simcore_service_dynamic_sidecar_modules_outputs + + + + + +simcore_service_dynamic_sidecar_modules_outputs__manager->simcore_service_dynamic_sidecar_modules_outputs__event_filter + + + + + +simcore_service_dynamic_sidecar_modules_outputs__manager->simcore_service_dynamic_sidecar_modules_outputs__event_handler + + + + + +simcore_service_dynamic_sidecar_modules_outputs__manager->simcore_service_dynamic_sidecar_modules_outputs__watcher + + + + + +simcore_service_dynamic_sidecar_modules_outputs__watchdog_extensions + +simcore_service_dynamic_sidecar. +modules. +outputs. +_watchdog_extensions + + + +simcore_service_dynamic_sidecar_modules_outputs__watchdog_extensions->simcore_service_dynamic_sidecar_modules_outputs__event_handler + + + + + +simcore_service_dynamic_sidecar_modules_outputs__watcher->simcore_service_dynamic_sidecar_modules_outputs + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking + +simcore_service_dynamic_sidecar. +modules. 
+resource_tracking + + + +simcore_service_dynamic_sidecar_modules_resource_tracking->simcore_service_dynamic_sidecar_core_application + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking->simcore_service_dynamic_sidecar_core_settings + + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking__models + +simcore_service_dynamic_sidecar. +modules. +resource_tracking. +_models + + + +simcore_service_dynamic_sidecar_modules_resource_tracking__models->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_setup + +simcore_service_dynamic_sidecar. +modules. +resource_tracking. +setup + + + +simcore_service_dynamic_sidecar_modules_resource_tracking__models->simcore_service_dynamic_sidecar_modules_resource_tracking_setup + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_core->simcore_service_dynamic_sidecar_modules_long_running_tasks + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_core->simcore_service_dynamic_sidecar_modules_resource_tracking_setup + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_settings + +simcore_service_dynamic_sidecar. +modules. +resource_tracking. +settings + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_settings->simcore_service_dynamic_sidecar_core_settings + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_settings->simcore_service_dynamic_sidecar_modules_resource_tracking_core + + + + + + + +simcore_service_dynamic_sidecar_modules_resource_tracking_setup->simcore_service_dynamic_sidecar_core_application + + + +