diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index daef990abf1..32636f6b9df 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,7 +1,14 @@ from functools import cached_property -from typing import Any +from typing import Any, Self -from pydantic import Field, SecretStr, field_validator +from pydantic import ( + AnyHttpUrl, + Field, + SecretStr, + TypeAdapter, + field_validator, + model_validator, +) from pydantic_settings import SettingsConfigDict from .base import BaseCustomSettings @@ -12,10 +19,15 @@ class RegistrySettings(BaseCustomSettings): REGISTRY_PATH: str | None = Field( default=None, # This is useful in case of a local registry, where the registry url (path) is relative to the host docker engine" - description="development mode only, in case a local registry is used", + description="development mode only, in case a local registry is used - " + "this is the hostname to the docker registry as seen from the host running the containers (e.g. 127.0.0.1:5000)", ) # NOTE: name is missleading, http or https protocol are not included - REGISTRY_URL: str = Field(default="", description="address to the docker registry") + REGISTRY_URL: str = Field( + ..., + description="hostname of docker registry (without protocol but with port if available)", + min_length=1, + ) REGISTRY_USER: str = Field( ..., description="username to access the docker registry" @@ -23,20 +35,33 @@ class RegistrySettings(BaseCustomSettings): REGISTRY_PW: SecretStr = Field( ..., description="password to access the docker registry" ) - REGISTRY_SSL: bool = Field(..., description="access to registry through ssl") + REGISTRY_SSL: bool = Field( + ..., description="True if docker registry is using HTTPS protocol" + ) @field_validator("REGISTRY_PATH", mode="before") @classmethod def _escape_none_string(cls, v) -> Any | None: return None if v == "None" else v + @model_validator(mode="after") + def check_registry_authentication(self: Self) -> Self: + if self.REGISTRY_AUTH and any( + not v for v in (self.REGISTRY_USER, self.REGISTRY_PW) + ): + msg = "If REGISTRY_AUTH is True, both REGISTRY_USER and REGISTRY_PW must be provided" + raise ValueError(msg) + return self + @cached_property def resolved_registry_url(self) -> str: return self.REGISTRY_PATH or self.REGISTRY_URL @cached_property - def api_url(self) -> str: - return f"{self.REGISTRY_URL}/v2" + def api_url(self) -> AnyHttpUrl: + return TypeAdapter(AnyHttpUrl).validate_python( + f"http{'s' if self.REGISTRY_SSL else ''}://{self.REGISTRY_URL}/v2" + ) model_config = SettingsConfigDict( json_schema_extra={ diff --git a/packages/settings-library/tests/test_docker_registry.py b/packages/settings-library/tests/test_docker_registry.py index fdaf5a4bc66..0cc513399c8 100644 --- a/packages/settings-library/tests/test_docker_registry.py +++ b/packages/settings-library/tests/test_docker_registry.py @@ -11,6 +11,7 @@ "REGISTRY_USER": "usr", "REGISTRY_PW": "pwd", "REGISTRY_SSL": "False", + "REGISTRY_URL": "pytest.registry.com", } diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index d0a70389caa..fcc0db6dbf1 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -186,6 +186,7 @@ def mock_env( "REGISTRY_PW": "test", "REGISTRY_SSL": "false", "REGISTRY_USER": "test", + "REGISTRY_URL": faker.url(), "SC_BOOT_MODE": "production", "SIMCORE_SERVICES_NETWORK_NAME": "test_network_name", "SWARM_STACK_NAME": "pytest-simcore", diff --git a/services/director/requirements/_base.in b/services/director/requirements/_base.in index af6b80486e8..97b2752fbf0 100644 --- a/services/director/requirements/_base.in +++ b/services/director/requirements/_base.in @@ -15,6 +15,7 @@ aiocache aiodocker fastapi[all] -httpx +httpx[http2] prometheus-client pydantic +tenacity diff --git a/services/director/requirements/_base.txt b/services/director/requirements/_base.txt index d9164137fa0..656861c1ba1 100644 --- a/services/director/requirements/_base.txt +++ b/services/director/requirements/_base.txt @@ -114,6 +114,10 @@ h11==0.14.0 # via # httpcore # uvicorn +h2==4.1.0 + # via httpx +hpack==4.0.0 + # via h2 httpcore==1.0.6 # via httpx httptools==0.6.4 @@ -135,6 +139,8 @@ httpx==0.27.2 # -r requirements/../../../packages/service-library/requirements/_fastapi.in # -r requirements/_base.in # fastapi +hyperframe==6.0.1 + # via h2 idna==3.10 # via # anyio @@ -422,7 +428,9 @@ starlette==0.41.3 # -c requirements/../../../requirements/constraints.txt # fastapi tenacity==9.0.0 - # via -r requirements/../../../packages/service-library/requirements/_base.in + # via + # -r requirements/../../../packages/service-library/requirements/_base.in + # -r requirements/_base.in toolz==1.0.0 # via -r requirements/../../../packages/service-library/requirements/_base.in tqdm==4.67.0 diff --git a/services/director/requirements/_test.in b/services/director/requirements/_test.in index a6bd90a3acf..b5787990732 100644 --- a/services/director/requirements/_test.in +++ b/services/director/requirements/_test.in @@ -16,6 +16,7 @@ faker jsonref pytest pytest-asyncio +pytest-benchmark pytest-cov pytest-docker pytest-instafail diff --git a/services/director/requirements/_test.txt b/services/director/requirements/_test.txt index 8d14d466266..2e08a63301e 100644 --- a/services/director/requirements/_test.txt +++ b/services/director/requirements/_test.txt @@ -87,10 +87,13 @@ propcache==0.2.0 # -c requirements/_base.txt # aiohttp # yarl +py-cpuinfo==9.0.0 + # via pytest-benchmark pytest==8.3.3 # via # -r requirements/_test.in # pytest-asyncio + # pytest-benchmark # pytest-cov # pytest-docker # pytest-instafail @@ -100,6 +103,8 @@ pytest-asyncio==0.23.8 # via # -c requirements/../../../requirements/constraints.txt # -r requirements/_test.in +pytest-benchmark==5.1.0 + # via -r requirements/_test.in pytest-cov==6.0.0 # via -r requirements/_test.in pytest-docker==3.1.1 diff --git a/services/director/src/simcore_service_director/client_session.py b/services/director/src/simcore_service_director/client_session.py index b053d06c456..de700737399 100644 --- a/services/director/src/simcore_service_director/client_session.py +++ b/services/director/src/simcore_service_director/client_session.py @@ -1,41 +1,22 @@ -from aiohttp import ClientSession, ClientTimeout -from common_library.json_serialization import json_dumps +import httpx from fastapi import FastAPI -from servicelib.utils import ( - get_http_client_request_aiohttp_connect_timeout, - get_http_client_request_aiohttp_sock_connect_timeout, - get_http_client_request_total_timeout, -) def setup_client_session(app: FastAPI) -> None: async def on_startup() -> None: - # SEE https://github.com/ITISFoundation/osparc-simcore/issues/4628 - - # ANE: it is important to have fast connection handshakes - # also requests should be as fast as possible - # some services are not that fast to reply - timeout_settings = ClientTimeout( - total=get_http_client_request_total_timeout(), - connect=get_http_client_request_aiohttp_connect_timeout(), - sock_connect=get_http_client_request_aiohttp_sock_connect_timeout(), - ) - session = ClientSession( - timeout=timeout_settings, - json_serialize=json_dumps, - ) + session = httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(http2=True)) app.state.aiohttp_client_session = session async def on_shutdown() -> None: session = app.state.aiohttp_client_session - assert isinstance(session, ClientSession) # nosec - await session.close() + assert isinstance(session, httpx.AsyncClient) # nosec + await session.aclose() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -def get_client_session(app: FastAPI) -> ClientSession: +def get_client_session(app: FastAPI) -> httpx.AsyncClient: session = app.state.aiohttp_client_session - assert isinstance(session, ClientSession) # nosec + assert isinstance(session, httpx.AsyncClient) # nosec return session diff --git a/services/director/src/simcore_service_director/core/errors.py b/services/director/src/simcore_service_director/core/errors.py index 23ca2fc17fc..c7113baa402 100644 --- a/services/director/src/simcore_service_director/core/errors.py +++ b/services/director/src/simcore_service_director/core/errors.py @@ -1,12 +1,7 @@ -from typing import Any - from common_library.errors_classes import OsparcErrorMixin class DirectorRuntimeError(OsparcErrorMixin, RuntimeError): - def __init__(self, **ctx: Any) -> None: - super().__init__(**ctx) - msg_template: str = "Director-v0 unexpected error: {msg}" diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py index ff791a4066f..79e695ead57 100644 --- a/services/director/src/simcore_service_director/producer.py +++ b/services/director/src/simcore_service_director/producer.py @@ -5,32 +5,22 @@ import re from datetime import timedelta from enum import Enum -from http import HTTPStatus from pprint import pformat from typing import Any, Final, cast import aiodocker import aiodocker.networks -import aiohttp import arrow +import httpx import tenacity -from aiohttp import ( - ClientConnectionError, - ClientError, - ClientResponse, - ClientResponseError, - ClientSession, - ClientTimeout, -) -from fastapi import FastAPI +from fastapi import FastAPI, status from packaging.version import Version from servicelib.async_utils import run_sequentially_in_context from servicelib.docker_utils import to_datetime from settings_library.docker_registry import RegistrySettings -from tenacity import retry +from tenacity import retry, wait_random_exponential from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_attempt -from tenacity.wait import wait_fixed from . import docker_utils, registry_proxy from .client_session import get_client_session @@ -547,7 +537,7 @@ async def _pass_port_to_service( service_name: str, port: str, service_boot_parameters_labels: list[Any], - session: ClientSession, + session: httpx.AsyncClient, app_settings: ApplicationSettings, ) -> None: for param in service_boot_parameters_labels: @@ -566,8 +556,8 @@ async def _pass_port_to_service( "port": str(port), } _logger.debug("creating request %s and query %s", service_url, query_string) - async with session.post(service_url, data=query_string) as response: - _logger.debug("query response: %s", await response.text()) + response = await session.post(service_url, data=query_string) + _logger.debug("query response: %s", response.text) return _logger.debug("service %s does not need to know its external port", service_name) @@ -1149,50 +1139,48 @@ async def get_service_details(app: FastAPI, node_uuid: str) -> dict: @retry( - wait=wait_fixed(2), + wait=wait_random_exponential(min=1, max=5), stop=stop_after_attempt(3), reraise=True, - retry=retry_if_exception_type(ClientConnectionError), + retry=retry_if_exception_type(httpx.RequestError), ) async def _save_service_state( - service_host_name: str, session: aiohttp.ClientSession + service_host_name: str, session: httpx.AsyncClient ) -> None: - response: ClientResponse - async with session.post( - url=f"http://{service_host_name}/state", # NOSONAR - timeout=ClientTimeout( - ServicesCommonSettings().director_dynamic_service_save_timeout - ), - ) as response: - try: - response.raise_for_status() + try: + response = await session.post( + url=f"http://{service_host_name}/state", # NOSONAR + timeout=ServicesCommonSettings().director_dynamic_service_save_timeout, + ) + response.raise_for_status() - except ClientResponseError as err: - if err.status in ( - HTTPStatus.METHOD_NOT_ALLOWED, - HTTPStatus.NOT_FOUND, - HTTPStatus.NOT_IMPLEMENTED, - ): - # NOTE: Legacy Override. Some old services do not have a state entrypoint defined - # therefore we assume there is nothing to be saved and do not raise exception - # Responses found so far: - # METHOD NOT ALLOWED https://httpstatuses.com/405 - # NOT FOUND https://httpstatuses.com/404 - # - _logger.warning( - "Service '%s' does not seem to implement save state functionality: %s. Skipping save", - service_host_name, - err, - ) - else: - # upss ... could service had troubles saving, reraise - raise - else: - _logger.info( - "Service '%s' successfully saved its state: %s", + except httpx.HTTPStatusError as err: + + if err.response.status_code in ( + status.HTTP_405_METHOD_NOT_ALLOWED, + status.HTTP_404_NOT_FOUND, + status.HTTP_501_NOT_IMPLEMENTED, + ): + # NOTE: Legacy Override. Some old services do not have a state entrypoint defined + # therefore we assume there is nothing to be saved and do not raise exception + # Responses found so far: + # METHOD NOT ALLOWED https://httpstatuses.com/405 + # NOT FOUND https://httpstatuses.com/404 + # + _logger.warning( + "Service '%s' does not seem to implement save state functionality: %s. Skipping save", service_host_name, - f"{response}", + err, ) + else: + # upss ... could service had troubles saving, reraise + raise + else: + _logger.info( + "Service '%s' successfully saved its state: %s", + service_host_name, + f"{response}", + ) @run_sequentially_in_context(target_args=["node_uuid"]) @@ -1246,20 +1234,21 @@ async def stop_service(app: FastAPI, *, node_uuid: str, save_state: bool) -> Non await _save_service_state( service_host_name, session=get_client_session(app) ) - except ClientResponseError as err: + except httpx.HTTPStatusError as err: + raise ServiceStateSaveError( service_uuid=node_uuid, reason=f"service {service_host_name} rejected to save state, " - f"responded {err.message} (status {err.status})." + f"responded {err.response.text} (status {err.response.status_code})." "Aborting stop service to prevent data loss.", ) from err - except ClientError as err: + except httpx.RequestError as err: _logger.warning( "Could not save state because %s is unreachable [%s]." "Resuming stop_service.", service_host_name, - err, + err.request, ) # remove the services diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 3099627ddc6..9fa703a24db 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -1,21 +1,22 @@ +import asyncio import enum import json import logging import re -from collections.abc import Mapping -from http import HTTPStatus +from collections.abc import AsyncGenerator, Mapping from pprint import pformat -from typing import Any, cast +from typing import Any, Final, cast +import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] -from aiohttp import BasicAuth, ClientSession, client_exceptions -from aiohttp.client import ClientTimeout -from fastapi import FastAPI -from servicelib.utils import limited_gather +from fastapi import FastAPI, status +from servicelib.logging_utils import log_catch, log_context +from servicelib.utils import limited_as_completed from tenacity import retry from tenacity.before_sleep import before_sleep_log -from tenacity.retry import retry_if_result -from tenacity.wait import wait_fixed +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed, wait_random_exponential from yarl import URL from .client_session import get_client_session @@ -37,7 +38,7 @@ r"^(0|[1-9]\d*)(\.(0|[1-9]\d*)){2}(-(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*)(\.(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*))*)?(\+[-\da-zA-Z]+(\.[-\da-zA-Z-]+)*)?$" ) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # # NOTE: if you are refactoring this module, @@ -55,81 +56,64 @@ async def _basic_auth_registry_request( app: FastAPI, path: str, method: str, **session_kwargs ) -> tuple[dict, Mapping]: app_settings = get_application_settings(app) - if not app_settings.DIRECTOR_REGISTRY.REGISTRY_URL: - msg = "URL to registry is not defined" - raise DirectorRuntimeError(msg=msg) - - url = URL( - f"{'https' if app_settings.DIRECTOR_REGISTRY.REGISTRY_SSL else 'http'}://{app_settings.DIRECTOR_REGISTRY.REGISTRY_URL}{path}" - ) - logger.debug("Requesting registry using %s", url) # try the registry with basic authentication first, spare 1 call resp_data: dict = {} resp_headers: Mapping = {} auth = ( - BasicAuth( - login=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, + httpx.BasicAuth( + username=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, password=app_settings.DIRECTOR_REGISTRY.REGISTRY_PW.get_secret_value(), ) if app_settings.DIRECTOR_REGISTRY.REGISTRY_AUTH - and app_settings.DIRECTOR_REGISTRY.REGISTRY_USER - and app_settings.DIRECTOR_REGISTRY.REGISTRY_PW else None ) + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}").joinpath( + path, encoded=True + ) + session = get_client_session(app) - try: - async with session.request( - method.lower(), url, auth=auth, **session_kwargs - ) as response: - if response.status == HTTPStatus.UNAUTHORIZED: - logger.debug("Registry unauthorized request: %s", await response.text()) - # basic mode failed, test with other auth mode - resp_data, resp_headers = await _auth_registry_request( - app_settings, - url, - method, - response.headers, - session, - **session_kwargs, - ) + response = await session.request( + method.lower(), + f"{request_url}", + auth=auth, + **session_kwargs, + ) + + if response.status_code == status.HTTP_401_UNAUTHORIZED: + # basic mode failed, test with other auth mode + resp_data, resp_headers = await _auth_registry_request( + app_settings, + request_url, + method, + response.headers, + session, + **session_kwargs, + ) - elif response.status == HTTPStatus.NOT_FOUND: - raise ServiceNotAvailableError(service_name=path) + elif response.status_code == status.HTTP_404_NOT_FOUND: + raise ServiceNotAvailableError(service_name=path) - elif response.status > 399: - logger.exception( - "Unknown error while accessing registry: %s", str(response) - ) - raise RegistryConnectionError(msg=str(response)) + elif response.status_code >= status.HTTP_400_BAD_REQUEST: + raise RegistryConnectionError(msg=str(response)) - else: - # registry that does not need an auth - resp_data = await response.json(content_type=None) - resp_headers = response.headers - - return (resp_data, resp_headers) - except client_exceptions.ClientError as exc: - logger.exception("Unknown error while accessing registry") - msg = f"Unknown error while accessing registry: {exc!s}" - raise DirectorRuntimeError(msg=msg) from exc + else: + # registry that does not need an auth + if method.lower() != "head": + resp_data = response.json() + resp_headers = response.headers + + return (resp_data, resp_headers) -async def _auth_registry_request( +async def _auth_registry_request( # noqa: C901 app_settings: ApplicationSettings, url: URL, method: str, auth_headers: Mapping, - session: ClientSession, + session: httpx.AsyncClient, **kwargs, ) -> tuple[dict, Mapping]: - if ( - not app_settings.DIRECTOR_REGISTRY.REGISTRY_AUTH - or not app_settings.DIRECTOR_REGISTRY.REGISTRY_USER - or not app_settings.DIRECTOR_REGISTRY.REGISTRY_PW - ): - msg = "Wrong configuration: Authentication to registry is needed!" - raise RegistryConnectionError(msg=msg) # auth issue let's try some authentication get the auth type auth_type = None auth_details: dict[str, str] = {} @@ -144,8 +128,8 @@ async def _auth_registry_request( if not auth_type: msg = "Unknown registry type: cannot deduce authentication method!" raise RegistryConnectionError(msg=msg) - auth = BasicAuth( - login=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, + auth = httpx.BasicAuth( + username=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, password=app_settings.DIRECTOR_REGISTRY.REGISTRY_PW.get_secret_value(), ) @@ -155,58 +139,60 @@ async def _auth_registry_request( token_url = URL(auth_details["realm"]).with_query( service=auth_details["service"], scope=auth_details["scope"] ) - async with session.get(token_url, auth=auth, **kwargs) as token_resp: - if token_resp.status != HTTPStatus.OK: - msg = f"Unknown error while authentifying with registry: {token_resp!s}" - raise RegistryConnectionError(msg=msg) - bearer_code = (await token_resp.json())["token"] - headers = {"Authorization": f"Bearer {bearer_code}"} - async with getattr(session, method.lower())( - url, headers=headers, **kwargs - ) as resp_wtoken: - if resp_wtoken.status == HTTPStatus.NOT_FOUND: - logger.exception("path to registry not found: %s", url) - raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wtoken.status > 399: - logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wtoken), - ) - raise RegistryConnectionError(msg=f"{resp_wtoken}") - resp_data = await resp_wtoken.json(content_type=None) - resp_headers = resp_wtoken.headers - return (resp_data, resp_headers) - elif auth_type == "Basic": + token_resp = await session.get(f"{token_url}", auth=auth, **kwargs) + if token_resp.status_code != status.HTTP_200_OK: + msg = f"Unknown error while authentifying with registry: {token_resp!s}" + raise RegistryConnectionError(msg=msg) + + bearer_code = (await token_resp.json())["token"] + headers = {"Authorization": f"Bearer {bearer_code}"} + resp_wtoken = await getattr(session, method.lower())( + url, headers=headers, **kwargs + ) + assert isinstance(resp_wtoken, httpx.Response) # nosec + if resp_wtoken.status_code == status.HTTP_404_NOT_FOUND: + raise ServiceNotAvailableError(service_name=f"{url}") + if resp_wtoken.status_code >= status.HTTP_400_BAD_REQUEST: + raise RegistryConnectionError(msg=f"{resp_wtoken}") + resp_data = await resp_wtoken.json(content_type=None) + resp_headers = resp_wtoken.headers + return (resp_data, resp_headers) + if auth_type == "Basic": # basic authentication should not be since we tried already... - async with getattr(session, method.lower())( - url, auth=auth, **kwargs - ) as resp_wbasic: - if resp_wbasic.status == HTTPStatus.NOT_FOUND: - logger.exception("path to registry not found: %s", url) - raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wbasic.status > 399: - logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wbasic), - ) - raise RegistryConnectionError(msg=f"{resp_wbasic}") - resp_data = await resp_wbasic.json(content_type=None) - resp_headers = resp_wbasic.headers - return (resp_data, resp_headers) + resp_wbasic = await getattr(session, method.lower())(url, auth=auth, **kwargs) + assert isinstance(resp_wbasic, httpx.Response) # nosec + if resp_wbasic.status_code == status.HTTP_404_NOT_FOUND: + raise ServiceNotAvailableError(service_name=f"{url}") + if resp_wbasic.status_code >= status.HTTP_400_BAD_REQUEST: + raise RegistryConnectionError(msg=f"{resp_wbasic}") + resp_data = await resp_wbasic.json(content_type=None) + resp_headers = resp_wbasic.headers + return (resp_data, resp_headers) msg = f"Unknown registry authentification type: {url}" raise RegistryConnectionError(msg=msg) +@retry( + retry=retry_if_exception_type((httpx.RequestError, TimeoutError)), + wait=wait_random_exponential(min=1, max=10), + stop=stop_after_delay(120), + before_sleep=before_sleep_log(_logger, logging.WARNING), + reraise=True, +) +async def _retried_request( + app: FastAPI, path: str, method: str, **session_kwargs +) -> tuple[dict, Mapping]: + return await _basic_auth_registry_request(app, path, method, **session_kwargs) + + async def registry_request( app: FastAPI, + *, path: str, method: str = "GET", no_cache: bool = False, **session_kwargs, ) -> tuple[dict, Mapping]: - logger.debug( - "Request to registry: path=%s, method=%s. no_cache=%s", path, method, no_cache - ) cache: SimpleMemoryCache = app.state.registry_cache_memory cache_key = f"{method}_{path}" if not no_cache and (cached_response := await cache.get(cache_key)): @@ -214,9 +200,13 @@ async def registry_request( return cast(tuple[dict, Mapping], cached_response) app_settings = get_application_settings(app) - response, response_headers = await _basic_auth_registry_request( - app, path, method, **session_kwargs - ) + try: + response, response_headers = await _retried_request( + app, path, method, **session_kwargs + ) + except httpx.RequestError as exc: + msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}" + raise DirectorRuntimeError(msg=msg) from exc if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET": await cache.set( @@ -228,32 +218,22 @@ async def registry_request( return response, response_headers -async def _is_registry_responsive(app: FastAPI) -> bool: - path = "/v2/" - try: - await registry_request( - app, path, no_cache=True, timeout=ClientTimeout(total=1.0) - ) - return True - except (TimeoutError, DirectorRuntimeError) as exc: - logger.debug("Registry not responsive: %s", exc) - return False +async def _is_registry_responsive(app: FastAPI) -> None: + await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) async def _setup_registry(app: FastAPI) -> None: - logger.debug("pinging registry...") - @retry( - wait=wait_fixed(2), - before_sleep=before_sleep_log(logger, logging.WARNING), - retry=retry_if_result(lambda result: result is False), + wait=wait_fixed(1), + before_sleep=before_sleep_log(_logger, logging.WARNING), + retry=retry_if_exception_type((httpx.RequestError, DirectorRuntimeError)), reraise=True, ) - async def wait_until_registry_responsive(app: FastAPI) -> bool: - return await _is_registry_responsive(app) + async def _wait_until_registry_responsive(app: FastAPI) -> None: + await _is_registry_responsive(app) - await wait_until_registry_responsive(app) - logger.info("Connected to docker registry") + with log_context(_logger, logging.INFO, msg="Connecting to docker registry"): + await _wait_until_registry_responsive(app) def setup(app: FastAPI) -> None: @@ -271,35 +251,83 @@ async def on_shutdown() -> None: app.add_event_handler("shutdown", on_shutdown) -async def _list_repositories(app: FastAPI) -> list[str]: - logger.debug("listing repositories") - # if there are more repos, the Link will be available in the response headers until none available - path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - repos_list: list = [] - while True: - result, headers = await registry_request(app, path) - if result["repositories"]: - repos_list.extend(result["repositories"]) - if "Link" not in headers: - break - path = str(headers["Link"]).split(";")[0].strip("<>") - logger.debug("listed %s repositories", len(repos_list)) - return repos_list +def _get_prefix(service_type: ServiceType) -> str: + return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/" + + +_SERVICE_TYPE_FILTER_MAP: Final[dict[ServiceType, tuple[str, ...]]] = { + ServiceType.DYNAMIC: (_get_prefix(ServiceType.DYNAMIC),), + ServiceType.COMPUTATIONAL: (_get_prefix(ServiceType.COMPUTATIONAL),), + ServiceType.ALL: ( + _get_prefix(ServiceType.DYNAMIC), + _get_prefix(ServiceType.COMPUTATIONAL), + ), +} + + +async def _list_repositories_gen( + app: FastAPI, service_type: ServiceType +) -> AsyncGenerator[list[str], None]: + with log_context(_logger, logging.DEBUG, msg="listing repositories"): + path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + result, headers = await registry_request(app, path=path) # initial call + + while True: + if "Link" in headers: + next_path = ( + str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") + ) + prefetch_task = asyncio.create_task( + registry_request(app, path=next_path) + ) + else: + prefetch_task = None + + yield list( + filter( + lambda x: str(x).startswith(_SERVICE_TYPE_FILTER_MAP[service_type]), + result["repositories"], + ) + ) + if prefetch_task: + result, headers = await prefetch_task + else: + return + + +async def list_image_tags_gen( + app: FastAPI, image_key: str +) -> AsyncGenerator[list[str], None]: + with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): + path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + tags, headers = await registry_request(app, path=path) # initial call + while True: + if "Link" in headers: + next_path = ( + str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") + ) + prefetch_task = asyncio.create_task( + registry_request(app, path=next_path) + ) + else: + prefetch_task = None + + yield list( + filter( + VERSION_REG.match, + tags["tags"], + ) + ) + if prefetch_task: + tags, headers = await prefetch_task + else: + return async def list_image_tags(app: FastAPI, image_key: str) -> list[str]: - logger.debug("listing image tags in %s", image_key) - image_tags: list = [] - # get list of image tags - path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - while True: - tags, headers = await registry_request(app, path) - if tags["tags"]: - image_tags.extend([tag for tag in tags["tags"] if VERSION_REG.match(tag)]) - if "Link" not in headers: - break - path = str(headers["Link"]).split(";")[0].strip("<>") - logger.debug("Found %s image tags in %s", len(image_tags), image_key) + image_tags = [] + async for tags in list_image_tags_gen(app, image_key): + image_tags.extend(tags) return image_tags @@ -313,8 +341,8 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ - path = f"/v2/{image}/manifests/{tag}" - _, headers = await registry_request(app, path) + path = f"{image}/manifests/{tag}" + _, headers = await registry_request(app, path=path) headers = headers or {} return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) @@ -325,9 +353,9 @@ async def get_image_labels( ) -> tuple[dict[str, str], str | None]: """Returns image labels and the image manifest digest""" - logger.debug("getting image labels of %s:%s", image, tag) - path = f"/v2/{image}/manifests/{tag}" - request_result, headers = await registry_request(app, path) + _logger.debug("getting image labels of %s:%s", image, tag) + path = f"{image}/manifests/{tag}" + request_result, headers = await registry_request(app, path=path) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( "container_config", v1_compatibility_key["config"] @@ -337,7 +365,7 @@ async def get_image_labels( headers = headers or {} manifest_digest: str | None = headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) - logger.debug("retrieved labels of image %s:%s", image, tag) + _logger.debug("retrieved labels of image %s:%s", image, tag) return (labels, manifest_digest) @@ -375,48 +403,37 @@ async def get_image_details( async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: - - image_tags = await list_image_tags(app, image_key) - - results = await limited_gather( - *[get_image_details(app, image_key, tag) for tag in image_tags], - reraise=False, - log=logger, - limit=get_application_settings( - app - ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, - ) - return [result for result in results if not isinstance(result, BaseException)] + repo_details = [] + async for image_tags in list_image_tags_gen(app, image_key): + async for image_details_future in limited_as_completed( + (get_image_details(app, image_key, tag) for tag in image_tags), + limit=get_application_settings( + app + ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, + ): + with log_catch(_logger, reraise=False): + if image_details := await image_details_future: + repo_details.append(image_details) + return repo_details async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: - logger.debug("getting list of services") - repos = await _list_repositories(app) - # get the services repos - prefixes = [] - if service_type in [ServiceType.DYNAMIC, ServiceType.ALL]: - prefixes.append(_get_prefix(ServiceType.DYNAMIC)) - if service_type in [ServiceType.COMPUTATIONAL, ServiceType.ALL]: - prefixes.append(_get_prefix(ServiceType.COMPUTATIONAL)) - repos = [x for x in repos if str(x).startswith(tuple(prefixes))] - logger.debug("retrieved list of repos : %s", repos) - - # only list as service if it actually contains the necessary labels - results = await limited_gather( - *[get_repo_details(app, repo) for repo in repos], - reraise=False, - log=logger, - limit=get_application_settings( + with log_context(_logger, logging.DEBUG, msg="listing services"): + services = [] + concurrency_limit = get_application_settings( app - ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, - ) + ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS + async for repos in _list_repositories_gen(app, service_type): + # only list as service if it actually contains the necessary labels + async for repo_details_future in limited_as_completed( + (get_repo_details(app, repo) for repo in repos), + limit=concurrency_limit, + ): + with log_catch(_logger, reraise=False): + if repo_details := await repo_details_future: + services.extend(repo_details) - return [ - service - for repo_details in results - if isinstance(repo_details, list) - for service in repo_details - ] + return services async def list_interactive_service_dependencies( @@ -441,10 +458,6 @@ async def list_interactive_service_dependencies( return dependency_keys -def _get_prefix(service_type: ServiceType) -> str: - return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/" - - def get_service_first_name(image_key: str) -> str: if str(image_key).startswith(_get_prefix(ServiceType.DYNAMIC)): service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)) :] @@ -455,7 +468,7 @@ def get_service_first_name(image_key: str) -> str: else: return "invalid service" - logger.debug( + _logger.debug( "retrieved service name from repo %s : %s", image_key, service_name_suffixes ) return service_name_suffixes.split("/")[0] @@ -471,7 +484,7 @@ def get_service_last_names(image_key: str) -> str: else: return "invalid service" service_last_name = str(service_name_suffixes).replace("/", "_") - logger.debug( + _logger.debug( "retrieved service last name from repo %s : %s", image_key, service_last_name ) return service_last_name @@ -506,7 +519,7 @@ async def get_service_extras( } labels, _ = await get_image_labels(app, image_key, image_tag) - logger.debug("Compiling service extras from labels %s", pformat(labels)) + _logger.debug("Compiling service extras from labels %s", pformat(labels)) if SERVICE_RUNTIME_SETTINGS in labels: service_settings: list[dict[str, Any]] = json.loads( @@ -557,7 +570,7 @@ async def get_service_extras( invalid_with_msg = f"invalid container_spec [{entry_value}]" if invalid_with_msg: - logger.warning( + _logger.warning( "%s entry [%s] encoded in settings labels of service image %s:%s", invalid_with_msg, entry, @@ -574,6 +587,6 @@ async def get_service_extras( } ) - logger.debug("Following service extras were compiled: %s", pformat(result)) + _logger.debug("Following service extras were compiled: %s", pformat(result)) return result diff --git a/services/director/tests/unit/api/test_rest_running_interactive_services.py b/services/director/tests/unit/api/test_rest_running_interactive_services.py index 97accd23279..fa6fabd6948 100644 --- a/services/director/tests/unit/api/test_rest_running_interactive_services.py +++ b/services/director/tests/unit/api/test_rest_running_interactive_services.py @@ -7,7 +7,7 @@ import httpx import pytest -from aioresponses import CallbackResult, aioresponses +import respx from faker import Faker from fastapi import status from models_library.projects import ProjectID @@ -172,29 +172,23 @@ async def test_running_services_post_and_delete( if save_state: query_params.update({"save_state": "true" if save_state else "false"}) - mocked_save_state_cb = mocker.MagicMock( - return_value=CallbackResult(status=200, payload={}) - ) - PASSTHROUGH_REQUESTS_PREFIXES = [ - "http://127.0.0.1", - "http://localhost", - "unix://", # docker engine - "ws://", # websockets - ] - with aioresponses(passthrough=PASSTHROUGH_REQUESTS_PREFIXES) as mock: + with respx.mock( + base_url=f"http://{service_host}:{service_port}{service_basepath}", + assert_all_called=False, + assert_all_mocked=False, + ) as respx_mock: + + def _save_me(request) -> httpx.Response: + return httpx.Response(status.HTTP_200_OK, json={}) + + respx_mock.post("/state", name="save_state").mock(side_effect=_save_me) + respx_mock.route(host="127.0.0.1", name="host").pass_through() + respx_mock.route(host="localhost", name="localhost").pass_through() - # POST /http://service_host:service_port service_basepath/state ------------------------------------------------- - mock.post( - f"http://{service_host}:{service_port}{service_basepath}/state", - status=200, - callback=mocked_save_state_cb, - ) resp = await client.delete( f"/{api_version_prefix}/running_interactive_services/{params['service_uuid']}", params=query_params, ) - if expected_save_state_call: - mocked_save_state_cb.assert_called_once() text = resp.text assert resp.status_code == status.HTTP_204_NO_CONTENT, text diff --git a/services/director/tests/unit/conftest.py b/services/director/tests/unit/conftest.py index 75ba8e7fd5c..15b7627e29d 100644 --- a/services/director/tests/unit/conftest.py +++ b/services/director/tests/unit/conftest.py @@ -13,6 +13,7 @@ from fastapi import FastAPI from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.docker_registry import RegistrySettings from simcore_service_director.core.application import create_app from simcore_service_director.core.settings import ApplicationSettings @@ -31,12 +32,6 @@ ] -def pytest_addoption(parser): - parser.addoption("--registry_url", action="store", default="default url") - parser.addoption("--registry_user", action="store", default="default user") - parser.addoption("--registry_pw", action="store", default="default pw") - - @pytest.fixture(scope="session") def project_slug_dir(osparc_simcore_root_dir: Path) -> Path: # fixtures in pytest_simcore.environs @@ -87,6 +82,23 @@ def configure_registry_access( ) +@pytest.fixture +def configure_external_registry_access( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, + external_registry_settings: RegistrySettings | None, +) -> EnvVarsDict: + assert external_registry_settings + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + **external_registry_settings.model_dump(by_alias=True, exclude_none=True), + "REGISTRY_PW": external_registry_settings.REGISTRY_PW.get_secret_value(), + "DIRECTOR_REGISTRY_CACHING": False, + }, + ) + + @pytest.fixture(scope="session") def configure_custom_registry( app_environment: EnvVarsDict, diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 2e5738c2670..7861179323c 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -1,13 +1,16 @@ # pylint: disable=W0613, W0621 # pylint: disable=unused-variable +import asyncio import json import time import pytest from fastapi import FastAPI +from pytest_benchmark.plugin import BenchmarkFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.docker_registry import RegistrySettings from simcore_service_director import registry_proxy from simcore_service_director.core.settings import ApplicationSettings @@ -203,6 +206,19 @@ async def test_get_image_details( assert details == service_description +async def test_list_services( + configure_registry_access: EnvVarsDict, + configure_number_concurrency_calls: EnvVarsDict, + app: FastAPI, + push_services, +): + await push_services( + number_of_computational_services=21, number_of_interactive_services=21 + ) + services = await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) + assert len(services) == 42 + + @pytest.fixture def configure_registry_caching( app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch @@ -220,7 +236,7 @@ async def test_registry_caching( push_services, ): images = await push_services( - number_of_computational_services=21, number_of_interactive_services=21 + number_of_computational_services=201, number_of_interactive_services=201 ) assert app_settings.DIRECTOR_REGISTRY_CACHING is True @@ -237,17 +253,42 @@ async def test_registry_caching( print("time to retrieve services with cache: ", time_to_retrieve_with_cache) -@pytest.mark.skip(reason="test needs credentials to real registry") -async def test_get_services_performance( - configure_registry_access: EnvVarsDict, +@pytest.fixture +def configure_number_concurrency_calls( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + "DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS": "50", + "DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS": "50", + }, + ) + + +def test_list_services_performance( + skip_if_external_envfile_dict: None, + configure_external_registry_access: EnvVarsDict, + configure_number_concurrency_calls: EnvVarsDict, + registry_settings: RegistrySettings, app: FastAPI, + benchmark: BenchmarkFixture, ): - start_time = time.perf_counter() - services = await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) - stop_time = time.perf_counter() - print( - f"\nTime to run getting services: {stop_time - start_time}s, #services {len(services)}, time per call {(stop_time - start_time) / len(services)}s/service" - ) + async def _list_services(): + start_time = time.perf_counter() + services = await registry_proxy.list_services( + app, registry_proxy.ServiceType.ALL + ) + stop_time = time.perf_counter() + print( + f"\nTime to list services: {stop_time - start_time:.3}s, {len(services)} services in {registry_settings.resolved_registry_url}, rate: {(stop_time - start_time) / len(services or [1]):.3}s/service" + ) + + def run_async_test() -> None: + asyncio.get_event_loop().run_until_complete(_list_services()) + + benchmark.pedantic(run_async_test, rounds=5) async def test_generate_service_extras( diff --git a/services/dynamic-sidecar/.env-devel b/services/dynamic-sidecar/.env-devel index 9bccac9239e..e1866cb022a 100644 --- a/services/dynamic-sidecar/.env-devel +++ b/services/dynamic-sidecar/.env-devel @@ -22,7 +22,7 @@ DY_SIDECAR_RUN_ID=1689771013_f7c1bd87-4da5-4709-9471-3d60c8a70639 DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS=false # DOCKER REGISTRY -DY_DEPLOYMENT_REGISTRY_SETTINGS='{"REGISTRY_AUTH":"false","REGISTRY_USER":"test","REGISTRY_PW":"test","REGISTRY_SSL":"false"}' +DY_DEPLOYMENT_REGISTRY_SETTINGS='{"REGISTRY_AUTH":"false","REGISTRY_USER":"test","REGISTRY_PW":"test","REGISTRY_SSL":"false", "REGISTRY_URL": "fake.registry.com"}' S3_ENDPOINT=http://111.111.111.111:12345 S3_ACCESS_KEY=mocked diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index d575cdc0db8..62b93daa25c 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -197,6 +197,7 @@ def base_mock_envs( "REGISTRY_USER": "test", "REGISTRY_PW": "test", "REGISTRY_SSL": "false", + "REGISTRY_URL": "registry.pytest.com", } ), "DYNAMIC_SIDECAR_TRACING": "null", @@ -267,6 +268,7 @@ def mock_environment( "REGISTRY_USER": "test", "REGISTRY_PW": "test", "REGISTRY_SSL": "false", + "REGISTRY_URL": "registry.pytest.com", } ), }, diff --git a/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py b/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py index 667aa30f6a4..e741b11189c 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py +++ b/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py @@ -59,10 +59,11 @@ def mock_environment( "POSTGRES_USER": "test", "POSTGRES_PASSWORD": "test", "POSTGRES_DB": "test", - "REGISTRY_AUTH": "0", - "REGISTRY_USER": "test", - "REGISTRY_PW": "test", - "REGISTRY_SSL": "0", + "REGISTRY_AUTH": f"{faker.pybool()}", + "REGISTRY_USER": faker.user_name(), + "REGISTRY_PW": faker.password(), + "REGISTRY_SSL": f"{faker.pybool()}", + "REGISTRY_URL": faker.url(), **base_mock_envs, }, )