Skip to content

Commit

Permalink
🐛🎨♻️Director-v0: improve registry caching (ITISFoundation#6799)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Nov 22, 2024
1 parent ba881e9 commit 3655208
Show file tree
Hide file tree
Showing 17 changed files with 410 additions and 340 deletions.
39 changes: 32 additions & 7 deletions packages/settings-library/src/settings_library/docker_registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from functools import cached_property
from typing import Any
from typing import Any, Self

from pydantic import Field, SecretStr, field_validator
from pydantic import (
AnyHttpUrl,
Field,
SecretStr,
TypeAdapter,
field_validator,
model_validator,
)
from pydantic_settings import SettingsConfigDict

from .base import BaseCustomSettings
Expand All @@ -12,31 +19,49 @@ class RegistrySettings(BaseCustomSettings):
REGISTRY_PATH: str | None = Field(
default=None,
# This is useful in case of a local registry, where the registry url (path) is relative to the host docker engine
description="development mode only, in case a local registry is used",
description="development mode only, in case a local registry is used - "
"this is the hostname to the docker registry as seen from the host running the containers (e.g. 127.0.0.1:5000)",
)
# NOTE: the name is misleading: the http/https protocol is not included
REGISTRY_URL: str = Field(default="", description="address to the docker registry")
REGISTRY_URL: str = Field(
...,
description="hostname of docker registry (without protocol but with port if available)",
min_length=1,
)

REGISTRY_USER: str = Field(
..., description="username to access the docker registry"
)
REGISTRY_PW: SecretStr = Field(
..., description="password to access the docker registry"
)
REGISTRY_SSL: bool = Field(..., description="access to registry through ssl")
REGISTRY_SSL: bool = Field(
..., description="True if docker registry is using HTTPS protocol"
)

@field_validator("REGISTRY_PATH", mode="before")
@classmethod
def _escape_none_string(cls, v) -> Any | None:
return None if v == "None" else v

@model_validator(mode="after")
def check_registry_authentication(self: Self) -> Self:
if self.REGISTRY_AUTH and any(
not v for v in (self.REGISTRY_USER, self.REGISTRY_PW)
):
msg = "If REGISTRY_AUTH is True, both REGISTRY_USER and REGISTRY_PW must be provided"
raise ValueError(msg)
return self

@cached_property
def resolved_registry_url(self) -> str:
return self.REGISTRY_PATH or self.REGISTRY_URL

@cached_property
def api_url(self) -> str:
return f"{self.REGISTRY_URL}/v2"
def api_url(self) -> AnyHttpUrl:
return TypeAdapter(AnyHttpUrl).validate_python(
f"http{'s' if self.REGISTRY_SSL else ''}://{self.REGISTRY_URL}/v2"
)

model_config = SettingsConfigDict(
json_schema_extra={
Expand Down
1 change: 1 addition & 0 deletions packages/settings-library/tests/test_docker_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"REGISTRY_USER": "usr",
"REGISTRY_PW": "pwd",
"REGISTRY_SSL": "False",
"REGISTRY_URL": "pytest.registry.com",
}


Expand Down
1 change: 1 addition & 0 deletions services/director-v2/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def mock_env(
"REGISTRY_PW": "test",
"REGISTRY_SSL": "false",
"REGISTRY_USER": "test",
"REGISTRY_URL": faker.url(),
"SC_BOOT_MODE": "production",
"SIMCORE_SERVICES_NETWORK_NAME": "test_network_name",
"SWARM_STACK_NAME": "pytest-simcore",
Expand Down
3 changes: 2 additions & 1 deletion services/director/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
aiocache
aiodocker
fastapi[all]
httpx
httpx[http2]
prometheus-client
pydantic
tenacity
10 changes: 9 additions & 1 deletion services/director/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ h11==0.14.0
# via
# httpcore
# uvicorn
h2==4.1.0
# via httpx
hpack==4.0.0
# via h2
httpcore==1.0.6
# via httpx
httptools==0.6.4
Expand All @@ -135,6 +139,8 @@ httpx==0.27.2
# -r requirements/../../../packages/service-library/requirements/_fastapi.in
# -r requirements/_base.in
# fastapi
hyperframe==6.0.1
# via h2
idna==3.10
# via
# anyio
Expand Down Expand Up @@ -422,7 +428,9 @@ starlette==0.41.3
# -c requirements/../../../requirements/constraints.txt
# fastapi
tenacity==9.0.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/_base.in
toolz==1.0.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
tqdm==4.67.0
Expand Down
1 change: 1 addition & 0 deletions services/director/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ faker
jsonref
pytest
pytest-asyncio
pytest-benchmark
pytest-cov
pytest-docker
pytest-instafail
Expand Down
5 changes: 5 additions & 0 deletions services/director/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ propcache==0.2.0
# -c requirements/_base.txt
# aiohttp
# yarl
py-cpuinfo==9.0.0
# via pytest-benchmark
pytest==8.3.3
# via
# -r requirements/_test.in
# pytest-asyncio
# pytest-benchmark
# pytest-cov
# pytest-docker
# pytest-instafail
Expand All @@ -100,6 +103,8 @@ pytest-asyncio==0.23.8
# via
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_test.in
pytest-benchmark==5.1.0
# via -r requirements/_test.in
pytest-cov==6.0.0
# via -r requirements/_test.in
pytest-docker==3.1.1
Expand Down
31 changes: 6 additions & 25 deletions services/director/src/simcore_service_director/client_session.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,22 @@
from aiohttp import ClientSession, ClientTimeout
from common_library.json_serialization import json_dumps
import httpx
from fastapi import FastAPI
from servicelib.utils import (
get_http_client_request_aiohttp_connect_timeout,
get_http_client_request_aiohttp_sock_connect_timeout,
get_http_client_request_total_timeout,
)


def setup_client_session(app: FastAPI) -> None:
async def on_startup() -> None:
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/4628

# ANE: it is important to have fast connection handshakes
# also requests should be as fast as possible
# some services are not that fast to reply
timeout_settings = ClientTimeout(
total=get_http_client_request_total_timeout(),
connect=get_http_client_request_aiohttp_connect_timeout(),
sock_connect=get_http_client_request_aiohttp_sock_connect_timeout(),
)
session = ClientSession(
timeout=timeout_settings,
json_serialize=json_dumps,
)
session = httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(http2=True))
app.state.aiohttp_client_session = session

async def on_shutdown() -> None:
session = app.state.aiohttp_client_session
assert isinstance(session, ClientSession) # nosec
await session.close()
assert isinstance(session, httpx.AsyncClient) # nosec
await session.aclose()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_client_session(app: FastAPI) -> ClientSession:
def get_client_session(app: FastAPI) -> httpx.AsyncClient:
session = app.state.aiohttp_client_session
assert isinstance(session, ClientSession) # nosec
assert isinstance(session, httpx.AsyncClient) # nosec
return session
5 changes: 0 additions & 5 deletions services/director/src/simcore_service_director/core/errors.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import Any

from common_library.errors_classes import OsparcErrorMixin


class DirectorRuntimeError(OsparcErrorMixin, RuntimeError):
def __init__(self, **ctx: Any) -> None:
super().__init__(**ctx)

msg_template: str = "Director-v0 unexpected error: {msg}"


Expand Down
101 changes: 45 additions & 56 deletions services/director/src/simcore_service_director/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,22 @@
import re
from datetime import timedelta
from enum import Enum
from http import HTTPStatus
from pprint import pformat
from typing import Any, Final, cast

import aiodocker
import aiodocker.networks
import aiohttp
import arrow
import httpx
import tenacity
from aiohttp import (
ClientConnectionError,
ClientError,
ClientResponse,
ClientResponseError,
ClientSession,
ClientTimeout,
)
from fastapi import FastAPI
from fastapi import FastAPI, status
from packaging.version import Version
from servicelib.async_utils import run_sequentially_in_context
from servicelib.docker_utils import to_datetime
from settings_library.docker_registry import RegistrySettings
from tenacity import retry
from tenacity import retry, wait_random_exponential
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from . import docker_utils, registry_proxy
from .client_session import get_client_session
Expand Down Expand Up @@ -547,7 +537,7 @@ async def _pass_port_to_service(
service_name: str,
port: str,
service_boot_parameters_labels: list[Any],
session: ClientSession,
session: httpx.AsyncClient,
app_settings: ApplicationSettings,
) -> None:
for param in service_boot_parameters_labels:
Expand All @@ -566,8 +556,8 @@ async def _pass_port_to_service(
"port": str(port),
}
_logger.debug("creating request %s and query %s", service_url, query_string)
async with session.post(service_url, data=query_string) as response:
_logger.debug("query response: %s", await response.text())
response = await session.post(service_url, data=query_string)
_logger.debug("query response: %s", response.text)
return
_logger.debug("service %s does not need to know its external port", service_name)

Expand Down Expand Up @@ -1149,50 +1139,48 @@ async def get_service_details(app: FastAPI, node_uuid: str) -> dict:


@retry(
wait=wait_fixed(2),
wait=wait_random_exponential(min=1, max=5),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(ClientConnectionError),
retry=retry_if_exception_type(httpx.RequestError),
)
async def _save_service_state(
service_host_name: str, session: aiohttp.ClientSession
service_host_name: str, session: httpx.AsyncClient
) -> None:
response: ClientResponse
async with session.post(
url=f"http://{service_host_name}/state", # NOSONAR
timeout=ClientTimeout(
ServicesCommonSettings().director_dynamic_service_save_timeout
),
) as response:
try:
response.raise_for_status()
try:
response = await session.post(
url=f"http://{service_host_name}/state", # NOSONAR
timeout=ServicesCommonSettings().director_dynamic_service_save_timeout,
)
response.raise_for_status()

except ClientResponseError as err:
if err.status in (
HTTPStatus.METHOD_NOT_ALLOWED,
HTTPStatus.NOT_FOUND,
HTTPStatus.NOT_IMPLEMENTED,
):
# NOTE: Legacy Override. Some old services do not have a state entrypoint defined
# therefore we assume there is nothing to be saved and do not raise exception
# Responses found so far:
# METHOD NOT ALLOWED https://httpstatuses.com/405
# NOT FOUND https://httpstatuses.com/404
#
_logger.warning(
"Service '%s' does not seem to implement save state functionality: %s. Skipping save",
service_host_name,
err,
)
else:
# oops ... the service may have had trouble saving its state, re-raise
raise
else:
_logger.info(
"Service '%s' successfully saved its state: %s",
except httpx.HTTPStatusError as err:

if err.response.status_code in (
status.HTTP_405_METHOD_NOT_ALLOWED,
status.HTTP_404_NOT_FOUND,
status.HTTP_501_NOT_IMPLEMENTED,
):
# NOTE: Legacy Override. Some old services do not have a state entrypoint defined
# therefore we assume there is nothing to be saved and do not raise exception
# Responses found so far:
# METHOD NOT ALLOWED https://httpstatuses.com/405
# NOT FOUND https://httpstatuses.com/404
#
_logger.warning(
"Service '%s' does not seem to implement save state functionality: %s. Skipping save",
service_host_name,
f"{response}",
err,
)
else:
# oops ... the service may have had trouble saving its state, re-raise
raise
else:
_logger.info(
"Service '%s' successfully saved its state: %s",
service_host_name,
f"{response}",
)


@run_sequentially_in_context(target_args=["node_uuid"])
Expand Down Expand Up @@ -1246,20 +1234,21 @@ async def stop_service(app: FastAPI, *, node_uuid: str, save_state: bool) -> Non
await _save_service_state(
service_host_name, session=get_client_session(app)
)
except ClientResponseError as err:
except httpx.HTTPStatusError as err:

raise ServiceStateSaveError(
service_uuid=node_uuid,
reason=f"service {service_host_name} rejected to save state, "
f"responded {err.message} (status {err.status})."
f"responded {err.response.text} (status {err.response.status_code})."
"Aborting stop service to prevent data loss.",
) from err

except ClientError as err:
except httpx.RequestError as err:
_logger.warning(
"Could not save state because %s is unreachable [%s]."
"Resuming stop_service.",
service_host_name,
err,
err.request,
)

# remove the services
Expand Down
Loading

0 comments on commit 3655208

Please sign in to comment.