diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/services.py b/packages/models-library/src/models_library/api_schemas_directorv2/services.py
index c797c687fd1..3d2fb51f302 100644
--- a/packages/models-library/src/models_library/api_schemas_directorv2/services.py
+++ b/packages/models-library/src/models_library/api_schemas_directorv2/services.py
@@ -103,3 +103,7 @@ class ServiceExtras(BaseModel):
 
 
 CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME: Final[NonNegativeInt] = 89
+
+
+DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar"
+DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy"
diff --git a/packages/models-library/src/models_library/docker.py b/packages/models-library/src/models_library/docker.py
index 6e87f06b62e..db5f51ef359 100644
--- a/packages/models-library/src/models_library/docker.py
+++ b/packages/models-library/src/models_library/docker.py
@@ -37,7 +37,15 @@ def from_key(cls, key: str) -> "DockerLabelKey":
     str, StringConstraints(pattern=DOCKER_GENERIC_TAG_KEY_RE)
 ]
 
-DockerPlacementConstraint: TypeAlias = Annotated[str, StringConstraints(strip_whitespace = True, pattern = re.compile(r"^(?!-)(?![.])(?!.*--)(?!.*[.][.])[a-zA-Z0-9.-]*(? "StandardSimcoreDockerLabels":
             ]
         },
     )
+
+
+DockerNodeID: TypeAlias = Annotated[
+    str, StringConstraints(strip_whitespace=True, pattern=re.compile(r"[a-zA-Z0-9]"))
+]
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py
new file mode 100644
index 00000000000..2049f0a409f
--- /dev/null
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/containers.py
@@ -0,0 +1,37 @@
+import logging
+from datetime import timedelta
+from typing import Final
+
+from models_library.docker import DockerNodeID
+from models_library.projects_nodes_io import NodeID
+from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
+from pydantic import NonNegativeInt, TypeAdapter
+from servicelib.logging_utils import log_decorator
+from servicelib.rabbitmq import RabbitMQRPCClient
+
+_logger = logging.getLogger(__name__)
+
+_REQUEST_TIMEOUT: Final[NonNegativeInt] = int(timedelta(minutes=60).total_seconds())
+
+
+@log_decorator(_logger, level=logging.DEBUG)
+async def force_container_cleanup(
+    rabbitmq_rpc_client: RabbitMQRPCClient,
+    *,
+    docker_node_id: DockerNodeID,
+    swarm_stack_name: str,
+    node_id: NodeID,
+) -> None:
+    result = await rabbitmq_rpc_client.request(
+        RPCNamespace.from_entries(
+            {
+                "service": "agent",
+                "docker_node_id": docker_node_id,
+                "swarm_stack_name": swarm_stack_name,
+            }
+        ),
+        TypeAdapter(RPCMethodName).validate_python("force_container_cleanup"),
+        node_id=node_id,
+        timeout_s=_REQUEST_TIMEOUT,
+    )
+    assert result is None  # nosec
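
For orientation, a minimal sketch of how a caller would drive this new agent RPC interface. The connected client, the node id value, and the stack name are illustrative assumptions, not part of the patch:

# Hypothetical caller of the new agent "containers" RPC (values illustrative).
from models_library.docker import DockerNodeID
from models_library.projects_nodes_io import NodeID
from pydantic import TypeAdapter
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.agent.containers import (
    force_container_cleanup,
)


async def cleanup_orphans(rpc_client: RabbitMQRPCClient) -> None:
    # the request is routed to the agent instance running on the swarm node
    # that owns the containers (namespace is node-scoped, see below)
    await force_container_cleanup(
        rpc_client,
        docker_node_id=TypeAdapter(DockerNodeID).validate_python("node1abc"),
        swarm_stack_name="simcore",  # assumption: the deployed stack name
        node_id=NodeID("12345678-1234-1234-1234-123456789012"),
    )
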
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py
index 043898dcb30..41cf2ffd8b8 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/agent/volumes.py
@@ -2,6 +2,7 @@
 from datetime import timedelta
 from typing import Final
 
+from models_library.docker import DockerNodeID
 from models_library.projects_nodes_io import NodeID
 from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
 from pydantic import NonNegativeInt, TypeAdapter
@@ -17,7 +18,7 @@ async def remove_volumes_without_backup_for_service(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
-    docker_node_id: str,
+    docker_node_id: DockerNodeID,
     swarm_stack_name: str,
     node_id: NodeID,
 ) -> None:
@@ -42,7 +43,7 @@ async def remove_volumes_without_backup_for_service(
 async def backup_and_remove_volumes_for_all_services(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
-    docker_node_id: str,
+    docker_node_id: DockerNodeID,
     swarm_stack_name: str,
 ) -> None:
     result = await rabbitmq_rpc_client.request(
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py
index 62cc42d91ba..f58500d771e 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py
@@ -1,7 +1,10 @@
 import logging
 from typing import Final
 
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
@@ -11,6 +14,7 @@
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.rabbitmq_basic_types import RPCMethodName
+from models_library.services_types import ServicePortKey
 from models_library.users import UserID
 from pydantic import NonNegativeInt, TypeAdapter
 from servicelib.logging_utils import log_decorator
@@ -95,6 +99,41 @@ async def stop_dynamic_service(
     assert result is None  # nosec
 
 
+@log_decorator(_logger, level=logging.DEBUG)
+async def restart_user_services(
+    rabbitmq_rpc_client: RabbitMQRPCClient,
+    *,
+    node_id: NodeID,
+    timeout_s: NonNegativeInt,
+) -> None:
+    result = await rabbitmq_rpc_client.request(
+        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
+        _RPC_METHOD_NAME_ADAPTER.validate_python("restart_user_services"),
+        node_id=node_id,
+        timeout_s=timeout_s,
+    )
+    assert result is None  # nosec
+
+
+@log_decorator(_logger, level=logging.DEBUG)
+async def retrieve_inputs(
+    rabbitmq_rpc_client: RabbitMQRPCClient,
+    *,
+    node_id: NodeID,
+    port_keys: list[ServicePortKey],
+    timeout_s: NonNegativeInt,
+) -> RetrieveDataOutEnveloped:
+    result = await rabbitmq_rpc_client.request(
+        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
+        _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs"),
+        node_id=node_id,
+        port_keys=port_keys,
+        timeout_s=timeout_s,
+    )
+    assert isinstance(result, RetrieveDataOutEnveloped)  # nosec
+    return result
+
+
 @log_decorator(_logger, level=logging.DEBUG)
 async def update_projects_networks(
     rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
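
A usage sketch for the two new dynamic-scheduler client helpers, assuming an already-connected RPC client; the timeout values are illustrative:

# Illustrative caller of the new dynamic-scheduler RPC methods.
from models_library.projects_nodes_io import NodeID
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services


async def refresh_and_restart(rpc_client: RabbitMQRPCClient, node_id: NodeID) -> None:
    # pull data for the connected input ports (port_keys=[] mirrors the tests
    # added later in this PR), then restart only the user services
    envelope = await services.retrieve_inputs(
        rpc_client, node_id=node_id, port_keys=[], timeout_s=60 * 60
    )
    _ = envelope  # a RetrieveDataOutEnveloped; its payload shape lives in models_library
    await services.restart_user_services(rpc_client, node_id=node_id, timeout_s=60)
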
diff --git a/services/agent/src/simcore_service_agent/api/rpc/_containers.py b/services/agent/src/simcore_service_agent/api/rpc/_containers.py
new file mode 100644
index 00000000000..e7d651d6ede
--- /dev/null
+++ b/services/agent/src/simcore_service_agent/api/rpc/_containers.py
@@ -0,0 +1,20 @@
+import logging
+
+from fastapi import FastAPI
+from models_library.projects_nodes_io import NodeID
+from servicelib.logging_utils import log_context
+from servicelib.rabbitmq import RPCRouter
+
+from ...services.containers_manager import ContainersManager
+
+_logger = logging.getLogger(__name__)
+
+router = RPCRouter()
+
+
+@router.expose()
+async def force_container_cleanup(app: FastAPI, *, node_id: NodeID) -> None:
+    with log_context(
+        _logger, logging.INFO, f"removing all orphan containers for {node_id=}"
+    ):
+        await ContainersManager.get_from_app_state(app).force_container_cleanup(node_id)
diff --git a/services/agent/src/simcore_service_agent/api/rpc/_volumes.py b/services/agent/src/simcore_service_agent/api/rpc/_volumes.py
index 96edb817e62..9d2433a19af 100644
--- a/services/agent/src/simcore_service_agent/api/rpc/_volumes.py
+++ b/services/agent/src/simcore_service_agent/api/rpc/_volumes.py
@@ -7,7 +7,8 @@
 from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
     NoServiceVolumesFoundRPCError,
 )
-from simcore_service_agent.services.volumes_manager import VolumesManager
+
+from ...services.volumes_manager import VolumesManager
 
 _logger = logging.getLogger(__name__)
diff --git a/services/agent/src/simcore_service_agent/api/rpc/routes.py b/services/agent/src/simcore_service_agent/api/rpc/routes.py
index 7a658ae5280..e8b0cea8f4c 100644
--- a/services/agent/src/simcore_service_agent/api/rpc/routes.py
+++ b/services/agent/src/simcore_service_agent/api/rpc/routes.py
@@ -4,9 +4,10 @@
 from simcore_service_agent.core.settings import ApplicationSettings
 
 from ...services.rabbitmq import get_rabbitmq_rpc_server
-from . import _volumes
+from . import _containers, _volumes
 
 ROUTERS: list[RPCRouter] = [
+    _containers.router,
     _volumes.router,
 ]
diff --git a/services/agent/src/simcore_service_agent/core/application.py b/services/agent/src/simcore_service_agent/core/application.py
index fe226a33558..b0cfa8720e4 100644
--- a/services/agent/src/simcore_service_agent/core/application.py
+++ b/services/agent/src/simcore_service_agent/core/application.py
@@ -18,6 +18,7 @@
 )
 from ..api.rest.routes import setup_rest_api
 from ..api.rpc.routes import setup_rpc_api_routes
+from ..services.containers_manager import setup_containers_manager
 from ..services.instrumentation import setup_instrumentation
 from ..services.rabbitmq import setup_rabbitmq
 from ..services.volumes_manager import setup_volume_manager
@@ -28,8 +29,8 @@ def _setup_logger(settings: ApplicationSettings):
     # SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148
-    logging.basicConfig(level=settings.LOGLEVEL.value)  # NOSONAR
-    logging.root.setLevel(settings.LOGLEVEL.value)
+    logging.basicConfig(level=settings.LOG_LEVEL.value)  # NOSONAR
+    logging.root.setLevel(settings.LOG_LEVEL.value)
     config_all_loggers(
         log_format_local_dev_enabled=settings.AGENT_VOLUMES_LOG_FORMAT_LOCAL_DEV_ENABLED,
         logger_filter_mapping=settings.AGENT_VOLUMES_LOG_FILTER_MAPPING,
@@ -58,6 +59,7 @@ def create_app() -> FastAPI:
 
     setup_rabbitmq(app)
     setup_volume_manager(app)
+    setup_containers_manager(app)
    setup_rest_api(app)
     setup_rpc_api_routes(app)
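
Note that the RPC namespace these routers are served under must match the one the client composes; a rough sketch of that pairing, with illustrative values:

# Sketch: the namespace targeted by the client in
# servicelib.rabbitmq.rpc_interfaces.agent.containers is composed from the
# same (service, docker_node_id, swarm_stack_name) triple the agent uses when
# registering its RPC server. Values here are illustrative.
from models_library.rabbitmq_basic_types import RPCNamespace

namespace = RPCNamespace.from_entries(
    {
        "service": "agent",
        "docker_node_id": "node1abc",  # the agent's AGENT_DOCKER_NODE_ID
        "swarm_stack_name": "simcore",  # the agent's swarm stack name
    }
)
# each swarm node thus gets its own namespace, so a cleanup request reaches
# exactly the agent running on the node that owns the containers
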
diff --git a/services/agent/src/simcore_service_agent/core/settings.py b/services/agent/src/simcore_service_agent/core/settings.py
index f37d7c8d263..742d3bf02d1 100644
--- a/services/agent/src/simcore_service_agent/core/settings.py
+++ b/services/agent/src/simcore_service_agent/core/settings.py
@@ -1,6 +1,7 @@
 from datetime import timedelta
 
 from models_library.basic_types import BootModeEnum, LogLevel
+from models_library.docker import DockerNodeID
 from pydantic import AliasChoices, AnyHttpUrl, Field, field_validator
 from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
 from settings_library.base import BaseCustomSettings
@@ -11,7 +12,7 @@
 
 
 class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
-    LOGLEVEL: LogLevel = Field(
+    LOG_LEVEL: LogLevel = Field(
         LogLevel.WARNING,
         validation_alias=AliasChoices(
             "AGENT_LOGLEVEL",
@@ -79,7 +80,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
 
     AGENT_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
 
-    AGENT_DOCKER_NODE_ID: str = Field(..., description="used by the rabbitmq module")
+    AGENT_DOCKER_NODE_ID: DockerNodeID = Field(
+        ..., description="used by the rabbitmq module"
+    )
 
     AGENT_RABBITMQ: RabbitSettings = Field(
         description="settings for service/rabbitmq",
@@ -91,7 +94,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
         json_schema_extra={"auto_default_from_env": True},
     )
 
-    @field_validator("LOGLEVEL")
+    @field_validator("LOG_LEVEL")
     @classmethod
     def valid_log_level(cls, value) -> LogLevel:
         return LogLevel(cls.validate_log_level(value))
diff --git a/services/agent/src/simcore_service_agent/services/containers_manager.py b/services/agent/src/simcore_service_agent/services/containers_manager.py
new file mode 100644
index 00000000000..ca2317e156e
--- /dev/null
+++ b/services/agent/src/simcore_service_agent/services/containers_manager.py
@@ -0,0 +1,71 @@
+import logging
+from dataclasses import dataclass, field
+
+from aiodocker import Docker
+from fastapi import FastAPI
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_PROXY_SERVICE_PREFIX,
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
+from models_library.projects_nodes_io import NodeID
+from servicelib.fastapi.app_state import SingletonInAppStateMixin
+
+from .docker_utils import get_containers_with_prefixes, remove_container_forcefully
+
+_logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ContainersManager(SingletonInAppStateMixin):
+    app_state_name: str = "containers_manager"
+
+    docker: Docker = field(default_factory=Docker)
+
+    async def force_container_cleanup(self, node_id: NodeID) -> None:
+        # compose all possible used container prefixes
+        proxy_prefix = f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}"
+        dy_sidecar_prefix = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}"
+        user_service_prefix = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}"
+
+        orphan_containers = await get_containers_with_prefixes(
+            self.docker, {proxy_prefix, dy_sidecar_prefix, user_service_prefix}
+        )
+        _logger.debug(
+            "Detected orphan containers for node_id='%s': %s",
+            node_id,
+            orphan_containers,
+        )
+
+        unexpected_orphans = {
+            orphan
+            for orphan in orphan_containers
+            if orphan.startswith(user_service_prefix)
+        }
+        if unexpected_orphans:
+            _logger.warning(
+                "Unexpected orphans detected for node_id='%s': %s",
+                node_id,
+                unexpected_orphans,
+            )
+
+        # avoids parallel requests to docker engine
+        for container in orphan_containers:
+            await remove_container_forcefully(self.docker, container)
+
+    async def shutdown(self) -> None:
+        await self.docker.close()
+
+
+def get_containers_manager(app: FastAPI) -> ContainersManager:
+    return ContainersManager.get_from_app_state(app)
+
+
+def setup_containers_manager(app: FastAPI) -> None:
+    async def _on_startup() -> None:
+        ContainersManager().set_to_app_state(app)
+
+    async def _on_shutdown() -> None:
+        await ContainersManager.get_from_app_state(app).shutdown()
+
+    app.add_event_handler("startup", _on_startup)
+    app.add_event_handler("shutdown", _on_shutdown)
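
To make the prefix composition above concrete, a small sketch of the names each prefix is meant to catch; the node id and the name suffixes are made up for illustration:

# Illustrative container names matched by the three prefixes composed in
# ContainersManager.force_container_cleanup (suffixes are invented).
node_id = "12345678-1234-1234-1234-123456789012"

proxy_prefix = f"dy-proxy_{node_id}"  # proxy swarm-service task containers
dy_sidecar_prefix = f"dy-sidecar_{node_id}"  # the sidecar's own container
user_service_prefix = f"dy-sidecar-{node_id}"  # compose containers the sidecar starts

candidates = [
    f"dy-proxy_{node_id}.1.abcd",
    f"dy-sidecar_{node_id}.1.efgh",
    f"dy-sidecar-{node_id}-rt-web-light",
]
assert all(
    any(name.startswith(p) for p in (proxy_prefix, dy_sidecar_prefix, user_service_prefix))
    for name in candidates
)
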
diff --git a/services/agent/src/simcore_service_agent/services/docker_utils.py b/services/agent/src/simcore_service_agent/services/docker_utils.py
index 83656783b55..1390a5b12df 100644
--- a/services/agent/src/simcore_service_agent/services/docker_utils.py
+++ b/services/agent/src/simcore_service_agent/services/docker_utils.py
@@ -106,3 +106,27 @@ async def remove_volume(
             get_instrumentation(app).agent_metrics.remove_volumes(
                 settings.AGENT_DOCKER_NODE_ID
             )
+
+
+async def get_containers_with_prefixes(docker: Docker, prefixes: set[str]) -> set[str]:
+    """Returns a set of container names matching any of the given prefixes"""
+    all_containers = await docker.containers.list(all=True)
+
+    result: set[str] = set()
+    for container in all_containers:
+        container_info = await container.show()
+        container_name = container_info.get("Name", "").lstrip("/")
+        if any(container_name.startswith(prefix) for prefix in prefixes):
+            result.add(container_name)
+
+    return result
+
+
+async def remove_container_forcefully(docker: Docker, container_id: str) -> None:
+    """Removes a container regardless of its state"""
+    try:
+        container = await docker.containers.get(container_id)
+        await container.delete(force=True)
+    except DockerError as e:
+        if e.status != status.HTTP_404_NOT_FOUND:
+            raise
diff --git a/services/agent/src/simcore_service_agent/services/instrumentation/_models.py b/services/agent/src/simcore_service_agent/services/instrumentation/_models.py
index bf554374595..2c49859e897 100644
--- a/services/agent/src/simcore_service_agent/services/instrumentation/_models.py
+++ b/services/agent/src/simcore_service_agent/services/instrumentation/_models.py
@@ -1,6 +1,7 @@
 from dataclasses import dataclass, field
 from typing import Final
 
+from models_library.docker import DockerNodeID
 from prometheus_client import CollectorRegistry, Counter
 from servicelib.instrumentation import MetricsBase, get_metrics_namespace
 
@@ -34,10 +35,10 @@ def __post_init__(self) -> None:
             registry=self.registry,
         )
 
-    def remove_volumes(self, docker_node_id: str) -> None:
+    def remove_volumes(self, docker_node_id: DockerNodeID) -> None:
         self.volumes_removed.labels(docker_node_id=docker_node_id).inc()
 
-    def backedup_volumes(self, docker_node_id: str) -> None:
+    def backedup_volumes(self, docker_node_id: DockerNodeID) -> None:
         self.volumes_backedup.labels(docker_node_id=docker_node_id).inc()
diff --git a/services/agent/tests/conftest.py b/services/agent/tests/conftest.py
index 14e8cd1d9e3..97df58d4e5a 100644
--- a/services/agent/tests/conftest.py
+++ b/services/agent/tests/conftest.py
@@ -5,6 +5,7 @@
 import pytest
 from faker import Faker
 from models_library.basic_types import BootModeEnum
+from models_library.docker import DockerNodeID
 from moto.server import ThreadedMotoServer
 from pydantic import HttpUrl, TypeAdapter
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
@@ -25,8 +26,8 @@ def swarm_stack_name() -> str:
 
 
 @pytest.fixture
-def docker_node_id() -> str:
-    return "test-node-id"
+def docker_node_id() -> DockerNodeID:
+    return TypeAdapter(DockerNodeID).validate_python("testnodeid")
 
 
 @pytest.fixture
@@ -40,7 +41,7 @@ def mock_environment(
     mocked_s3_server_url: HttpUrl,
     bucket: str,
     swarm_stack_name: str,
-    docker_node_id: str,
+    docker_node_id: DockerNodeID,
 ) -> EnvVarsDict:
     return setenvs_from_dict(
         monkeypatch,
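
A minimal sketch exercising the two docker_utils helpers added above directly against a Docker daemon; it assumes aiodocker is installed and a local daemon is running:

# Standalone sketch using the helpers above (assumes a local docker daemon).
import asyncio

from aiodocker import Docker
from simcore_service_agent.services.docker_utils import (
    get_containers_with_prefixes,
    remove_container_forcefully,
)


async def main() -> None:
    async with Docker() as docker:
        names = await get_containers_with_prefixes(docker, {"dy-sidecar_", "dy-proxy_"})
        for name in names:
            # force-removes created, running and stopped containers alike;
            # a vanished container (404) is silently ignored
            await remove_container_forcefully(docker, name)


asyncio.run(main())
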
diff --git a/services/agent/tests/unit/test_api_rpc__containers.py b/services/agent/tests/unit/test_api_rpc__containers.py
new file mode 100644
index 00000000000..201acf5d218
--- /dev/null
+++ b/services/agent/tests/unit/test_api_rpc__containers.py
@@ -0,0 +1,55 @@
+# pylint:disable=redefined-outer-name
+# pylint:disable=unused-argument
+
+from collections.abc import Awaitable, Callable
+from unittest.mock import AsyncMock
+
+import pytest
+import pytest_mock
+from faker import Faker
+from fastapi import FastAPI
+from models_library.docker import DockerNodeID
+from models_library.projects_nodes_io import NodeID
+from servicelib.rabbitmq import RabbitMQRPCClient
+from servicelib.rabbitmq.rpc_interfaces.agent import containers
+
+pytest_simcore_core_services_selection = [
+    "rabbit",
+]
+
+
+@pytest.fixture
+def node_id(faker: Faker) -> NodeID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+async def rpc_client(
+    initialized_app: FastAPI,
+    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
+) -> RabbitMQRPCClient:
+    return await rabbitmq_rpc_client("client")
+
+
+@pytest.fixture
+def mocked_force_container_cleanup(mocker: pytest_mock.MockerFixture) -> AsyncMock:
+    return mocker.patch(
+        "simcore_service_agent.services.containers_manager.ContainersManager.force_container_cleanup"
+    )
+
+
+async def test_force_container_cleanup(
+    rpc_client: RabbitMQRPCClient,
+    swarm_stack_name: str,
+    docker_node_id: DockerNodeID,
+    node_id: NodeID,
+    mocked_force_container_cleanup: AsyncMock,
+):
+    assert mocked_force_container_cleanup.call_count == 0
+    await containers.force_container_cleanup(
+        rpc_client,
+        docker_node_id=docker_node_id,
+        swarm_stack_name=swarm_stack_name,
+        node_id=node_id,
+    )
+    assert mocked_force_container_cleanup.call_count == 1
diff --git a/services/agent/tests/unit/test_api_rpc__volumes.py b/services/agent/tests/unit/test_api_rpc__volumes.py
index df7121d1418..6e7eeb76485 100644
--- a/services/agent/tests/unit/test_api_rpc__volumes.py
+++ b/services/agent/tests/unit/test_api_rpc__volumes.py
@@ -8,6 +8,7 @@
 import pytest
 import pytest_mock
 from fastapi import FastAPI
+from models_library.docker import DockerNodeID
 from servicelib.rabbitmq import RabbitMQRPCClient
 from servicelib.rabbitmq.rpc_interfaces.agent import volumes
 
@@ -41,7 +42,7 @@ def mocked_remove_all_volumes(mocker: pytest_mock.MockerFixture) -> AsyncMock:
 async def test_backup_and_remove_volumes_for_all_services(
     rpc_client: RabbitMQRPCClient,
     swarm_stack_name: str,
-    docker_node_id: str,
+    docker_node_id: DockerNodeID,
     mocked_remove_all_volumes: AsyncMock,
 ):
     assert mocked_remove_all_volumes.call_count == 0
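
Worth flagging while reviewing: the `DockerNodeID` pattern `r"[a-zA-Z0-9]"` is unanchored, so (to this reviewer's reading of pydantic v2's pattern matching, which searches rather than full-matches) validation only requires one alphanumeric character somewhere in the stripped string. A quick, hedged check:

# Sketch of what the current pattern accepts; if stricter validation was
# intended, something like r"^[a-zA-Z0-9]+$" would anchor it.
from models_library.docker import DockerNodeID
from pydantic import TypeAdapter

adapter = TypeAdapter(DockerNodeID)
adapter.validate_python("testnodeid")  # passes, as used in the fixtures
adapter.validate_python("  node1abc  ")  # passes: whitespace is stripped first
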
diff --git a/services/agent/tests/unit/test_services_containers_manager.py b/services/agent/tests/unit/test_services_containers_manager.py
new file mode 100644
index 00000000000..4489d975ab3
--- /dev/null
+++ b/services/agent/tests/unit/test_services_containers_manager.py
@@ -0,0 +1,107 @@
+# pylint: disable=redefined-outer-name
+
+
+import logging
+from collections.abc import AsyncIterable, Awaitable, Callable
+from enum import Enum
+
+import pytest
+from aiodocker import Docker, DockerError
+from asgi_lifespan import LifespanManager
+from faker import Faker
+from fastapi import FastAPI, status
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_PROXY_SERVICE_PREFIX,
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
+from models_library.projects_nodes_io import NodeID
+from simcore_service_agent.services.containers_manager import (
+    get_containers_manager,
+    setup_containers_manager,
+)
+
+
+@pytest.fixture
+async def app() -> AsyncIterable[FastAPI]:
+    app = FastAPI()
+    setup_containers_manager(app)
+
+    async with LifespanManager(app):
+        yield app
+
+
+@pytest.fixture
+def node_id(faker: Faker) -> NodeID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+async def docker() -> AsyncIterable[Docker]:
+    async with Docker() as docker:
+        yield docker
+
+
+class _ContainerMode(Enum):
+    CREATED = "CREATED"
+    RUNNING = "RUNNING"
+    STOPPED = "STOPPED"
+
+
+@pytest.fixture
+async def create_container(
+    docker: Docker,
+) -> AsyncIterable[Callable[[str, _ContainerMode], Awaitable[str]]]:
+    created_containers: set[str] = set()
+
+    async def _(name: str, container_mode: _ContainerMode) -> str:
+        container = await docker.containers.create(
+            config={
+                "Image": "alpine",
+                "Cmd": ["sh", "-c", "while true; do sleep 1; done"],
+            },
+            name=name,
+        )
+
+        if container_mode in (_ContainerMode.RUNNING, _ContainerMode.STOPPED):
+            await container.start()
+        if container_mode == _ContainerMode.STOPPED:
+            await container.stop()
+
+        created_containers.add(container.id)
+        return container.id
+
+    yield _
+
+    # cleanup containers
+    for container_id in created_containers:
+        try:
+            container = await docker.containers.get(container_id)
+            await container.delete(force=True)
+        except DockerError as e:
+            if e.status != status.HTTP_404_NOT_FOUND:
+                raise
+
+
+async def test_force_container_cleanup(
+    app: FastAPI,
+    node_id: NodeID,
+    create_container: Callable[[str, _ContainerMode], Awaitable[str]],
+    faker: Faker,
+    caplog: pytest.LogCaptureFixture,
+):
+    caplog.set_level(logging.DEBUG)
+    caplog.clear()
+
+    proxy_name = f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}{faker.pystr()}"
+    dynamic_sidecar_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}{faker.pystr()}"
+    user_service_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}{faker.pystr()}"
+
+    await create_container(proxy_name, _ContainerMode.CREATED)
+    await create_container(dynamic_sidecar_name, _ContainerMode.RUNNING)
+    await create_container(user_service_name, _ContainerMode.STOPPED)
+
+    await get_containers_manager(app).force_container_cleanup(node_id)
+
+    assert proxy_name in caplog.text
+    assert dynamic_sidecar_name in caplog.text
+    assert user_service_name in caplog.text
diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json
index 63418baabe5..51e5f191b27 100644
--- a/services/director-v2/openapi.json
+++ b/services/director-v2/openapi.json
@@ -2018,7 +2018,8 @@
         "docker_node_id": {
           "anyOf": [
             {
-              "type": "string"
+              "type": "string",
+              "pattern": "[a-zA-Z0-9]"
             },
             {
               "type": "null"
diff --git a/services/director-v2/src/simcore_service_director_v2/constants.py b/services/director-v2/src/simcore_service_director_v2/constants.py
index b84865745df..194425d0328 100644
--- a/services/director-v2/src/simcore_service_director_v2/constants.py
+++ b/services/director-v2/src/simcore_service_director_v2/constants.py
@@ -1,8 +1,11 @@
 from typing import Final
 
-# dynamic services
-DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar"
-DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy"
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_PROXY_SERVICE_PREFIX,
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
+
+# dynamic services
 
 # label storing scheduler_data to allow service
 # monitoring recovery after director-v2 reboots
diff --git a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py
index 7e9b3ebeac6..560517b538c 100644
--- a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py
+++ b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py
@@ -14,8 +14,13 @@
 from models_library.api_schemas_directorv2.dynamic_services_service import (
     CommonServiceDetails,
 )
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_PROXY_SERVICE_PREFIX,
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
 from models_library.basic_types import PortInt
 from models_library.callbacks_mapping import CallbacksMapping
+from models_library.docker import DockerNodeID
 from models_library.generated_models.docker_rest_api import ContainerState, Status2
 from models_library.projects_nodes_io import NodeID
 from models_library.resource_tracker import HardwareInfo, PricingInfo
@@ -39,9 +44,7 @@
 from servicelib.exception_utils import DelayedExceptionHandler
 
 from ..constants import (
-    DYNAMIC_PROXY_SERVICE_PREFIX,
     DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL,
-    DYNAMIC_SIDECAR_SERVICE_PREFIX,
     REGEX_DY_SERVICE_PROXY,
     REGEX_DY_SERVICE_SIDECAR,
 )
@@ -297,7 +300,7 @@ def compose_spec_submitted(self) -> bool:
         default=None, description="used for starting the proxy"
     )
 
-    docker_node_id: str | None = Field(
+    docker_node_id: DockerNodeID | None = Field(
         default=None,
         description=(
             "contains node id of the docker node where all services "
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py
index 32bb114f095..37dc914451c 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/_namespace.py
@@ -1,7 +1,8 @@
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
 from models_library.projects_nodes_io import NodeID
 
-from ...constants import DYNAMIC_SIDECAR_SERVICE_PREFIX
-
 
 def get_compose_namespace(node_uuid: NodeID) -> str:
     # To avoid collisions for started docker resources a unique identifier is computed:
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
index 1e05524b48d..350c406c1eb 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
@@ -9,7 +9,10 @@
 from common_library.json_serialization import json_dumps
 from fastapi.encoders import jsonable_encoder
 from models_library.aiodocker_api import AioDockerServiceSpec
-from models_library.docker import to_simcore_runtime_docker_label_key
+from models_library.api_schemas_directorv2.services import (
+    DYNAMIC_SIDECAR_SERVICE_PREFIX,
+)
+from models_library.docker import DockerNodeID, to_simcore_runtime_docker_label_key
 from models_library.projects import ProjectID
 from models_library.projects_networks import DockerNetworkName
 from models_library.projects_nodes_io import NodeID
@@ -22,10 +25,7 @@
 from tenacity.stop import stop_after_delay
 from tenacity.wait import wait_exponential, wait_random_exponential
 
-from ....constants import (
-    DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL,
-    DYNAMIC_SIDECAR_SERVICE_PREFIX,
-)
+from ....constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL
 from ....core.dynamic_services_settings.scheduler import (
     DynamicServicesSchedulerSettings,
 )
@@ -170,7 +170,7 @@ async def _get_service_latest_task(service_id: str) -> Mapping[str, Any]:
 async def get_dynamic_sidecar_placement(
     service_id: str,
     dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings,
-) -> str:
+) -> DockerNodeID:
     """
     Waits until the service has a task in `running` state and
    returns its `docker_node_id`.
@@ -205,7 +205,7 @@ async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, Any]:
 
     task = await _get_task_data_when_service_running(service_id=service_id)
 
-    docker_node_id: None | str = task.get("NodeID", None)
+    docker_node_id: DockerNodeID | None = task.get("NodeID", None)
     if not docker_node_id:
         msg = f"Could not find an assigned NodeID for service_id={service_id}. Last task inspect result: {task}"
         raise DynamicSidecarError(msg=msg)
@@ -494,7 +494,9 @@ async def update_scheduler_data_label(scheduler_data: SchedulerData) -> None:
     )
 
 
-async def constrain_service_to_node(service_name: str, docker_node_id: str) -> None:
+async def constrain_service_to_node(
+    service_name: str, docker_node_id: DockerNodeID
+) -> None:
     await _update_service_spec(
         service_name,
         update_in_service_spec={
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py
index e3b6d024bf8..4a127e59e51 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py
@@ -28,6 +28,7 @@
 from servicelib.logging_utils import log_context
 from servicelib.rabbitmq import RabbitMQClient
 from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
+from servicelib.rabbitmq.rpc_interfaces.agent.containers import force_container_cleanup
 from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
     NoServiceVolumesFoundRPCError,
 )
@@ -210,6 +211,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
     set_were_state_and_outputs_saved: bool | None = None,
 ) -> None:
     scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
+    rabbit_rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client
 
     if set_were_state_and_outputs_saved is not None:
         scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True
@@ -221,7 +223,14 @@
             node_uuid=scheduler_data.node_uuid,
             swarm_stack_name=swarm_stack_name,
         )
-    # remove network
+    if scheduler_data.dynamic_sidecar.docker_node_id:
+        await force_container_cleanup(
+            rabbit_rpc_client,
+            docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id,
+            swarm_stack_name=swarm_stack_name,
+            node_id=scheduler_data.node_uuid,
+        )
+
     task_progress.update(message="removing network", percent=ProgressPercent(0.2))
     await remove_dynamic_sidecar_network(scheduler_data.dynamic_sidecar_network_name)
 
@@ -237,7 +246,6 @@
         message="removing volumes", percent=ProgressPercent(0.3)
     )
     with log_context(_logger, logging.DEBUG, f"removing volumes '{node_uuid}'"):
-        rabbit_rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client
        try:
             await remove_volumes_without_backup_for_service(
                 rabbit_rpc_client,
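
The new teardown ordering is easier to see flattened out; a condensed sketch using the names from the hunk above (simplified, not a drop-in replacement):

# Condensed sketch of service_remove_sidecar_proxy_docker_networks_and_volumes
# after this change.
from servicelib.rabbitmq.rpc_interfaces.agent.containers import force_container_cleanup


async def _teardown_sketch(app, scheduler_data, swarm_stack_name) -> None:
    rabbit_rpc_client = app.state.rabbitmq_rpc_client
    # 1) sidecar + proxy swarm services are removed first (unchanged, elided)
    # 2) NEW: if the sidecar was ever placed on a node, that node's agent is
    #    asked to force-remove leftover containers *before* the network goes;
    #    if placement never happened (docker_node_id is None) this is skipped
    if scheduler_data.dynamic_sidecar.docker_node_id:
        await force_container_cleanup(
            rabbit_rpc_client,
            docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id,
            swarm_stack_name=swarm_stack_name,
            node_id=scheduler_data.node_uuid,
        )
    # 3) network removal and volume cleanup follow (unchanged, elided)
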
diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py
index 5410c37f203..fd328bd66aa 100644
--- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py
+++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler_task.py
@@ -14,6 +14,8 @@
 import respx
 from faker import Faker
 from fastapi import FastAPI
+from models_library.docker import DockerNodeID
+from pydantic import TypeAdapter
 from pytest_mock import MockerFixture
 from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
 from pytest_simcore.helpers.typing_env import EnvVarsDict
@@ -72,7 +74,9 @@ def mock_env(
 
 @pytest.fixture
 def scheduler_data(scheduler_data_from_http_request: SchedulerData) -> SchedulerData:
-    scheduler_data_from_http_request.docker_node_id = "test_docker_node_id"
+    scheduler_data_from_http_request.dynamic_sidecar.docker_node_id = TypeAdapter(
+        DockerNodeID
+    ).validate_python("testdockernodeid")
     return scheduler_data_from_http_request
 
 
@@ -211,8 +215,10 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
 
 
 @pytest.fixture
-def mock_remove_calls(mocker: MockerFixture) -> None:
+def mock_rpc_calls(mocker: MockerFixture, minimal_app: FastAPI) -> None:
+    minimal_app.state.rabbitmq_rpc_client = AsyncMock()
     mocker.patch.object(_events_utils, "remove_volumes_without_backup_for_service")
+    mocker.patch.object(_events_utils, "force_container_cleanup")
 
 
 @pytest.fixture(params=[True, False])
@@ -241,8 +247,9 @@ async def test_skip_observation_cycle_after_error(
     mocked_dynamic_scheduler_events: ACounter,
     error_raised_by_saving_state: bool,
     use_case: UseCase,
-    mock_remove_calls: None,
+    mock_rpc_calls: None,
 ):
+
     # add a task, emulate an error make sure no observation cycle is
     # being triggered again
     assert mocked_dynamic_scheduler_events.count == 0
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
index f7423b3944c..278b386eb86 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
@@ -14,11 +14,12 @@
 import pytest
 from aiodocker.utils import clean_filters
 from faker import Faker
-from models_library.docker import to_simcore_runtime_docker_label_key
+from models_library.docker import DockerNodeID, to_simcore_runtime_docker_label_key
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.services_enums import ServiceState
 from models_library.users import UserID
+from pydantic import TypeAdapter
 from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict
 from simcore_service_director_v2.constants import (
     DYNAMIC_PROXY_SERVICE_PREFIX,
@@ -763,16 +764,16 @@ async def test_regression_update_service_update_out_of_sequence(
 
 
 @pytest.fixture
-async def target_node_id(async_docker_client: aiodocker.Docker) -> str:
+async def target_node_id(async_docker_client: aiodocker.Docker) -> DockerNodeID:
     # get a node's ID
     docker_nodes = await async_docker_client.nodes.list()
-    return docker_nodes[0]["ID"]
+    return TypeAdapter(DockerNodeID).validate_python(docker_nodes[0]["ID"])
 
 
 async def test_constrain_service_to_node(
     async_docker_client: aiodocker.Docker,
     mock_service: str,
-    target_node_id: str,
+    target_node_id: DockerNodeID,
     docker_swarm: None,
 ):
     await docker_api.constrain_service_to_node(
diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py
index 907e7a8e04e..81d1accf23d 100644
--- a/services/director/src/simcore_service_director/producer.py
+++ b/services/director/src/simcore_service_director/producer.py
@@ -1078,7 +1078,7 @@ async def _get_node_details(
 async def get_services_details(
-    app: FastAPI, user_id: str | None, study_id: str | None
+    app: FastAPI, user_id: str | None, project_id: str | None
 ) -> list[dict]:
     app_settings = get_application_settings(app)
     async with docker_utils.docker_client() as client:  # pylint: disable=not-async-context-manager
@@ -1091,9 +1091,10 @@ async def get_services_details(
                 filters.append(
                     f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id
                 )
-            if study_id:
+            if project_id:
                 filters.append(
-                    f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id
+                    f"{_to_simcore_runtime_docker_label_key('project_id')}="
+                    + project_id
                 )
             list_running_services = await client.services.list(
                 filters={"label": filters}
@@ -1104,7 +1105,7 @@ async def get_services_details(
                 for service in list_running_services
             ]
     except aiodocker.DockerError as err:
-        msg = f"Error while accessing container for {user_id=}, {study_id=}"
+        msg = f"Error while accessing container for {user_id=}, {project_id=}"
         raise GenericDockerError(err=msg) from err
diff --git a/services/docker-compose.yml b/services/docker-compose.yml
index 265d29e56ed..766c117244d 100644
--- a/services/docker-compose.yml
+++ b/services/docker-compose.yml
@@ -568,6 +568,7 @@ services:
       DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
       DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
       DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
+      DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
      TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
       TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
   static-webserver:
diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py
index 9bba60a19f5..a0bbed6b110 100644
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py
+++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py
@@ -1,5 +1,8 @@
 from fastapi import FastAPI
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
     DynamicServiceStop,
@@ -7,6 +10,7 @@
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
+from models_library.services_types import ServicePortKey
 from models_library.users import UserID
 from servicelib.rabbitmq import RPCRouter
 from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
@@ -58,6 +62,20 @@ async def stop_dynamic_service(
     )
 
 
+@router.expose()
+async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
+    await scheduler_interface.restart_user_services(app, node_id=node_id)
+
+
+@router.expose()
+async def retrieve_inputs(
+    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
+) -> RetrieveDataOutEnveloped:
+    return await scheduler_interface.retrieve_inputs(
+        app, node_id=node_id, port_keys=port_keys
+    )
+
+
 @router.expose()
 async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
     await scheduler_interface.update_projects_networks(app, project_id=project_id)
diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py
index 36be9f4b587..e44ec885ed3 100644
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py
+++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py
@@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
 
     DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
         default=datetime.timedelta(minutes=60),
+        validation_alias=AliasChoices(
+            "DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
+            "DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
+        ),
         description=(
             "Time to wait before timing out when stopping a dynamic service. "
             "Since services require data to be stopped, this operation is timed out after 1 hour"
         ),
     )
 
+    DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
+        default=datetime.timedelta(minutes=60),
+        description=(
+            "When dynamic services upload and download data from storage, "
+            "sometimes very big payloads are involved. In order to handle "
+            "such payloads it is required to have long timeouts which "
+            "allow the service to finish the operation."
+        ),
+    )
+
     DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
         default=False,
         description=(
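
The `AliasChoices` addition lets the scheduler's stop-timeout be fed from the same env var the sidecar API already uses. A minimal, self-contained repro of the fallback behavior, assuming pydantic-settings v2 (the class and field names here are illustrative):

# Sketch: either env var populates the field; aliases are tried in the order
# listed, so DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT wins if both are set.
from datetime import timedelta

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings


class _Demo(BaseSettings):
    STOP_TIMEOUT: timedelta = Field(
        default=timedelta(minutes=60),
        validation_alias=AliasChoices(
            "DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
            "DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
        ),
    )


# e.g. DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=3600 in the environment
# yields STOP_TIMEOUT == timedelta(hours=1)
print(_Demo().STOP_TIMEOUT)
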
diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py
index 52134837775..2841eb5f467 100644
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py
+++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py
@@ -2,13 +2,17 @@
 from typing import Any
 
 from fastapi import FastAPI, status
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
 )
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
+from models_library.services_types import ServicePortKey
 from models_library.users import UserID
 from pydantic import TypeAdapter
 from servicelib.fastapi.app_state import SingletonInAppStateMixin
@@ -75,7 +79,7 @@ async def stop_dynamic_service(
         node_id: NodeID,
         simcore_user_agent: str,
         save_state: bool,
-        timeout: datetime.timedelta,  # noqa: ASYNC109
+        timeout: datetime.timedelta  # noqa: ASYNC109
     ) -> None:
         try:
             await self.thin_client.delete_dynamic_service(
@@ -100,6 +104,19 @@
 
             raise
 
+    async def retrieve_inputs(
+        self,
+        *,
+        node_id: NodeID,
+        port_keys: list[ServicePortKey],
+        timeout: datetime.timedelta  # noqa: ASYNC109
+    ) -> RetrieveDataOutEnveloped:
+        response = await self.thin_client.dynamic_service_retrieve(
+            node_id=node_id, port_keys=port_keys, timeout=timeout
+        )
+        dict_response: dict[str, Any] = response.json()
+        return TypeAdapter(RetrieveDataOutEnveloped).validate_python(dict_response)
+
     async def list_tracked_dynamic_services(
         self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
     ) -> list[DynamicServiceGet]:
@@ -108,6 +125,9 @@ async def list_tracked_dynamic_services(
         )
         return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())
 
+    async def restart_user_services(self, *, node_id: NodeID) -> None:
+        await self.thin_client.post_restart(node_id=node_id)
+
     async def update_projects_networks(self, *, project_id: ProjectID) -> None:
         await self.thin_client.patch_projects_networks(project_id=project_id)
diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py
index 39fce3741f3..1b86d604148 100644
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py
+++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py
@@ -11,6 +11,7 @@
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.services_resources import ServiceResourcesDictHelpers
+from models_library.services_types import ServicePortKey
 from models_library.users import UserID
 from servicelib.common_headers import (
     X_DYNAMIC_SIDECAR_REQUEST_DNS,
@@ -91,7 +92,7 @@ async def delete_dynamic_service(
         node_id: NodeID,
         simcore_user_agent: str,
         save_state: bool,
-        timeout: datetime.timedelta,
+        timeout: datetime.timedelta,  # noqa: ASYNC109
     ) -> Response:
         @retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
         @expect_status(status.HTTP_204_NO_CONTENT)
@@ -112,6 +113,22 @@
 
         return await _(self)
 
+    @retry_on_errors()
+    @expect_status(status.HTTP_200_OK)
+    async def dynamic_service_retrieve(
+        self,
+        *,
+        node_id: NodeID,
+        port_keys: list[ServicePortKey],
+        timeout: datetime.timedelta,  # noqa: ASYNC109
+    ) -> Response:
+        post_data = {"port_keys": port_keys}
+        return await self.client.post(
+            f"/dynamic_services/{node_id}:retrieve",
+            content=json_dumps(post_data),
+            timeout=timeout.total_seconds(),
+        )
+
     @retry_on_errors()
     @expect_status(status.HTTP_200_OK)
     async def get_dynamic_services(
@@ -125,6 +142,11 @@ async def get_dynamic_services(
             params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
         )
 
+    @retry_on_errors()
+    @expect_status(status.HTTP_204_NO_CONTENT)
+    async def post_restart(self, *, node_id: NodeID) -> Response:
+        return await self.client.post(f"/dynamic_services/{node_id}:restart")
+
     @retry_on_errors()
     @expect_status(status.HTTP_204_NO_CONTENT)
     async def patch_projects_networks(self, *, project_id: ProjectID) -> Response:
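
For reference, the HTTP contract the thin client now exercises, sketched with plain httpx; the endpoint paths and status codes come from the hunks above, while the host and payload are illustrative:

# Plain-httpx sketch of the two director-v2 endpoints wrapped above.
import httpx


async def call_director_v2(node_id: str) -> None:
    async with httpx.AsyncClient(base_url="http://director-v2:8000/v2") as client:
        # POST /dynamic_services/{node_id}:retrieve with a JSON body of port keys;
        # long timeout because transferred payloads can be very large
        r = await client.post(
            f"/dynamic_services/{node_id}:retrieve",
            json={"port_keys": []},
            timeout=3600,
        )
        assert r.status_code == 200
        # POST /dynamic_services/{node_id}:restart returns 204 on success
        r = await client.post(f"/dynamic_services/{node_id}:restart")
        assert r.status_code == 204
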
diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py
index 382030a0489..27e854ae6db 100644
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py
+++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py
@@ -1,5 +1,8 @@
 from fastapi import FastAPI
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
     DynamicServiceStop,
@@ -7,6 +10,7 @@
 from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
+from models_library.services_types import ServicePortKey
 from models_library.users import UserID
 
 from ..core.settings import ApplicationSettings
@@ -75,6 +79,30 @@ async def stop_dynamic_service(
         await set_request_as_stopped(app, dynamic_service_stop)
 
 
+async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
+    settings: ApplicationSettings = app.state.settings
+    if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
+        raise NotImplementedError
+
+    director_v2_client = DirectorV2Client.get_from_app_state(app)
+    await director_v2_client.restart_user_services(node_id=node_id)
+
+
+async def retrieve_inputs(
+    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
+) -> RetrieveDataOutEnveloped:
+    settings: ApplicationSettings = app.state.settings
+    if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
+        raise NotImplementedError
+
+    director_v2_client = DirectorV2Client.get_from_app_state(app)
+    return await director_v2_client.retrieve_inputs(
+        node_id=node_id,
+        port_keys=port_keys,
+        timeout=settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT,
+    )
+
+
 async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
     settings: ApplicationSettings = app.state.settings
     if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py
index cb4dd981d19..6455ca367f5 100644
--- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py
+++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py
@@ -9,7 +9,10 @@
 from faker import Faker
 from fastapi import FastAPI, status
 from fastapi.encoders import jsonable_encoder
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
     DynamicServiceStop,
@@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID:
 @pytest.fixture
 def service_status_new_style() -> DynamicServiceGet:
     return TypeAdapter(DynamicServiceGet).validate_python(
-        DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
+        DynamicServiceGet.model_json_schema()["examples"][1]
     )
 
 
 @pytest.fixture
 def service_status_legacy() -> NodeGet:
     return TypeAdapter(NodeGet).validate_python(
-        NodeGet.model_config["json_schema_extra"]["examples"][1]
+        NodeGet.model_json_schema()["examples"][1]
     )
 
 
@@ -112,9 +115,7 @@ def mock_director_v2_service_state(
     mock.get("/dynamic_services").respond(
         status.HTTP_200_OK,
         text=json.dumps(
-            jsonable_encoder(
-                DynamicServiceGet.model_config["json_schema_extra"]["examples"]
-            )
+            jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"])
         ),
     )
 
@@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
     assert len(results) == 2
     assert results == [
         TypeAdapter(DynamicServiceGet).validate_python(x)
-        for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
+        for x in DynamicServiceGet.model_json_schema()["examples"]
     ]
 
 
@@ -223,7 +224,7 @@ async def test_get_state(
 def dynamic_service_start() -> DynamicServiceStart:
     # one for legacy and one for new style?
     return TypeAdapter(DynamicServiceStart).validate_python(
-        DynamicServiceStart.model_config["json_schema_extra"]["example"]
+        DynamicServiceStart.model_json_schema()["example"]
     )
 
 
@@ -492,6 +493,60 @@ async def test_stop_dynamic_service_serializes_generic_errors(
     )
 
 
+@pytest.fixture
+def mock_director_v2_restart_user_services(node_id: NodeID) -> Iterator[None]:
+    with respx.mock(
+        base_url="http://director-v2:8000/v2",
+        assert_all_called=False,
+        assert_all_mocked=True,  # IMPORTANT: KEEP always True!
+    ) as mock:
+        mock.post(f"/dynamic_services/{node_id}:restart").respond(
+            status.HTTP_204_NO_CONTENT
+        )
+        yield None
+
+
+async def test_restart_user_services(
+    mock_director_v2_restart_user_services: None,
+    rpc_client: RabbitMQRPCClient,
+    node_id: NodeID,
+):
+    await services.restart_user_services(rpc_client, node_id=node_id, timeout_s=5)
+
+
+@pytest.fixture
+def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
+    with respx.mock(
+        base_url="http://director-v2:8000/v2",
+        assert_all_called=False,
+        assert_all_mocked=True,  # IMPORTANT: KEEP always True!
+    ) as mock:
+        mock.post(f"/dynamic_services/{node_id}:retrieve").respond(
+            status.HTTP_200_OK,
+            text=TypeAdapter(RetrieveDataOutEnveloped)
+            .validate_python(
+                RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
+            )
+            .model_dump_json(),
+        )
+
+        yield None
+
+
+async def test_retrieve_inputs(
+    mock_director_v2_service_retrieve_inputs: None,
+    rpc_client: RabbitMQRPCClient,
+    node_id: NodeID,
+):
+    results = await services.retrieve_inputs(
+        rpc_client, node_id=node_id, port_keys=[], timeout_s=10
+    )
+    assert (
+        results.model_dump(mode="python")
+        == RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
+    )
+
+
 @pytest.fixture
 def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]:
     with respx.mock(
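
The test churn above is mechanical: under pydantic v2, examples declared via `json_schema_extra` are merged into the output of `model_json_schema()`, so the tests read them from the published schema instead of poking `model_config`. A self-contained sketch of the before/after pattern (the model here is illustrative):

# Before/after of the example-access pattern migrated in these tests.
from pydantic import BaseModel, ConfigDict


class Thing(BaseModel):
    x: int
    model_config = ConfigDict(json_schema_extra={"examples": [{"x": 1}, {"x": 2}]})


# old style: reach directly into model_config
old = Thing.model_config["json_schema_extra"]["examples"][1]
# new style: read the generated JSON schema, where the extra is merged in
new = Thing.model_json_schema()["examples"][1]
assert old == new == {"x": 2}
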
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py
index 3c0cac4120e..5f9f3f3aad8 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py
@@ -8,81 +8,16 @@
 
 from aiohttp import web
 from models_library.projects import ProjectID
-from models_library.services import ServicePortKey
 from pydantic import NonNegativeInt
 from servicelib.logging_utils import log_decorator
 from yarl import URL
 
 from ._core_base import DataType, request_director_v2
-from .exceptions import DirectorServiceError
 from .settings import DirectorV2Settings, get_plugin_settings
 
 _log = logging.getLogger(__name__)
 
 
-# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
-@log_decorator(logger=_log)
-async def retrieve(
-    app: web.Application, service_uuid: str, port_keys: list[ServicePortKey]
-) -> DataType:
-    """Pulls data from connections to the dynamic service inputs"""
-    settings: DirectorV2Settings = get_plugin_settings(app)
-    result = await request_director_v2(
-        app,
-        "POST",
-        url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve",
-        data={"port_keys": port_keys},
-        timeout=settings.get_service_retrieve_timeout(),
-    )
-    assert isinstance(result, dict)  # nosec
-    return result
-
-
-# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
-# notice that this function is identical to retrieve except that it does NOT raises
-@log_decorator(logger=_log)
-async def request_retrieve_dyn_service(
-    app: web.Application, service_uuid: str, port_keys: list[str]
-) -> None:
-    settings: DirectorV2Settings = get_plugin_settings(app)
-    body = {"port_keys": port_keys}
-
-    try:
-        await request_director_v2(
-            app,
-            "POST",
-            url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve",
-            data=body,
-            timeout=settings.get_service_retrieve_timeout(),
-        )
-    except DirectorServiceError as exc:
-        _log.warning(
-            "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s:%s]",
-            service_uuid,
-            port_keys,
-            exc.status,
-            exc.reason,
-        )
-
-
-@log_decorator(logger=_log)
-async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None:
-    """User restart the dynamic dynamic service started in the node_uuid
-
-    NOTE that this operation will NOT restart all sidecar services
-    (``simcore-service-dynamic-sidecar`` or ``reverse-proxy caddy`` services) but
-    ONLY those containers in the compose-spec (i.e. the ones exposed to the user)
-    """
-    settings: DirectorV2Settings = get_plugin_settings(app)
-    await request_director_v2(
-        app,
-        "POST",
-        url=settings.base_url / f"dynamic_services/{node_uuid}:restart",
-        expected_status=web.HTTPOk,
-        timeout=settings.DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT,
-    )
-
-
 @log_decorator(logger=_log)
 async def get_project_inactivity(
     app: web.Application,
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/api.py b/services/web/server/src/simcore_service_webserver/director_v2/api.py
index a4d9c351082..9c070e7cae9 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/api.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/api.py
@@ -16,12 +16,7 @@
     is_pipeline_running,
     stop_pipeline,
 )
-from ._core_dynamic_services import (
-    get_project_inactivity,
-    request_retrieve_dyn_service,
-    restart_dynamic_service,
-    retrieve,
-)
+from ._core_dynamic_services import get_project_inactivity
 from ._core_utils import is_healthy
 from .exceptions import DirectorServiceError
 
@@ -37,9 +32,6 @@
     "get_project_run_policy",
     "is_healthy",
     "is_pipeline_running",
-    "request_retrieve_dyn_service",
-    "restart_dynamic_service",
-    "retrieve",
     "set_project_run_policy",
     "stop_pipeline",
 )
diff --git a/services/web/server/src/simcore_service_webserver/director_v2/settings.py b/services/web/server/src/simcore_service_webserver/director_v2/settings.py
index 21cb368ff50..31fc096a5dd 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2/settings.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2/settings.py
@@ -33,14 +33,6 @@ def base_url(self) -> URL:
     # - Mostly in floats (aiohttp.Client/) but sometimes in ints
     # - Typically in seconds but occasionally in ms
 
-    DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT: PositiveInt = Field(
-        1 * _MINUTE,
-        description="timeout of containers restart",
-        validation_alias=AliasChoices(
-            "DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT",
-        ),
-    )
-
     DIRECTOR_V2_STORAGE_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: PositiveInt = Field(
         _HOUR,
         description=(
diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py
index f34b90e1c2b..bdfd22c3c1d 100644
--- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py
+++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py
@@ -3,7 +3,10 @@
 from functools import partial
 
 from aiohttp import web
-from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
+from models_library.api_schemas_directorv2.dynamic_services import (
+    DynamicServiceGet,
+    RetrieveDataOutEnveloped,
+)
 from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
     DynamicServiceStart,
     DynamicServiceStop,
@@ -18,6 +21,7 @@
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType
+from models_library.services import ServicePortKey
 from models_library.users import UserID
 from pydantic.types import PositiveInt
 from servicelib.progress_bar import ProgressBarData
@@ -150,6 +154,37 @@ async def stop_dynamic_services_in_project(
     await logged_gather(*services_to_stop)
 
 
+async def restart_user_services(app: web.Application, *, node_id: NodeID) -> None:
+    """Restarts the user service(s) started by the node_uuid's sidecar
+
+    NOTE: this operation will NOT restart
+    sidecar services (``dy-sidecar`` or ``dy-proxy`` services),
+    but ONLY user services (the ones defined by the compose spec).
+    """
+    settings: DynamicSchedulerSettings = get_plugin_settings(app)
+    await services.restart_user_services(
+        get_rabbitmq_rpc_client(app),
+        node_id=node_id,
+        timeout_s=int(
+            settings.DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT.total_seconds()
+        ),
+    )
+
+
+async def retrieve_inputs(
+    app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey]
+) -> RetrieveDataOutEnveloped:
+    settings: DynamicSchedulerSettings = get_plugin_settings(app)
+    return await services.retrieve_inputs(
+        get_rabbitmq_rpc_client(app),
+        node_id=node_id,
+        port_keys=port_keys,
+        timeout_s=int(
+            settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT.total_seconds()
+        ),
+    )
+
+
 async def update_projects_networks(
     app: web.Application, *, project_id: ProjectID
 ) -> None:
diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py
index 91dac1317b6..5f33995a89e 100644
--- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py
+++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py
@@ -26,6 +26,20 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings):
         ),
     )
 
+    DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT: datetime.timedelta = Field(
+        datetime.timedelta(minutes=1), description="timeout for user services restart"
+    )
+
+    DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
+        datetime.timedelta(hours=1),
+        description=(
+            "When dynamic services upload and download data from storage, "
+            "sometimes very big payloads are involved. In order to handle "
+            "such payloads it is required to have long timeouts which "
+            "allow the service to finish the operation."
+        ),
+    )
+
 
 def get_plugin_settings(app: web.Application) -> DynamicSchedulerSettings:
     settings = app[APP_SETTINGS_KEY].WEBSERVER_DYNAMIC_SCHEDULER
diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py
index 7e4b35bab5d..9ddd88c0df1 100644
--- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py
+++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py
@@ -62,7 +62,6 @@
 
 from .._meta import API_VTAG as VTAG
 from ..catalog import client as catalog_client
-from ..director_v2 import api as director_v2_api
 from ..dynamic_scheduler import api as dynamic_scheduler_api
 from ..groups.api import get_group_from_gid, list_all_user_groups_ids
 from ..groups.exceptions import GroupNotFoundError
@@ -279,8 +278,8 @@ async def retrieve_node(request: web.Request) -> web.Response:
     retrieve = await parse_request_body_as(NodeRetrieve, request)
 
     return web.json_response(
-        await director_v2_api.retrieve(
-            request.app, f"{path_params.node_id}", retrieve.port_keys
+        await dynamic_scheduler_api.retrieve_inputs(
+            request.app, path_params.node_id, retrieve.port_keys
         ),
         dumps=json_dumps,
     )
@@ -411,7 +410,9 @@ async def restart_node(request: web.Request) -> web.Response:
 
     path_params = parse_request_path_parameters_as(NodePathParams, request)
 
-    await director_v2_api.restart_dynamic_service(request.app, f"{path_params.node_id}")
+    await dynamic_scheduler_api.restart_user_services(
+        request.app, node_id=path_params.node_id
+    )
 
     return web.json_response(status=status.HTTP_204_NO_CONTENT)
diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
index 80195446249..d4bec765a9e 100644
--- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py
+++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
@@ -1050,7 +1050,7 @@ async def patch_project_node(
     # 5. Updates project states for user, if inputs have been changed
     if "inputs" in _node_patch_exclude_unset:
         updated_project = await add_project_states_for_user(
-            user_id=user_id, project=updated_project, is_template=False, app=app
+            user_id=user_id, project=updated_project, is_template=False, app=app
         )
 
     # 6. Notify project node update
@@ -1132,6 +1132,20 @@ async def is_node_id_present_in_any_project_workbench(
     return await db.node_id_exists(node_id)
 
 
+async def _safe_retrieve(
+    app: web.Application, node_id: NodeID, port_keys: list[str]
+) -> None:
+    try:
+        await dynamic_scheduler_api.retrieve_inputs(app, node_id, port_keys)
+    except RPCServerError as exc:
+        log.warning(
+            "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s]",
+            node_id,
+            port_keys,
+            exc,
+        )
+
+
 async def _trigger_connected_service_retrieve(
     app: web.Application, project: dict, updated_node_uuid: str, changed_keys: list[str]
 ) -> None:
@@ -1172,7 +1186,7 @@ async def _trigger_connected_service_retrieve(
 
     # call /retrieve on the nodes
     update_tasks = [
-        director_v2_api.request_retrieve_dyn_service(app, node, keys)
+        _safe_retrieve(app, NodeID(node), keys)
         for node, keys in nodes_keys_to_update.items()
     ]
     await logged_gather(*update_tasks)
diff --git a/services/web/server/tests/unit/isolated/test_tracing.py b/services/web/server/tests/unit/isolated/test_tracing.py
index c236e446ab9..c356a31053c 100644
--- a/services/web/server/tests/unit/isolated/test_tracing.py
+++ b/services/web/server/tests/unit/isolated/test_tracing.py
@@ -18,14 +18,13 @@ def mock_webserver_service_environment(
     monkeypatch: pytest.MonkeyPatch, mock_webserver_service_environment: EnvVarsDict
 ) -> EnvVarsDict:
     monkeypatch.delenv("WEBSERVER_TRACING")
-    envs = mock_webserver_service_environment | setenvs_from_dict(
+    return mock_webserver_service_environment | setenvs_from_dict(
         monkeypatch,
         {
             "TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT": "http://opentelemetry-collector",
             "TRACING_OPENTELEMETRY_COLLECTOR_PORT": "4318",
         },
     )
-    return envs
 
 
 def test_middleware_restrictions_opentelemetry_is_second_middleware(