Skip to content

Commit

Permalink
Merge branch 'master' into is1779/search-users
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored Dec 17, 2024
2 parents 0c7e4b2 + 38109f8 commit f779d49
Show file tree
Hide file tree
Showing 41 changed files with 699 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@ class ServiceExtras(BaseModel):


CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME: Final[NonNegativeInt] = 89


# Name prefixes for the dynamic-sidecar and dynamic-proxy services.
# The agent's ContainersManager joins them with a node id (using "_" or "-")
# to build the container-name prefixes it looks for during cleanup.
DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar"
DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy"
15 changes: 14 additions & 1 deletion packages/models-library/src/models_library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ def from_key(cls, key: str) -> "DockerLabelKey":
str, StringConstraints(pattern=DOCKER_GENERIC_TAG_KEY_RE)
]

DockerPlacementConstraint: TypeAlias = Annotated[str, StringConstraints(strip_whitespace = True, pattern = re.compile(r"^(?!-)(?![.])(?!.*--)(?!.*[.][.])[a-zA-Z0-9.-]*(?<!-)(?<![.])(!=|==)[a-zA-Z0-9_. -]*$"))]
# Constrained string of the form `<name><op><value>`, as enforced by the regex:
#   - <name>:  letters, digits, "." and "-" (possibly empty); it may not start
#              or end with "-" or "."
#   - <op>:    "==" or "!="
#   - <value>: letters, digits, "_", ".", " " and "-" (possibly empty)
# In addition the string as a whole may not contain "--" or "..".
DockerPlacementConstraint: TypeAlias = Annotated[
str,
StringConstraints(
strip_whitespace=True,
pattern=re.compile(
r"^(?!-)(?![.])(?!.*--)(?!.*[.][.])[a-zA-Z0-9.-]*(?<!-)(?<![.])(!=|==)[a-zA-Z0-9_. -]*$"
),
),
]

_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX: Final[str] = "io.simcore.runtime."
_BACKWARDS_COMPATIBILITY_SIMCORE_RUNTIME_DOCKER_LABELS_MAP: Final[dict[str, str]] = {
Expand Down Expand Up @@ -218,3 +226,8 @@ def from_docker_task(cls, docker_task: Task) -> "StandardSimcoreDockerLabels":
]
},
)


# String type used for docker node identifiers (e.g. the `docker_node_id`
# arguments of the agent RPC interfaces and the AGENT_DOCKER_NODE_ID setting).
# NOTE(review): the pattern is a bare character class with no anchors and no
# quantifier, so it presumably only requires an alphanumeric character to be
# present instead of constraining the whole value. If the intent is
# "alphanumeric characters only", something like r"^[a-zA-Z0-9]+$" would be
# needed -- confirm against the values callers actually pass before tightening.
DockerNodeID: TypeAlias = Annotated[
str, StringConstraints(strip_whitespace=True, pattern=re.compile(r"[a-zA-Z0-9]"))
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from datetime import timedelta
from typing import Final

from models_library.docker import DockerNodeID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient

_logger = logging.getLogger(__name__)

# 60 minutes expressed in whole seconds; passed as `timeout_s` to the RPC request below.
_REQUEST_TIMEOUT: Final[NonNegativeInt] = int(timedelta(minutes=60).total_seconds())


@log_decorator(_logger, level=logging.DEBUG)
async def force_container_cleanup(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    docker_node_id: DockerNodeID,
    swarm_stack_name: str,
    node_id: NodeID,
) -> None:
    """Send the ``force_container_cleanup`` RPC request for ``node_id``.

    The RPC namespace is derived from the entries ``service="agent"``,
    ``docker_node_id`` and ``swarm_stack_name``. ``node_id`` is forwarded as
    the remote method's keyword argument and the request is issued with
    ``_REQUEST_TIMEOUT`` as ``timeout_s``. The reply is expected to be ``None``.
    """
    agent_namespace = RPCNamespace.from_entries(
        {
            "service": "agent",
            "docker_node_id": docker_node_id,
            "swarm_stack_name": swarm_stack_name,
        }
    )
    method_name = TypeAdapter(RPCMethodName).validate_python(
        "force_container_cleanup"
    )

    reply = await rabbitmq_rpc_client.request(
        agent_namespace,
        method_name,
        node_id=node_id,
        timeout_s=_REQUEST_TIMEOUT,
    )
    assert reply is None  # nosec
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta
from typing import Final

from models_library.docker import DockerNodeID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from pydantic import NonNegativeInt, TypeAdapter
Expand All @@ -17,7 +18,7 @@
async def remove_volumes_without_backup_for_service(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
docker_node_id: str,
docker_node_id: DockerNodeID,
swarm_stack_name: str,
node_id: NodeID,
) -> None:
Expand All @@ -42,7 +43,7 @@ async def remove_volumes_without_backup_for_service(
async def backup_and_remove_volumes_for_all_services(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
docker_node_id: str,
docker_node_id: DockerNodeID,
swarm_stack_name: str,
) -> None:
result = await rabbitmq_rpc_client.request(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Final

from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
Expand All @@ -11,6 +14,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
Expand Down Expand Up @@ -95,6 +99,41 @@ async def stop_dynamic_service(
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def restart_user_services(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    timeout_s: NonNegativeInt,
) -> None:
    """Send the ``restart_user_services`` RPC request for ``node_id``.

    The request goes to ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` and forwards both
    ``node_id`` and ``timeout_s`` as keyword arguments. The reply is expected
    to be ``None``.
    """
    rpc_method = _RPC_METHOD_NAME_ADAPTER.validate_python("restart_user_services")

    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method,
        node_id=node_id,
        timeout_s=timeout_s,
    )
    assert reply is None  # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_inputs(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout_s: NonNegativeInt,
) -> RetrieveDataOutEnveloped:
    """Send the ``retrieve_inputs`` RPC request and return its reply.

    The request goes to ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` and forwards
    ``node_id``, ``port_keys`` and ``timeout_s`` as keyword arguments. The
    reply is expected to be a ``RetrieveDataOutEnveloped`` instance.
    """
    rpc_method = _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs")

    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_s,
    )
    assert isinstance(reply, RetrieveDataOutEnveloped)  # nosec
    return reply


@log_decorator(_logger, level=logging.DEBUG)
async def update_projects_networks(
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
Expand Down
20 changes: 20 additions & 0 deletions services/agent/src/simcore_service_agent/api/rpc/_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging

from fastapi import FastAPI
from models_library.projects_nodes_io import NodeID
from servicelib.logging_utils import log_context
from servicelib.rabbitmq import RPCRouter

from ...services.containers_manager import ContainersManager

_logger = logging.getLogger(__name__)

router = RPCRouter()


@router.expose()
async def force_container_cleanup(app: FastAPI, *, node_id: NodeID) -> None:
    """RPC handler: run ``ContainersManager.force_container_cleanup`` for ``node_id``.

    The manager is fetched from ``app``'s state and the whole operation runs
    inside an INFO-level ``log_context``.
    """
    log_message = f"removing all orphan container for {node_id=}"
    with log_context(_logger, logging.INFO, log_message):
        # looked up inside the context, exactly where the original resolved it
        containers_manager = ContainersManager.get_from_app_state(app)
        await containers_manager.force_container_cleanup(node_id)
3 changes: 2 additions & 1 deletion services/agent/src/simcore_service_agent/api/rpc/_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
NoServiceVolumesFoundRPCError,
)
from simcore_service_agent.services.volumes_manager import VolumesManager

from ...services.volumes_manager import VolumesManager

_logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion services/agent/src/simcore_service_agent/api/rpc/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from simcore_service_agent.core.settings import ApplicationSettings

from ...services.rabbitmq import get_rabbitmq_rpc_server
from . import _volumes
from . import _containers, _volumes

# RPC routers collected from the sibling modules of this package.
ROUTERS: list[RPCRouter] = [
_containers.router,
_volumes.router,
]

Expand Down
6 changes: 4 additions & 2 deletions services/agent/src/simcore_service_agent/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from ..api.rest.routes import setup_rest_api
from ..api.rpc.routes import setup_rpc_api_routes
from ..services.containers_manager import setup_containers_manager
from ..services.instrumentation import setup_instrumentation
from ..services.rabbitmq import setup_rabbitmq
from ..services.volumes_manager import setup_volume_manager
Expand All @@ -28,8 +29,8 @@

def _setup_logger(settings: ApplicationSettings):
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148
logging.basicConfig(level=settings.LOGLEVEL.value) # NOSONAR
logging.root.setLevel(settings.LOGLEVEL.value)
logging.basicConfig(level=settings.LOG_LEVEL.value) # NOSONAR
logging.root.setLevel(settings.LOG_LEVEL.value)
config_all_loggers(
log_format_local_dev_enabled=settings.AGENT_VOLUMES_LOG_FORMAT_LOCAL_DEV_ENABLED,
logger_filter_mapping=settings.AGENT_VOLUMES_LOG_FILTER_MAPPING,
Expand Down Expand Up @@ -58,6 +59,7 @@ def create_app() -> FastAPI:

setup_rabbitmq(app)
setup_volume_manager(app)
setup_containers_manager(app)
setup_rest_api(app)
setup_rpc_api_routes(app)

Expand Down
9 changes: 6 additions & 3 deletions services/agent/src/simcore_service_agent/core/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta

from models_library.basic_types import BootModeEnum, LogLevel
from models_library.docker import DockerNodeID
from pydantic import AliasChoices, AnyHttpUrl, Field, field_validator
from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
from settings_library.base import BaseCustomSettings
Expand All @@ -11,7 +12,7 @@


class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
LOGLEVEL: LogLevel = Field(
LOG_LEVEL: LogLevel = Field(
LogLevel.WARNING,
validation_alias=AliasChoices(
"AGENT_LOGLEVEL",
Expand Down Expand Up @@ -79,7 +80,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):

AGENT_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True

AGENT_DOCKER_NODE_ID: str = Field(..., description="used by the rabbitmq module")
AGENT_DOCKER_NODE_ID: DockerNodeID = Field(
..., description="used by the rabbitmq module"
)

AGENT_RABBITMQ: RabbitSettings = Field(
description="settings for service/rabbitmq",
Expand All @@ -91,7 +94,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
json_schema_extra={"auto_default_from_env": True},
)

@field_validator("LOGLEVEL")
@field_validator("LOG_LEVEL")
@classmethod
def valid_log_level(cls, value) -> LogLevel:
return LogLevel(cls.validate_log_level(value))
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import logging
from dataclasses import dataclass, field

from aiodocker import Docker
from fastapi import FastAPI
from models_library.api_schemas_directorv2.services import (
DYNAMIC_PROXY_SERVICE_PREFIX,
DYNAMIC_SIDECAR_SERVICE_PREFIX,
)
from models_library.projects_nodes_io import NodeID
from servicelib.fastapi.app_state import SingletonInAppStateMixin

from .docker_utils import get_containers_with_prefixes, remove_container_forcefully

_logger = logging.getLogger(__name__)


@dataclass
class ContainersManager(SingletonInAppStateMixin):
    """Force-removes the docker containers whose names are derived from a node id.

    Owns an ``aiodocker`` client (``docker``); call ``shutdown`` to close it.
    """

    app_state_name: str = "containers_manager"

    docker: Docker = field(default_factory=Docker)

    async def force_container_cleanup(self, node_id: NodeID) -> None:
        """Remove every container whose name starts with a prefix built from ``node_id``.

        Three prefixes are searched: ``<DYNAMIC_PROXY_SERVICE_PREFIX>_<node_id>``,
        ``<DYNAMIC_SIDECAR_SERVICE_PREFIX>_<node_id>`` and
        ``<DYNAMIC_SIDECAR_SERVICE_PREFIX>-<node_id>``. Matches for the last
        one are additionally reported with a warning before being removed.
        """
        user_service_prefix = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}"
        # every container-name prefix that may have been used for this node
        candidate_prefixes = {
            f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}",
            f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}",
            user_service_prefix,
        }

        orphan_names = await get_containers_with_prefixes(
            self.docker, candidate_prefixes
        )
        _logger.debug(
            "Detected orphan containers for node_id='%s': %s",
            node_id,
            orphan_names,
        )

        unexpected_names = set(
            filter(lambda name: name.startswith(user_service_prefix), orphan_names)
        )
        if unexpected_names:
            _logger.warning(
                "Unexpected orphans detected for node_id='%s': %s",
                node_id,
                unexpected_names,
            )

        # one removal at a time: avoids parallel requests to the docker engine
        for name in orphan_names:
            await remove_container_forcefully(self.docker, name)

    async def shutdown(self) -> None:
        """Close the underlying docker client."""
        await self.docker.close()


def get_containers_manager(app: FastAPI) -> ContainersManager:
    """Return the ``ContainersManager`` kept in ``app``'s state."""
    manager = ContainersManager.get_from_app_state(app)
    return manager


def setup_containers_manager(app: FastAPI) -> None:
    """Register the startup/shutdown handlers that manage ``app``'s ``ContainersManager``.

    On startup a new manager is created and stored in the app state; on
    shutdown that manager is fetched again and shut down.
    """

    async def _on_startup() -> None:
        manager = ContainersManager()
        manager.set_to_app_state(app)

    async def _on_shutdown() -> None:
        manager = get_containers_manager(app)
        await manager.shutdown()

    handlers = (("startup", _on_startup), ("shutdown", _on_shutdown))
    for event_name, handler in handlers:
        app.add_event_handler(event_name, handler)
24 changes: 24 additions & 0 deletions services/agent/src/simcore_service_agent/services/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,27 @@ async def remove_volume(
get_instrumentation(app).agent_metrics.remove_volumes(
settings.AGENT_DOCKER_NODE_ID
)


async def get_containers_with_prefixes(docker: Docker, prefixes: set[str]) -> set[str]:
    """Return the names of all containers whose name starts with any of ``prefixes``.

    Containers are listed with ``all=True`` and each one is inspected via
    ``show()`` to obtain its name; leading "/" characters of the reported
    name are dropped before matching and in the returned names.

    Args:
        docker: client used to list and inspect the containers.
        prefixes: name prefixes to look for; an empty set matches nothing.

    Returns:
        The set of matching container names (without leading "/").

    Raises:
        DockerError: if inspecting a container fails with anything other
            than a 404 status.
    """
    # str.startswith accepts a tuple of alternatives (an empty tuple never matches)
    wanted_prefixes = tuple(prefixes)

    matching_names: set[str] = set()
    for container in await docker.containers.list(all=True):
        try:
            container_info = await container.show()
        except DockerError as err:
            # The container was removed between list() and show(): there is
            # nothing left to report, so skip it instead of aborting the scan.
            # This mirrors the 404 handling in remove_container_forcefully.
            if err.status == status.HTTP_404_NOT_FOUND:
                continue
            raise

        container_name = container_info.get("Name", "").lstrip("/")
        if container_name.startswith(wanted_prefixes):
            matching_names.add(container_name)

    return matching_names


async def remove_container_forcefully(docker: Docker, container_id: str) -> None:
    """Force-delete the container ``container_id`` regardless of its state.

    A ``DockerError`` carrying a 404 status is ignored;
    any other ``DockerError`` is re-raised.
    """
    try:
        target = await docker.containers.get(container_id)
        await target.delete(force=True)
    except DockerError as err:
        if err.status == status.HTTP_404_NOT_FOUND:
            return
        raise
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass, field
from typing import Final

from models_library.docker import DockerNodeID
from prometheus_client import CollectorRegistry, Counter
from servicelib.instrumentation import MetricsBase, get_metrics_namespace

Expand Down Expand Up @@ -34,10 +35,10 @@ def __post_init__(self) -> None:
registry=self.registry,
)

def remove_volumes(self, docker_node_id: str) -> None:
def remove_volumes(self, docker_node_id: DockerNodeID) -> None:
self.volumes_removed.labels(docker_node_id=docker_node_id).inc()

def backedup_volumes(self, docker_node_id: str) -> None:
def backedup_volumes(self, docker_node_id: DockerNodeID) -> None:
self.volumes_backedup.labels(docker_node_id=docker_node_id).inc()


Expand Down
Loading

0 comments on commit f779d49

Please sign in to comment.