From 8eaeeb03b4ae7187de3091b14ed9a0411e33fb7a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:07:57 +0100 Subject: [PATCH 01/29] add pytest-benchmark --- services/director/requirements/_test.in | 1 + services/director/requirements/_test.txt | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/services/director/requirements/_test.in b/services/director/requirements/_test.in index a6bd90a3acf..b5787990732 100644 --- a/services/director/requirements/_test.in +++ b/services/director/requirements/_test.in @@ -16,6 +16,7 @@ faker jsonref pytest pytest-asyncio +pytest-benchmark pytest-cov pytest-docker pytest-instafail diff --git a/services/director/requirements/_test.txt b/services/director/requirements/_test.txt index 8d14d466266..2e08a63301e 100644 --- a/services/director/requirements/_test.txt +++ b/services/director/requirements/_test.txt @@ -87,10 +87,13 @@ propcache==0.2.0 # -c requirements/_base.txt # aiohttp # yarl +py-cpuinfo==9.0.0 + # via pytest-benchmark pytest==8.3.3 # via # -r requirements/_test.in # pytest-asyncio + # pytest-benchmark # pytest-cov # pytest-docker # pytest-instafail @@ -100,6 +103,8 @@ pytest-asyncio==0.23.8 # via # -c requirements/../../../requirements/constraints.txt # -r requirements/_test.in +pytest-benchmark==5.1.0 + # via -r requirements/_test.in pytest-cov==6.0.0 # via -r requirements/_test.in pytest-docker==3.1.1 From 35de0046671931246fe18d91130693de8ea607f2 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:08:06 +0100 Subject: [PATCH 02/29] add external registry --- services/director/tests/unit/conftest.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/services/director/tests/unit/conftest.py b/services/director/tests/unit/conftest.py index 75ba8e7fd5c..15b7627e29d 100644 --- a/services/director/tests/unit/conftest.py +++ b/services/director/tests/unit/conftest.py @@ -13,6 +13,7 @@ from fastapi import FastAPI from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.docker_registry import RegistrySettings from simcore_service_director.core.application import create_app from simcore_service_director.core.settings import ApplicationSettings @@ -31,12 +32,6 @@ ] -def pytest_addoption(parser): - parser.addoption("--registry_url", action="store", default="default url") - parser.addoption("--registry_user", action="store", default="default user") - parser.addoption("--registry_pw", action="store", default="default pw") - - @pytest.fixture(scope="session") def project_slug_dir(osparc_simcore_root_dir: Path) -> Path: # fixtures in pytest_simcore.environs @@ -87,6 +82,23 @@ def configure_registry_access( ) +@pytest.fixture +def configure_external_registry_access( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, + external_registry_settings: RegistrySettings | None, +) -> EnvVarsDict: + assert external_registry_settings + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + **external_registry_settings.model_dump(by_alias=True, exclude_none=True), + "REGISTRY_PW": external_registry_settings.REGISTRY_PW.get_secret_value(), + "DIRECTOR_REGISTRY_CACHING": False, + }, + ) + + @pytest.fixture(scope="session") def configure_custom_registry( app_environment: EnvVarsDict, From 8599207ed885bc144de927a389d9acdaa6466836 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:08:31 +0100 Subject: [PATCH 03/29] add prefetching and generators --- .../registry_proxy.py | 120 ++++++++++-------- 1 file changed, 65 insertions(+), 55 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 3099627ddc6..3a7a456ed95 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -1,3 +1,4 @@ +import asyncio import enum import json import logging @@ -5,13 +6,14 @@ from collections.abc import Mapping from http import HTTPStatus from pprint import pformat -from typing import Any, cast +from typing import Any, AsyncGenerator, Final, cast from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] from aiohttp import BasicAuth, ClientSession, client_exceptions from aiohttp.client import ClientTimeout from fastapi import FastAPI -from servicelib.utils import limited_gather +from servicelib.logging_utils import log_catch, log_context +from servicelib.utils import limited_as_completed from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_result @@ -271,20 +273,44 @@ async def on_shutdown() -> None: app.add_event_handler("shutdown", on_shutdown) -async def _list_repositories(app: FastAPI) -> list[str]: - logger.debug("listing repositories") - # if there are more repos, the Link will be available in the response headers until none available - path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - repos_list: list = [] - while True: - result, headers = await registry_request(app, path) - if result["repositories"]: - repos_list.extend(result["repositories"]) - if "Link" not in headers: - break - path = str(headers["Link"]).split(";")[0].strip("<>") - logger.debug("listed %s repositories", len(repos_list)) - return repos_list +def _get_prefix(service_type: ServiceType) -> str: + return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/" + + +_SERVICE_TYPE_FILTER_MAP: Final[dict[ServiceType, tuple[str, ...]]] = { + ServiceType.DYNAMIC: (_get_prefix(ServiceType.DYNAMIC),), + ServiceType.COMPUTATIONAL: (_get_prefix(ServiceType.COMPUTATIONAL),), + ServiceType.ALL: ( + _get_prefix(ServiceType.DYNAMIC), + _get_prefix(ServiceType.COMPUTATIONAL), + ), +} + + +async def _list_repositories_gen( + app: FastAPI, service_type: ServiceType +) -> AsyncGenerator[list[str], None]: + with log_context(logger, logging.DEBUG, msg="listing repositories"): + path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + result, headers = await registry_request(app, path) # initial call + + while True: + if "Link" in headers: + next_path = str(headers["Link"]).split(";")[0].strip("<>") + prefetch_task = asyncio.create_task(registry_request(app, next_path)) + else: + prefetch_task = None + + yield list( + filter( + lambda x: str(x).startswith(_SERVICE_TYPE_FILTER_MAP[service_type]), + result["repositories"], + ) + ) + if prefetch_task: + result, headers = await prefetch_task + else: + return async def list_image_tags(app: FastAPI, image_key: str) -> list[str]: @@ -375,48 +401,36 @@ async def get_image_details( async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: - image_tags = await list_image_tags(app, image_key) - - results = await limited_gather( - *[get_image_details(app, image_key, tag) for tag in image_tags], - reraise=False, - log=logger, + repo_details = [] + async for image_details_future in limited_as_completed( + (get_image_details(app, image_key, tag) for tag in image_tags), limit=get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, - ) - return [result for result in results if not isinstance(result, BaseException)] + ): + with log_catch(logger, reraise=False): + if image_details := await image_details_future: + repo_details.append(image_details) + return repo_details async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: - logger.debug("getting list of services") - repos = await _list_repositories(app) - # get the services repos - prefixes = [] - if service_type in [ServiceType.DYNAMIC, ServiceType.ALL]: - prefixes.append(_get_prefix(ServiceType.DYNAMIC)) - if service_type in [ServiceType.COMPUTATIONAL, ServiceType.ALL]: - prefixes.append(_get_prefix(ServiceType.COMPUTATIONAL)) - repos = [x for x in repos if str(x).startswith(tuple(prefixes))] - logger.debug("retrieved list of repos : %s", repos) - - # only list as service if it actually contains the necessary labels - results = await limited_gather( - *[get_repo_details(app, repo) for repo in repos], - reraise=False, - log=logger, - limit=get_application_settings( - app - ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, - ) - - return [ - service - for repo_details in results - if isinstance(repo_details, list) - for service in repo_details - ] + with log_context(logger, logging.DEBUG, msg="listing services"): + services = [] + async for repos in _list_repositories_gen(app, service_type): + # only list as service if it actually contains the necessary labels + async for repo_details_future in limited_as_completed( + (get_repo_details(app, repo) for repo in repos), + limit=get_application_settings( + app + ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, + ): + with log_catch(logger, reraise=False): + if repo_details := await repo_details_future: + services.extend(repo_details) + + return services async def list_interactive_service_dependencies( @@ -441,10 +455,6 @@ async def list_interactive_service_dependencies( return dependency_keys -def _get_prefix(service_type: ServiceType) -> str: - return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/" - - def get_service_first_name(image_key: str) -> str: if str(image_key).startswith(_get_prefix(ServiceType.DYNAMIC)): service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)) :] From 58de7868b33877d6a23e336b0d8627688b687089 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:09:05 +0100 Subject: [PATCH 04/29] added tests with external registry --- .../tests/unit/test_registry_proxy.py | 58 ++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 2e5738c2670..2f9eec1503d 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -1,13 +1,16 @@ # pylint: disable=W0613, W0621 # pylint: disable=unused-variable +import asyncio import json import time import pytest from fastapi import FastAPI +from pytest_benchmark.plugin import BenchmarkFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.docker_registry import RegistrySettings from simcore_service_director import registry_proxy from simcore_service_director.core.settings import ApplicationSettings @@ -203,6 +206,19 @@ async def test_get_image_details( assert details == service_description +async def test_list_services( + configure_registry_access: EnvVarsDict, + configure_number_concurrency_calls: EnvVarsDict, + app: FastAPI, + push_services, +): + await push_services( + number_of_computational_services=21, number_of_interactive_services=21 + ) + services = await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) + assert len(services) == 42 + + @pytest.fixture def configure_registry_caching( app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch @@ -237,17 +253,41 @@ async def test_registry_caching( print("time to retrieve services with cache: ", time_to_retrieve_with_cache) -@pytest.mark.skip(reason="test needs credentials to real registry") -async def test_get_services_performance( - configure_registry_access: EnvVarsDict, +@pytest.fixture +def configure_number_concurrency_calls( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + "DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS": "50", + "DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS": "50", + }, + ) + + +def test_list_services_performance( + configure_external_registry_access: EnvVarsDict, + configure_number_concurrency_calls: EnvVarsDict, + registry_settings: RegistrySettings, app: FastAPI, + benchmark: BenchmarkFixture, ): - start_time = time.perf_counter() - services = await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) - stop_time = time.perf_counter() - print( - f"\nTime to run getting services: {stop_time - start_time}s, #services {len(services)}, time per call {(stop_time - start_time) / len(services)}s/service" - ) + async def _list_services(): + start_time = time.perf_counter() + services = await registry_proxy.list_services( + app, registry_proxy.ServiceType.ALL + ) + stop_time = time.perf_counter() + print( + f"\nTime to list services: {stop_time - start_time:.3}s, {len(services)} services in {registry_settings.resolved_registry_url}, rate: {(stop_time - start_time) / len(services or [1]):.3}s/service" + ) + + def run_async_test() -> None: + asyncio.get_event_loop().run_until_complete(_list_services()) + + benchmark.pedantic(run_async_test, rounds=1) async def test_generate_service_extras( From 2603f5c6513ff4473e4fb5f05244b75c2a97acc6 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:24:16 +0100 Subject: [PATCH 05/29] let's have more generators --- .../registry_proxy.py | 60 ++++++++++++------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 3a7a456ed95..816afc87dbc 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -313,19 +313,35 @@ async def _list_repositories_gen( return +async def list_image_tags_gen( + app: FastAPI, image_key: str +) -> AsyncGenerator[list[str], None]: + with log_context(logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): + path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + tags, headers = await registry_request(app, path) # initial call + while True: + if "Link" in headers: + next_path = str(headers["Link"]).split(";")[0].strip("<>") + prefetch_task = asyncio.create_task(registry_request(app, next_path)) + else: + prefetch_task = None + + yield list( + filter( + VERSION_REG.match, + tags["tags"], + ) + ) + if prefetch_task: + tags, headers = await prefetch_task + else: + return + + async def list_image_tags(app: FastAPI, image_key: str) -> list[str]: - logger.debug("listing image tags in %s", image_key) - image_tags: list = [] - # get list of image tags - path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - while True: - tags, headers = await registry_request(app, path) - if tags["tags"]: - image_tags.extend([tag for tag in tags["tags"] if VERSION_REG.match(tag)]) - if "Link" not in headers: - break - path = str(headers["Link"]).split(";")[0].strip("<>") - logger.debug("Found %s image tags in %s", len(image_tags), image_key) + image_tags = [] + async for tags in list_image_tags_gen(app, image_key): + image_tags.extend(tags) return image_tags @@ -401,17 +417,17 @@ async def get_image_details( async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: - image_tags = await list_image_tags(app, image_key) repo_details = [] - async for image_details_future in limited_as_completed( - (get_image_details(app, image_key, tag) for tag in image_tags), - limit=get_application_settings( - app - ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, - ): - with log_catch(logger, reraise=False): - if image_details := await image_details_future: - repo_details.append(image_details) + async for image_tags in list_image_tags_gen(app, image_key): + async for image_details_future in limited_as_completed( + (get_image_details(app, image_key, tag) for tag in image_tags), + limit=get_application_settings( + app + ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, + ): + with log_catch(logger, reraise=False): + if image_details := await image_details_future: + repo_details.append(image_details) return repo_details From 8e36edcb729e02dd7adb30aba1e548f0c1093fb5 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:27:33 +0100 Subject: [PATCH 06/29] skip test if no external registry set --- services/director/tests/unit/test_registry_proxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 2f9eec1503d..9c8393155ba 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -268,6 +268,7 @@ def configure_number_concurrency_calls( def test_list_services_performance( + skip_if_external_envfile_dict: None, configure_external_registry_access: EnvVarsDict, configure_number_concurrency_calls: EnvVarsDict, registry_settings: RegistrySettings, From 20f50ec0138013949ac4c36cc44a944c76f69386 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:42:13 +0100 Subject: [PATCH 07/29] unflake test --- services/director/tests/unit/test_registry_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 9c8393155ba..cab85b1b2d0 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -236,7 +236,7 @@ async def test_registry_caching( push_services, ): images = await push_services( - number_of_computational_services=21, number_of_interactive_services=21 + number_of_computational_services=201, number_of_interactive_services=201 ) assert app_settings.DIRECTOR_REGISTRY_CACHING is True From 6c7234ff309d712ebe4f880c1dca5c5727217e4a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 20:20:46 +0100 Subject: [PATCH 08/29] add httpx client session --- services/director/requirements/_base.in | 3 ++- services/director/requirements/_base.txt | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/services/director/requirements/_base.in b/services/director/requirements/_base.in index af6b80486e8..97b2752fbf0 100644 --- a/services/director/requirements/_base.in +++ b/services/director/requirements/_base.in @@ -15,6 +15,7 @@ aiocache aiodocker fastapi[all] -httpx +httpx[http2] prometheus-client pydantic +tenacity diff --git a/services/director/requirements/_base.txt b/services/director/requirements/_base.txt index d9164137fa0..656861c1ba1 100644 --- a/services/director/requirements/_base.txt +++ b/services/director/requirements/_base.txt @@ -114,6 +114,10 @@ h11==0.14.0 # via # httpcore # uvicorn +h2==4.1.0 + # via httpx +hpack==4.0.0 + # via h2 httpcore==1.0.6 # via httpx httptools==0.6.4 @@ -135,6 +139,8 @@ httpx==0.27.2 # -r requirements/../../../packages/service-library/requirements/_fastapi.in # -r requirements/_base.in # fastapi +hyperframe==6.0.1 + # via h2 idna==3.10 # via # anyio @@ -422,7 +428,9 @@ starlette==0.41.3 # -c requirements/../../../requirements/constraints.txt # fastapi tenacity==9.0.0 - # via -r requirements/../../../packages/service-library/requirements/_base.in + # via + # -r requirements/../../../packages/service-library/requirements/_base.in + # -r requirements/_base.in toolz==1.0.0 # via -r requirements/../../../packages/service-library/requirements/_base.in tqdm==4.67.0 From 2e6fd9099a22b0eb96739f12ccd2c311d579c09c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 20:20:55 +0100 Subject: [PATCH 09/29] migrated to httpx --- .../client_session.py | 31 ++++--------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/services/director/src/simcore_service_director/client_session.py b/services/director/src/simcore_service_director/client_session.py index b053d06c456..de700737399 100644 --- a/services/director/src/simcore_service_director/client_session.py +++ b/services/director/src/simcore_service_director/client_session.py @@ -1,41 +1,22 @@ -from aiohttp import ClientSession, ClientTimeout -from common_library.json_serialization import json_dumps +import httpx from fastapi import FastAPI -from servicelib.utils import ( - get_http_client_request_aiohttp_connect_timeout, - get_http_client_request_aiohttp_sock_connect_timeout, - get_http_client_request_total_timeout, -) def setup_client_session(app: FastAPI) -> None: async def on_startup() -> None: - # SEE https://github.com/ITISFoundation/osparc-simcore/issues/4628 - - # ANE: it is important to have fast connection handshakes - # also requests should be as fast as possible - # some services are not that fast to reply - timeout_settings = ClientTimeout( - total=get_http_client_request_total_timeout(), - connect=get_http_client_request_aiohttp_connect_timeout(), - sock_connect=get_http_client_request_aiohttp_sock_connect_timeout(), - ) - session = ClientSession( - timeout=timeout_settings, - json_serialize=json_dumps, - ) + session = httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(http2=True)) app.state.aiohttp_client_session = session async def on_shutdown() -> None: session = app.state.aiohttp_client_session - assert isinstance(session, ClientSession) # nosec - await session.close() + assert isinstance(session, httpx.AsyncClient) # nosec + await session.aclose() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -def get_client_session(app: FastAPI) -> ClientSession: +def get_client_session(app: FastAPI) -> httpx.AsyncClient: session = app.state.aiohttp_client_session - assert isinstance(session, ClientSession) # nosec + assert isinstance(session, httpx.AsyncClient) # nosec return session From f99ff9aa70d89e9222c21aab4f85242939f59298 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 20:21:15 +0100 Subject: [PATCH 10/29] migrated to httpx and added retrials --- .../registry_proxy.py | 249 +++++++++--------- 1 file changed, 131 insertions(+), 118 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 816afc87dbc..9df9ddb7981 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -3,21 +3,20 @@ import json import logging import re -from collections.abc import Mapping -from http import HTTPStatus +from collections.abc import AsyncGenerator, Mapping from pprint import pformat -from typing import Any, AsyncGenerator, Final, cast +from typing import Any, Final, cast +import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] -from aiohttp import BasicAuth, ClientSession, client_exceptions -from aiohttp.client import ClientTimeout from fastapi import FastAPI from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_as_completed from tenacity import retry from tenacity.before_sleep import before_sleep_log -from tenacity.retry import retry_if_result -from tenacity.wait import wait_fixed +from tenacity.retry import retry_if_exception_type, retry_if_result +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed, wait_random_exponential from yarl import URL from .client_session import get_client_session @@ -39,7 +38,7 @@ r"^(0|[1-9]\d*)(\.(0|[1-9]\d*)){2}(-(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*)(\.(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*))*)?(\+[-\da-zA-Z]+(\.[-\da-zA-Z-]+)*)?$" ) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # # NOTE: if you are refactoring this module, @@ -64,13 +63,13 @@ async def _basic_auth_registry_request( url = URL( f"{'https' if app_settings.DIRECTOR_REGISTRY.REGISTRY_SSL else 'http'}://{app_settings.DIRECTOR_REGISTRY.REGISTRY_URL}{path}" ) - logger.debug("Requesting registry using %s", url) + _logger.debug("Requesting registry using %s", url) # try the registry with basic authentication first, spare 1 call resp_data: dict = {} resp_headers: Mapping = {} auth = ( - BasicAuth( - login=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, + httpx.BasicAuth( + username=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, password=app_settings.DIRECTOR_REGISTRY.REGISTRY_PW.get_secret_value(), ) if app_settings.DIRECTOR_REGISTRY.REGISTRY_AUTH @@ -80,41 +79,36 @@ async def _basic_auth_registry_request( ) session = get_client_session(app) - try: - async with session.request( - method.lower(), url, auth=auth, **session_kwargs - ) as response: - if response.status == HTTPStatus.UNAUTHORIZED: - logger.debug("Registry unauthorized request: %s", await response.text()) - # basic mode failed, test with other auth mode - resp_data, resp_headers = await _auth_registry_request( - app_settings, - url, - method, - response.headers, - session, - **session_kwargs, - ) - elif response.status == HTTPStatus.NOT_FOUND: - raise ServiceNotAvailableError(service_name=path) + response = await session.request( + method.lower(), f"{url}", auth=auth, **session_kwargs + ) - elif response.status > 399: - logger.exception( - "Unknown error while accessing registry: %s", str(response) - ) - raise RegistryConnectionError(msg=str(response)) + if response.status_code == httpx.codes.UNAUTHORIZED: + _logger.debug("Registry unauthorized request: %s", response.text) + # basic mode failed, test with other auth mode + resp_data, resp_headers = await _auth_registry_request( + app_settings, + url, + method, + response.headers, + session, + **session_kwargs, + ) - else: - # registry that does not need an auth - resp_data = await response.json(content_type=None) - resp_headers = response.headers - - return (resp_data, resp_headers) - except client_exceptions.ClientError as exc: - logger.exception("Unknown error while accessing registry") - msg = f"Unknown error while accessing registry: {exc!s}" - raise DirectorRuntimeError(msg=msg) from exc + elif response.status_code == httpx.codes.NOT_FOUND: + raise ServiceNotAvailableError(service_name=path) + + elif response.status_code > 399: + _logger.exception("Unknown error while accessing registry: %s", str(response)) + raise RegistryConnectionError(msg=str(response)) + + else: + # registry that does not need an auth + resp_data = response.json() + resp_headers = response.headers + + return (resp_data, resp_headers) async def _auth_registry_request( @@ -122,7 +116,7 @@ async def _auth_registry_request( url: URL, method: str, auth_headers: Mapping, - session: ClientSession, + session: httpx.AsyncClient, **kwargs, ) -> tuple[dict, Mapping]: if ( @@ -146,8 +140,8 @@ async def _auth_registry_request( if not auth_type: msg = "Unknown registry type: cannot deduce authentication method!" raise RegistryConnectionError(msg=msg) - auth = BasicAuth( - login=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, + auth = httpx.BasicAuth( + username=app_settings.DIRECTOR_REGISTRY.REGISTRY_USER, password=app_settings.DIRECTOR_REGISTRY.REGISTRY_PW.get_secret_value(), ) @@ -157,58 +151,71 @@ async def _auth_registry_request( token_url = URL(auth_details["realm"]).with_query( service=auth_details["service"], scope=auth_details["scope"] ) - async with session.get(token_url, auth=auth, **kwargs) as token_resp: - if token_resp.status != HTTPStatus.OK: - msg = f"Unknown error while authentifying with registry: {token_resp!s}" - raise RegistryConnectionError(msg=msg) - bearer_code = (await token_resp.json())["token"] - headers = {"Authorization": f"Bearer {bearer_code}"} - async with getattr(session, method.lower())( - url, headers=headers, **kwargs - ) as resp_wtoken: - if resp_wtoken.status == HTTPStatus.NOT_FOUND: - logger.exception("path to registry not found: %s", url) - raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wtoken.status > 399: - logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wtoken), - ) - raise RegistryConnectionError(msg=f"{resp_wtoken}") - resp_data = await resp_wtoken.json(content_type=None) - resp_headers = resp_wtoken.headers - return (resp_data, resp_headers) - elif auth_type == "Basic": + token_resp = await session.get(f"{token_url}", auth=auth, **kwargs) + if token_resp.status_code != httpx.codes.OK: + msg = f"Unknown error while authentifying with registry: {token_resp!s}" + raise RegistryConnectionError(msg=msg) + + bearer_code = (await token_resp.json())["token"] + headers = {"Authorization": f"Bearer {bearer_code}"} + resp_wtoken = await getattr(session, method.lower())( + url, headers=headers, **kwargs + ) + assert isinstance(resp_wtoken, httpx.Response) # nosec + if resp_wtoken.status_code == httpx.codes.NOT_FOUND: + _logger.exception("path to registry not found: %s", url) + raise ServiceNotAvailableError(service_name=f"{url}") + if resp_wtoken.status_code > 399: + _logger.exception( + "Unknown error while accessing with token authorized registry: %s", + str(resp_wtoken), + ) + raise RegistryConnectionError(msg=f"{resp_wtoken}") + resp_data = await resp_wtoken.json(content_type=None) + resp_headers = resp_wtoken.headers + return (resp_data, resp_headers) + if auth_type == "Basic": # basic authentication should not be since we tried already... - async with getattr(session, method.lower())( - url, auth=auth, **kwargs - ) as resp_wbasic: - if resp_wbasic.status == HTTPStatus.NOT_FOUND: - logger.exception("path to registry not found: %s", url) - raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wbasic.status > 399: - logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wbasic), - ) - raise RegistryConnectionError(msg=f"{resp_wbasic}") - resp_data = await resp_wbasic.json(content_type=None) - resp_headers = resp_wbasic.headers - return (resp_data, resp_headers) + resp_wbasic = await getattr(session, method.lower())(url, auth=auth, **kwargs) + assert isinstance(resp_wbasic, httpx.Response) # nosec + + if resp_wbasic.status_code == httpx.codes.NOT_FOUND: + _logger.exception("path to registry not found: %s", url) + raise ServiceNotAvailableError(service_name=f"{url}") + if resp_wbasic.status_code > 399: + _logger.exception( + "Unknown error while accessing with token authorized registry: %s", + str(resp_wbasic), + ) + raise RegistryConnectionError(msg=f"{resp_wbasic}") + resp_data = await resp_wbasic.json(content_type=None) + resp_headers = resp_wbasic.headers + return (resp_data, resp_headers) msg = f"Unknown registry authentification type: {url}" raise RegistryConnectionError(msg=msg) +@retry( + retry=retry_if_exception_type((httpx.RequestError, TimeoutError)), + wait=wait_random_exponential(min=1, max=10), + stop=stop_after_delay(120), + before_sleep=before_sleep_log(_logger, logging.WARNING), + reraise=True, +) +async def _retried_request( + app: FastAPI, path: str, method: str, **session_kwargs +) -> tuple[dict, Mapping]: + return await _basic_auth_registry_request(app, path, method, **session_kwargs) + + async def registry_request( app: FastAPI, + *, path: str, method: str = "GET", no_cache: bool = False, **session_kwargs, ) -> tuple[dict, Mapping]: - logger.debug( - "Request to registry: path=%s, method=%s. no_cache=%s", path, method, no_cache - ) cache: SimpleMemoryCache = app.state.registry_cache_memory cache_key = f"{method}_{path}" if not no_cache and (cached_response := await cache.get(cache_key)): @@ -216,9 +223,13 @@ async def registry_request( return cast(tuple[dict, Mapping], cached_response) app_settings = get_application_settings(app) - response, response_headers = await _basic_auth_registry_request( - app, path, method, **session_kwargs - ) + try: + response, response_headers = await _retried_request( + app, path, method, **session_kwargs + ) + except httpx.RequestError as exc: + msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}" + raise DirectorRuntimeError(msg=msg) from exc if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET": await cache.set( @@ -233,29 +244,27 @@ async def registry_request( async def _is_registry_responsive(app: FastAPI) -> bool: path = "/v2/" try: - await registry_request( - app, path, no_cache=True, timeout=ClientTimeout(total=1.0) - ) + await _basic_auth_registry_request(app, path=path, method="GET", timeout=1.0) return True - except (TimeoutError, DirectorRuntimeError) as exc: - logger.debug("Registry not responsive: %s", exc) + except (httpx.RequestError, DirectorRuntimeError) as exc: + _logger.debug("Registry not responsive: %s", exc) return False async def _setup_registry(app: FastAPI) -> None: - logger.debug("pinging registry...") + _logger.debug("pinging registry...") @retry( - wait=wait_fixed(2), - before_sleep=before_sleep_log(logger, logging.WARNING), + wait=wait_fixed(1), + before_sleep=before_sleep_log(_logger, logging.WARNING), retry=retry_if_result(lambda result: result is False), reraise=True, ) - async def wait_until_registry_responsive(app: FastAPI) -> bool: + async def _wait_until_registry_responsive(app: FastAPI) -> bool: return await _is_registry_responsive(app) - await wait_until_registry_responsive(app) - logger.info("Connected to docker registry") + await _wait_until_registry_responsive(app) + _logger.info("Connected to docker registry") def setup(app: FastAPI) -> None: @@ -290,14 +299,16 @@ def _get_prefix(service_type: ServiceType) -> str: async def _list_repositories_gen( app: FastAPI, service_type: ServiceType ) -> AsyncGenerator[list[str], None]: - with log_context(logger, logging.DEBUG, msg="listing repositories"): + with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - result, headers = await registry_request(app, path) # initial call + result, headers = await registry_request(app, path=path) # initial call while True: if "Link" in headers: next_path = str(headers["Link"]).split(";")[0].strip("<>") - prefetch_task = asyncio.create_task(registry_request(app, next_path)) + prefetch_task = asyncio.create_task( + registry_request(app, path=next_path) + ) else: prefetch_task = None @@ -316,13 +327,15 @@ async def _list_repositories_gen( async def list_image_tags_gen( app: FastAPI, image_key: str ) -> AsyncGenerator[list[str], None]: - with log_context(logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): + with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - tags, headers = await registry_request(app, path) # initial call + tags, headers = await registry_request(app, path=path) # initial call while True: if "Link" in headers: next_path = str(headers["Link"]).split(";")[0].strip("<>") - prefetch_task = asyncio.create_task(registry_request(app, next_path)) + prefetch_task = asyncio.create_task( + registry_request(app, path=next_path) + ) else: prefetch_task = None @@ -356,7 +369,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ path = f"/v2/{image}/manifests/{tag}" - _, headers = await registry_request(app, path) + _, headers = await registry_request(app, path=path) headers = headers or {} return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) @@ -367,9 +380,9 @@ async def get_image_labels( ) -> tuple[dict[str, str], str | None]: """Returns image labels and the image manifest digest""" - logger.debug("getting image labels of %s:%s", image, tag) + _logger.debug("getting image labels of %s:%s", image, tag) path = f"/v2/{image}/manifests/{tag}" - request_result, headers = await registry_request(app, path) + request_result, headers = await registry_request(app, path=path) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( "container_config", v1_compatibility_key["config"] @@ -379,7 +392,7 @@ async def get_image_labels( headers = headers or {} manifest_digest: str | None = headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) - logger.debug("retrieved labels of image %s:%s", image, tag) + _logger.debug("retrieved labels of image %s:%s", image, tag) return (labels, manifest_digest) @@ -425,14 +438,14 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]] app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, ): - with log_catch(logger, reraise=False): + with log_catch(_logger, reraise=False): if image_details := await image_details_future: repo_details.append(image_details) return repo_details async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: - with log_context(logger, logging.DEBUG, msg="listing services"): + with log_context(_logger, logging.DEBUG, msg="listing services"): services = [] async for repos in _list_repositories_gen(app, service_type): # only list as service if it actually contains the necessary labels @@ -442,7 +455,7 @@ async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, ): - with log_catch(logger, reraise=False): + with log_catch(_logger, reraise=False): if repo_details := await repo_details_future: services.extend(repo_details) @@ -481,7 +494,7 @@ def get_service_first_name(image_key: str) -> str: else: return "invalid service" - logger.debug( + _logger.debug( "retrieved service name from repo %s : %s", image_key, service_name_suffixes ) return service_name_suffixes.split("/")[0] @@ -497,7 +510,7 @@ def get_service_last_names(image_key: str) -> str: else: return "invalid service" service_last_name = str(service_name_suffixes).replace("/", "_") - logger.debug( + _logger.debug( "retrieved service last name from repo %s : %s", image_key, service_last_name ) return service_last_name @@ -532,7 +545,7 @@ async def get_service_extras( } labels, _ = await get_image_labels(app, image_key, image_tag) - logger.debug("Compiling service extras from labels %s", pformat(labels)) + _logger.debug("Compiling service extras from labels %s", pformat(labels)) if SERVICE_RUNTIME_SETTINGS in labels: service_settings: list[dict[str, Any]] = json.loads( @@ -583,7 +596,7 @@ async def get_service_extras( invalid_with_msg = f"invalid container_spec [{entry_value}]" if invalid_with_msg: - logger.warning( + _logger.warning( "%s entry [%s] encoded in settings labels of service image %s:%s", invalid_with_msg, entry, @@ -600,6 +613,6 @@ async def get_service_extras( } ) - logger.debug("Following service extras were compiled: %s", pformat(result)) + _logger.debug("Following service extras were compiled: %s", pformat(result)) return result From 181c80fae7f370ae5d44d6f73bc7fc1f2899bf48 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 20:28:01 +0100 Subject: [PATCH 11/29] 5 rounds --- services/director/tests/unit/test_registry_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index cab85b1b2d0..7861179323c 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -288,7 +288,7 @@ async def _list_services(): def run_async_test() -> None: asyncio.get_event_loop().run_until_complete(_list_services()) - benchmark.pedantic(run_async_test, rounds=1) + benchmark.pedantic(run_async_test, rounds=5) async def test_generate_service_extras( From b9dccf94e515ad4e834d1230b6c8cea48ea7f1a2 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:08:52 +0100 Subject: [PATCH 12/29] @pcrespov review: use fastapi.status --- .../simcore_service_director/registry_proxy.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 9df9ddb7981..22734a69b2c 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -9,7 +9,7 @@ import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] -from fastapi import FastAPI +from fastapi import FastAPI, status from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_as_completed from tenacity import retry @@ -84,7 +84,7 @@ async def _basic_auth_registry_request( method.lower(), f"{url}", auth=auth, **session_kwargs ) - if response.status_code == httpx.codes.UNAUTHORIZED: + if response.status_code == status.HTTP_401_UNAUTHORIZED: _logger.debug("Registry unauthorized request: %s", response.text) # basic mode failed, test with other auth mode resp_data, resp_headers = await _auth_registry_request( @@ -96,10 +96,10 @@ async def _basic_auth_registry_request( **session_kwargs, ) - elif response.status_code == httpx.codes.NOT_FOUND: + elif response.status_code == status.HTTP_404_NOT_FOUND: raise ServiceNotAvailableError(service_name=path) - elif response.status_code > 399: + elif response.status_code >= status.HTTP_400_BAD_REQUEST: _logger.exception("Unknown error while accessing registry: %s", str(response)) raise RegistryConnectionError(msg=str(response)) @@ -152,7 +152,7 @@ async def _auth_registry_request( service=auth_details["service"], scope=auth_details["scope"] ) token_resp = await session.get(f"{token_url}", auth=auth, **kwargs) - if token_resp.status_code != httpx.codes.OK: + if token_resp.status_code != status.HTTP_200_OK: msg = f"Unknown error while authentifying with registry: {token_resp!s}" raise RegistryConnectionError(msg=msg) @@ -162,10 +162,10 @@ async def _auth_registry_request( url, headers=headers, **kwargs ) assert isinstance(resp_wtoken, httpx.Response) # nosec - if resp_wtoken.status_code == httpx.codes.NOT_FOUND: + if resp_wtoken.status_code == status.HTTP_404_NOT_FOUND: _logger.exception("path to registry not found: %s", url) raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wtoken.status_code > 399: + if resp_wtoken.status_code >= status.HTTP_400_BAD_REQUEST: _logger.exception( "Unknown error while accessing with token authorized registry: %s", str(resp_wtoken), @@ -179,10 +179,10 @@ async def _auth_registry_request( resp_wbasic = await getattr(session, method.lower())(url, auth=auth, **kwargs) assert isinstance(resp_wbasic, httpx.Response) # nosec - if resp_wbasic.status_code == httpx.codes.NOT_FOUND: + if resp_wbasic.status_code == status.HTTP_404_NOT_FOUND: _logger.exception("path to registry not found: %s", url) raise ServiceNotAvailableError(service_name=f"{url}") - if resp_wbasic.status_code > 399: + if resp_wbasic.status_code >= status.HTTP_400_BAD_REQUEST: _logger.exception( "Unknown error while accessing with token authorized registry: %s", str(resp_wbasic), From 886d683f87af232e7ae8ea9d2ccc93c325d62795 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:10:37 +0100 Subject: [PATCH 13/29] cleanup --- .../director/src/simcore_service_director/core/errors.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/services/director/src/simcore_service_director/core/errors.py b/services/director/src/simcore_service_director/core/errors.py index 23ca2fc17fc..c7113baa402 100644 --- a/services/director/src/simcore_service_director/core/errors.py +++ b/services/director/src/simcore_service_director/core/errors.py @@ -1,12 +1,7 @@ -from typing import Any - from common_library.errors_classes import OsparcErrorMixin class DirectorRuntimeError(OsparcErrorMixin, RuntimeError): - def __init__(self, **ctx: Any) -> None: - super().__init__(**ctx) - msg_template: str = "Director-v0 unexpected error: {msg}" From b7e3acfb2ed27ca10b2de13d12d192bbb25c6459 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:10:45 +0100 Subject: [PATCH 14/29] ruff --- .../director/src/simcore_service_director/registry_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 22734a69b2c..e73e485cccd 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -111,7 +111,7 @@ async def _basic_auth_registry_request( return (resp_data, resp_headers) -async def _auth_registry_request( +async def _auth_registry_request( # noqa: C901 app_settings: ApplicationSettings, url: URL, method: str, From 5bcaea30ee3297e60187efd606837698eb999a0b Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:30:23 +0100 Subject: [PATCH 15/29] make sure the settings take care of the registry settings being correct --- .../src/settings_library/docker_registry.py | 42 ++++++++++++++-- .../registry_proxy.py | 48 ++++--------------- 2 files changed, 47 insertions(+), 43 deletions(-) diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index daef990abf1..45afd4c7310 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,7 +1,14 @@ from functools import cached_property -from typing import Any +from typing import Any, Self -from pydantic import Field, SecretStr, field_validator +from pydantic import ( + AnyHttpUrl, + Field, + SecretStr, + TypeAdapter, + field_validator, + model_validator, +) from pydantic_settings import SettingsConfigDict from .base import BaseCustomSettings @@ -12,10 +19,16 @@ class RegistrySettings(BaseCustomSettings): REGISTRY_PATH: str | None = Field( default=None, # This is useful in case of a local registry, where the registry url (path) is relative to the host docker engine" - description="development mode only, in case a local registry is used", + description="development mode only, in case a local registry is used - " + "this is the hostname to the docker registry as seen from inside the container", ) # NOTE: name is missleading, http or https protocol are not included - REGISTRY_URL: str = Field(default="", description="address to the docker registry") + REGISTRY_URL: str = Field( + ..., + description="hostname of docker registry (without protocol but with port if available) - " + "typically used by the host machine docker engine", + min_length=1, + ) REGISTRY_USER: str = Field( ..., description="username to access the docker registry" @@ -23,13 +36,24 @@ class RegistrySettings(BaseCustomSettings): REGISTRY_PW: SecretStr = Field( ..., description="password to access the docker registry" ) - REGISTRY_SSL: bool = Field(..., description="access to registry through ssl") + REGISTRY_SSL: bool = Field( + ..., description="True if docker registry is using HTTPS protocol" + ) @field_validator("REGISTRY_PATH", mode="before") @classmethod def _escape_none_string(cls, v) -> Any | None: return None if v == "None" else v + @model_validator(mode="after") + def check_registry_authentication(self: Self) -> Self: + if self.REGISTRY_AUTH and any( + not v for v in (self.REGISTRY_USER, self.REGISTRY_PW) + ): + msg = "If REGISTRY_AUTH is True, both REGISTRY_USER and REGISTRY_PW must be provided" + raise ValueError(msg) + return self + @cached_property def resolved_registry_url(self) -> str: return self.REGISTRY_PATH or self.REGISTRY_URL @@ -38,6 +62,14 @@ def resolved_registry_url(self) -> str: def api_url(self) -> str: return f"{self.REGISTRY_URL}/v2" + @cached_property + def registry_url(self) -> AnyHttpUrl: + """returns the full URL to the Docker Registry for use by docker engine""" + protocol = "https" if self.REGISTRY_SSL else "http" + return TypeAdapter(AnyHttpUrl).validate_python( + f"{protocol}://{self.REGISTRY_URL}" + ) + model_config = SettingsConfigDict( json_schema_extra={ "examples": [ diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index e73e485cccd..d8896e9bba6 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -56,14 +56,6 @@ async def _basic_auth_registry_request( app: FastAPI, path: str, method: str, **session_kwargs ) -> tuple[dict, Mapping]: app_settings = get_application_settings(app) - if not app_settings.DIRECTOR_REGISTRY.REGISTRY_URL: - msg = "URL to registry is not defined" - raise DirectorRuntimeError(msg=msg) - - url = URL( - f"{'https' if app_settings.DIRECTOR_REGISTRY.REGISTRY_SSL else 'http'}://{app_settings.DIRECTOR_REGISTRY.REGISTRY_URL}{path}" - ) - _logger.debug("Requesting registry using %s", url) # try the registry with basic authentication first, spare 1 call resp_data: dict = {} resp_headers: Mapping = {} @@ -73,23 +65,22 @@ async def _basic_auth_registry_request( password=app_settings.DIRECTOR_REGISTRY.REGISTRY_PW.get_secret_value(), ) if app_settings.DIRECTOR_REGISTRY.REGISTRY_AUTH - and app_settings.DIRECTOR_REGISTRY.REGISTRY_USER - and app_settings.DIRECTOR_REGISTRY.REGISTRY_PW else None ) session = get_client_session(app) - response = await session.request( - method.lower(), f"{url}", auth=auth, **session_kwargs + method.lower(), + f"{app_settings.DIRECTOR_REGISTRY.registry_url}", + auth=auth, + **session_kwargs, ) if response.status_code == status.HTTP_401_UNAUTHORIZED: - _logger.debug("Registry unauthorized request: %s", response.text) # basic mode failed, test with other auth mode resp_data, resp_headers = await _auth_registry_request( app_settings, - url, + URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}"), method, response.headers, session, @@ -100,7 +91,6 @@ async def _basic_auth_registry_request( raise ServiceNotAvailableError(service_name=path) elif response.status_code >= status.HTTP_400_BAD_REQUEST: - _logger.exception("Unknown error while accessing registry: %s", str(response)) raise RegistryConnectionError(msg=str(response)) else: @@ -119,13 +109,6 @@ async def _auth_registry_request( # noqa: C901 session: httpx.AsyncClient, **kwargs, ) -> tuple[dict, Mapping]: - if ( - not app_settings.DIRECTOR_REGISTRY.REGISTRY_AUTH - or not app_settings.DIRECTOR_REGISTRY.REGISTRY_USER - or not app_settings.DIRECTOR_REGISTRY.REGISTRY_PW - ): - msg = "Wrong configuration: Authentication to registry is needed!" - raise RegistryConnectionError(msg=msg) # auth issue let's try some authentication get the auth type auth_type = None auth_details: dict[str, str] = {} @@ -163,13 +146,8 @@ async def _auth_registry_request( # noqa: C901 ) assert isinstance(resp_wtoken, httpx.Response) # nosec if resp_wtoken.status_code == status.HTTP_404_NOT_FOUND: - _logger.exception("path to registry not found: %s", url) raise ServiceNotAvailableError(service_name=f"{url}") if resp_wtoken.status_code >= status.HTTP_400_BAD_REQUEST: - _logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wtoken), - ) raise RegistryConnectionError(msg=f"{resp_wtoken}") resp_data = await resp_wtoken.json(content_type=None) resp_headers = resp_wtoken.headers @@ -178,15 +156,9 @@ async def _auth_registry_request( # noqa: C901 # basic authentication should not be since we tried already... resp_wbasic = await getattr(session, method.lower())(url, auth=auth, **kwargs) assert isinstance(resp_wbasic, httpx.Response) # nosec - if resp_wbasic.status_code == status.HTTP_404_NOT_FOUND: - _logger.exception("path to registry not found: %s", url) raise ServiceNotAvailableError(service_name=f"{url}") if resp_wbasic.status_code >= status.HTTP_400_BAD_REQUEST: - _logger.exception( - "Unknown error while accessing with token authorized registry: %s", - str(resp_wbasic), - ) raise RegistryConnectionError(msg=f"{resp_wbasic}") resp_data = await resp_wbasic.json(content_type=None) resp_headers = resp_wbasic.headers @@ -246,8 +218,7 @@ async def _is_registry_responsive(app: FastAPI) -> bool: try: await _basic_auth_registry_request(app, path=path, method="GET", timeout=1.0) return True - except (httpx.RequestError, DirectorRuntimeError) as exc: - _logger.debug("Registry not responsive: %s", exc) + except (httpx.RequestError, DirectorRuntimeError): return False @@ -447,13 +418,14 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]] async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: with log_context(_logger, logging.DEBUG, msg="listing services"): services = [] + concurrency_limit = get_application_settings( + app + ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS async for repos in _list_repositories_gen(app, service_type): # only list as service if it actually contains the necessary labels async for repo_details_future in limited_as_completed( (get_repo_details(app, repo) for repo in repos), - limit=get_application_settings( - app - ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, + limit=concurrency_limit, ): with log_catch(_logger, reraise=False): if repo_details := await repo_details_future: From 93da4df90c971e0a9a5627e69824ccf572239fa3 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:49:08 +0100 Subject: [PATCH 16/29] cleanup --- .../registry_proxy.py | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index d8896e9bba6..21e9f6b96f8 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -14,7 +14,7 @@ from servicelib.utils import limited_as_completed from tenacity import retry from tenacity.before_sleep import before_sleep_log -from tenacity.retry import retry_if_exception_type, retry_if_result +from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed, wait_random_exponential from yarl import URL @@ -68,10 +68,12 @@ async def _basic_auth_registry_request( else None ) + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}").with_path(path) + session = get_client_session(app) response = await session.request( method.lower(), - f"{app_settings.DIRECTOR_REGISTRY.registry_url}", + f"{request_url}", auth=auth, **session_kwargs, ) @@ -80,7 +82,7 @@ async def _basic_auth_registry_request( # basic mode failed, test with other auth mode resp_data, resp_headers = await _auth_registry_request( app_settings, - URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}"), + request_url, method, response.headers, session, @@ -213,29 +215,22 @@ async def registry_request( return response, response_headers -async def _is_registry_responsive(app: FastAPI) -> bool: - path = "/v2/" - try: - await _basic_auth_registry_request(app, path=path, method="GET", timeout=1.0) - return True - except (httpx.RequestError, DirectorRuntimeError): - return False +async def _is_registry_responsive(app: FastAPI) -> None: + await _basic_auth_registry_request(app, path="/v2/", method="GET", timeout=1.0) async def _setup_registry(app: FastAPI) -> None: - _logger.debug("pinging registry...") - @retry( wait=wait_fixed(1), before_sleep=before_sleep_log(_logger, logging.WARNING), - retry=retry_if_result(lambda result: result is False), + retry=retry_if_exception_type((httpx.RequestError, DirectorRuntimeError)), reraise=True, ) - async def _wait_until_registry_responsive(app: FastAPI) -> bool: - return await _is_registry_responsive(app) + async def _wait_until_registry_responsive(app: FastAPI) -> None: + await _is_registry_responsive(app) - await _wait_until_registry_responsive(app) - _logger.info("Connected to docker registry") + with log_context(_logger, logging.INFO, msg="Connecting to docker registry"): + await _wait_until_registry_responsive(app) def setup(app: FastAPI) -> None: From cb37f16b7df046cb323d0f22f95d0777c87ef850 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:56:17 +0100 Subject: [PATCH 17/29] missing refactoring --- .../src/simcore_service_director/producer.py | 101 ++++++++---------- 1 file changed, 45 insertions(+), 56 deletions(-) diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py index ff791a4066f..79e695ead57 100644 --- a/services/director/src/simcore_service_director/producer.py +++ b/services/director/src/simcore_service_director/producer.py @@ -5,32 +5,22 @@ import re from datetime import timedelta from enum import Enum -from http import HTTPStatus from pprint import pformat from typing import Any, Final, cast import aiodocker import aiodocker.networks -import aiohttp import arrow +import httpx import tenacity -from aiohttp import ( - ClientConnectionError, - ClientError, - ClientResponse, - ClientResponseError, - ClientSession, - ClientTimeout, -) -from fastapi import FastAPI +from fastapi import FastAPI, status from packaging.version import Version from servicelib.async_utils import run_sequentially_in_context from servicelib.docker_utils import to_datetime from settings_library.docker_registry import RegistrySettings -from tenacity import retry +from tenacity import retry, wait_random_exponential from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_attempt -from tenacity.wait import wait_fixed from . import docker_utils, registry_proxy from .client_session import get_client_session @@ -547,7 +537,7 @@ async def _pass_port_to_service( service_name: str, port: str, service_boot_parameters_labels: list[Any], - session: ClientSession, + session: httpx.AsyncClient, app_settings: ApplicationSettings, ) -> None: for param in service_boot_parameters_labels: @@ -566,8 +556,8 @@ async def _pass_port_to_service( "port": str(port), } _logger.debug("creating request %s and query %s", service_url, query_string) - async with session.post(service_url, data=query_string) as response: - _logger.debug("query response: %s", await response.text()) + response = await session.post(service_url, data=query_string) + _logger.debug("query response: %s", response.text) return _logger.debug("service %s does not need to know its external port", service_name) @@ -1149,50 +1139,48 @@ async def get_service_details(app: FastAPI, node_uuid: str) -> dict: @retry( - wait=wait_fixed(2), + wait=wait_random_exponential(min=1, max=5), stop=stop_after_attempt(3), reraise=True, - retry=retry_if_exception_type(ClientConnectionError), + retry=retry_if_exception_type(httpx.RequestError), ) async def _save_service_state( - service_host_name: str, session: aiohttp.ClientSession + service_host_name: str, session: httpx.AsyncClient ) -> None: - response: ClientResponse - async with session.post( - url=f"http://{service_host_name}/state", # NOSONAR - timeout=ClientTimeout( - ServicesCommonSettings().director_dynamic_service_save_timeout - ), - ) as response: - try: - response.raise_for_status() + try: + response = await session.post( + url=f"http://{service_host_name}/state", # NOSONAR + timeout=ServicesCommonSettings().director_dynamic_service_save_timeout, + ) + response.raise_for_status() - except ClientResponseError as err: - if err.status in ( - HTTPStatus.METHOD_NOT_ALLOWED, - HTTPStatus.NOT_FOUND, - HTTPStatus.NOT_IMPLEMENTED, - ): - # NOTE: Legacy Override. Some old services do not have a state entrypoint defined - # therefore we assume there is nothing to be saved and do not raise exception - # Responses found so far: - # METHOD NOT ALLOWED https://httpstatuses.com/405 - # NOT FOUND https://httpstatuses.com/404 - # - _logger.warning( - "Service '%s' does not seem to implement save state functionality: %s. Skipping save", - service_host_name, - err, - ) - else: - # upss ... could service had troubles saving, reraise - raise - else: - _logger.info( - "Service '%s' successfully saved its state: %s", + except httpx.HTTPStatusError as err: + + if err.response.status_code in ( + status.HTTP_405_METHOD_NOT_ALLOWED, + status.HTTP_404_NOT_FOUND, + status.HTTP_501_NOT_IMPLEMENTED, + ): + # NOTE: Legacy Override. Some old services do not have a state entrypoint defined + # therefore we assume there is nothing to be saved and do not raise exception + # Responses found so far: + # METHOD NOT ALLOWED https://httpstatuses.com/405 + # NOT FOUND https://httpstatuses.com/404 + # + _logger.warning( + "Service '%s' does not seem to implement save state functionality: %s. Skipping save", service_host_name, - f"{response}", + err, ) + else: + # upss ... could service had troubles saving, reraise + raise + else: + _logger.info( + "Service '%s' successfully saved its state: %s", + service_host_name, + f"{response}", + ) @run_sequentially_in_context(target_args=["node_uuid"]) @@ -1246,20 +1234,21 @@ async def stop_service(app: FastAPI, *, node_uuid: str, save_state: bool) -> Non await _save_service_state( service_host_name, session=get_client_session(app) ) - except ClientResponseError as err: + except httpx.HTTPStatusError as err: + raise ServiceStateSaveError( service_uuid=node_uuid, reason=f"service {service_host_name} rejected to save state, " - f"responded {err.message} (status {err.status})." + f"responded {err.response.text} (status {err.response.status_code})." "Aborting stop service to prevent data loss.", ) from err - except ClientError as err: + except httpx.RequestError as err: _logger.warning( "Could not save state because %s is unreachable [%s]." "Resuming stop_service.", service_host_name, - err, + err.request, ) # remove the services From 9ebba7973dee9d1456ffe8bb7a9e1f30fc7f33d7 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 23:06:49 +0100 Subject: [PATCH 18/29] encoded true is needed here --- .../director/src/simcore_service_director/registry_proxy.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 21e9f6b96f8..adc140c7b82 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,7 +68,9 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}").with_path(path) + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}").with_path( + path, encoded=True + ) session = get_client_session(app) response = await session.request( From 83148bd68eb2c1dae79efd19f7224d62ec41bdc5 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 21 Nov 2024 23:27:49 +0100 Subject: [PATCH 19/29] replace aioresponses with httpx --- .../test_rest_running_interactive_services.py | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/services/director/tests/unit/api/test_rest_running_interactive_services.py b/services/director/tests/unit/api/test_rest_running_interactive_services.py index 97accd23279..fa6fabd6948 100644 --- a/services/director/tests/unit/api/test_rest_running_interactive_services.py +++ b/services/director/tests/unit/api/test_rest_running_interactive_services.py @@ -7,7 +7,7 @@ import httpx import pytest -from aioresponses import CallbackResult, aioresponses +import respx from faker import Faker from fastapi import status from models_library.projects import ProjectID @@ -172,29 +172,23 @@ async def test_running_services_post_and_delete( if save_state: query_params.update({"save_state": "true" if save_state else "false"}) - mocked_save_state_cb = mocker.MagicMock( - return_value=CallbackResult(status=200, payload={}) - ) - PASSTHROUGH_REQUESTS_PREFIXES = [ - "http://127.0.0.1", - "http://localhost", - "unix://", # docker engine - "ws://", # websockets - ] - with aioresponses(passthrough=PASSTHROUGH_REQUESTS_PREFIXES) as mock: + with respx.mock( + base_url=f"http://{service_host}:{service_port}{service_basepath}", + assert_all_called=False, + assert_all_mocked=False, + ) as respx_mock: + + def _save_me(request) -> httpx.Response: + return httpx.Response(status.HTTP_200_OK, json={}) + + respx_mock.post("/state", name="save_state").mock(side_effect=_save_me) + respx_mock.route(host="127.0.0.1", name="host").pass_through() + respx_mock.route(host="localhost", name="localhost").pass_through() - # POST /http://service_host:service_port service_basepath/state ------------------------------------------------- - mock.post( - f"http://{service_host}:{service_port}{service_basepath}/state", - status=200, - callback=mocked_save_state_cb, - ) resp = await client.delete( f"/{api_version_prefix}/running_interactive_services/{params['service_uuid']}", params=query_params, ) - if expected_save_state_call: - mocked_save_state_cb.assert_called_once() text = resp.text assert resp.status_code == status.HTTP_204_NO_CONTENT, text From 6aa135064a738cbfb8aee6d74fc1fb2532acffb5 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 09:11:00 +0100 Subject: [PATCH 20/29] sonarcloud: use a specific name --- .../src/settings_library/docker_registry.py | 2 +- .../director/src/simcore_service_director/registry_proxy.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index 45afd4c7310..b4120c9a3f4 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -63,7 +63,7 @@ def api_url(self) -> str: return f"{self.REGISTRY_URL}/v2" @cached_property - def registry_url(self) -> AnyHttpUrl: + def registry_url_for_docker_engine(self) -> AnyHttpUrl: """returns the full URL to the Docker Registry for use by docker engine""" protocol = "https" if self.REGISTRY_SSL else "http" return TypeAdapter(AnyHttpUrl).validate_python( diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index adc140c7b82..ced210f440d 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,9 +68,9 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.registry_url}").with_path( - path, encoded=True - ) + request_url = URL( + f"{app_settings.DIRECTOR_REGISTRY.registry_url_for_docker_engine}" + ).with_path(path, encoded=True) session = get_client_session(app) response = await session.request( From fdd0eba696e5875346f09b82aacaccebe28815ed Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 09:17:58 +0100 Subject: [PATCH 21/29] remove confusion --- .../src/settings_library/docker_registry.py | 22 +++---------------- .../registry_proxy.py | 6 ++--- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index b4120c9a3f4..9473a1a7152 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,14 +1,7 @@ from functools import cached_property from typing import Any, Self -from pydantic import ( - AnyHttpUrl, - Field, - SecretStr, - TypeAdapter, - field_validator, - model_validator, -) +from pydantic import Field, SecretStr, field_validator, model_validator from pydantic_settings import SettingsConfigDict from .base import BaseCustomSettings @@ -20,13 +13,12 @@ class RegistrySettings(BaseCustomSettings): default=None, # This is useful in case of a local registry, where the registry url (path) is relative to the host docker engine" description="development mode only, in case a local registry is used - " - "this is the hostname to the docker registry as seen from inside the container", + "this is the hostname to the docker registry as seen from the host running the containers (e.g. 127.0.0.1:5000)", ) # NOTE: name is missleading, http or https protocol are not included REGISTRY_URL: str = Field( ..., - description="hostname of docker registry (without protocol but with port if available) - " - "typically used by the host machine docker engine", + description="hostname of docker registry (without protocol but with port if available)", min_length=1, ) @@ -62,14 +54,6 @@ def resolved_registry_url(self) -> str: def api_url(self) -> str: return f"{self.REGISTRY_URL}/v2" - @cached_property - def registry_url_for_docker_engine(self) -> AnyHttpUrl: - """returns the full URL to the Docker Registry for use by docker engine""" - protocol = "https" if self.REGISTRY_SSL else "http" - return TypeAdapter(AnyHttpUrl).validate_python( - f"{protocol}://{self.REGISTRY_URL}" - ) - model_config = SettingsConfigDict( json_schema_extra={ "examples": [ diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index ced210f440d..f3b24b2d84c 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,9 +68,9 @@ async def _basic_auth_registry_request( else None ) - request_url = URL( - f"{app_settings.DIRECTOR_REGISTRY.registry_url_for_docker_engine}" - ).with_path(path, encoded=True) + request_url = URL(app_settings.DIRECTOR_REGISTRY.resolved_registry_url).with_path( + path, encoded=True + ) session = get_client_session(app) response = await session.request( From 220b21563fcacfebeee1e929355bf80399cff844 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 09:19:15 +0100 Subject: [PATCH 22/29] ensure registry proxy uses internal registry URL --- .../director/src/simcore_service_director/registry_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index f3b24b2d84c..f93bad9f52a 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,7 +68,7 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(app_settings.DIRECTOR_REGISTRY.resolved_registry_url).with_path( + request_url = URL(app_settings.DIRECTOR_REGISTRY.REGISTRY_URL).with_path( path, encoded=True ) From b6aa263c26c67bc795191f217eaa5e29fad4eeb5 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 09:21:12 +0100 Subject: [PATCH 23/29] ensure registry proxy uses api_url --- .../src/simcore_service_director/registry_proxy.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index f93bad9f52a..2d5936f2352 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,7 +68,7 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(app_settings.DIRECTOR_REGISTRY.REGISTRY_URL).with_path( + request_url = URL(app_settings.DIRECTOR_REGISTRY.api_url).with_path( path, encoded=True ) @@ -218,7 +218,7 @@ async def registry_request( async def _is_registry_responsive(app: FastAPI) -> None: - await _basic_auth_registry_request(app, path="/v2/", method="GET", timeout=1.0) + await _basic_auth_registry_request(app, path="/", method="GET", timeout=1.0) async def _setup_registry(app: FastAPI) -> None: @@ -268,7 +268,7 @@ async def _list_repositories_gen( app: FastAPI, service_type: ServiceType ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): - path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + path = f"/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" result, headers = await registry_request(app, path=path) # initial call while True: @@ -296,7 +296,7 @@ async def list_image_tags_gen( app: FastAPI, image_key: str ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): - path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + path = f"/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" tags, headers = await registry_request(app, path=path) # initial call while True: if "Link" in headers: @@ -336,7 +336,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ - path = f"/v2/{image}/manifests/{tag}" + path = f"/{image}/manifests/{tag}" _, headers = await registry_request(app, path=path) headers = headers or {} @@ -349,7 +349,7 @@ async def get_image_labels( """Returns image labels and the image manifest digest""" _logger.debug("getting image labels of %s:%s", image, tag) - path = f"/v2/{image}/manifests/{tag}" + path = f"/{image}/manifests/{tag}" request_result, headers = await registry_request(app, path=path) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( From 0076aec6dc514a3cea5190d8319fd7d6514c567e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:00:42 +0100 Subject: [PATCH 24/29] fix api_url to return protocol --- .../src/settings_library/docker_registry.py | 15 ++++++++++++--- .../simcore_service_director/registry_proxy.py | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index 9473a1a7152..32636f6b9df 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,7 +1,14 @@ from functools import cached_property from typing import Any, Self -from pydantic import Field, SecretStr, field_validator, model_validator +from pydantic import ( + AnyHttpUrl, + Field, + SecretStr, + TypeAdapter, + field_validator, + model_validator, +) from pydantic_settings import SettingsConfigDict from .base import BaseCustomSettings @@ -51,8 +58,10 @@ def resolved_registry_url(self) -> str: return self.REGISTRY_PATH or self.REGISTRY_URL @cached_property - def api_url(self) -> str: - return f"{self.REGISTRY_URL}/v2" + def api_url(self) -> AnyHttpUrl: + return TypeAdapter(AnyHttpUrl).validate_python( + f"http{'s' if self.REGISTRY_SSL else ''}://{self.REGISTRY_URL}/v2" + ) model_config = SettingsConfigDict( json_schema_extra={ diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 2d5936f2352..8ad51b3ab6b 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,7 +68,7 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(app_settings.DIRECTOR_REGISTRY.api_url).with_path( + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}").with_path( path, encoded=True ) From 7c7c431e6b434bbeae858ecfc24165b6f900fa14 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:14:05 +0100 Subject: [PATCH 25/29] fixes path --- .../simcore_service_director/registry_proxy.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 8ad51b3ab6b..cf2d68f3a35 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,9 +68,7 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}").with_path( - path, encoded=True - ) + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}") / path session = get_client_session(app) response = await session.request( @@ -99,7 +97,8 @@ async def _basic_auth_registry_request( else: # registry that does not need an auth - resp_data = response.json() + if method.lower() != "head": + resp_data = response.json() resp_headers = response.headers return (resp_data, resp_headers) @@ -218,7 +217,7 @@ async def registry_request( async def _is_registry_responsive(app: FastAPI) -> None: - await _basic_auth_registry_request(app, path="/", method="GET", timeout=1.0) + await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) async def _setup_registry(app: FastAPI) -> None: @@ -268,7 +267,7 @@ async def _list_repositories_gen( app: FastAPI, service_type: ServiceType ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): - path = f"/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" result, headers = await registry_request(app, path=path) # initial call while True: @@ -296,7 +295,7 @@ async def list_image_tags_gen( app: FastAPI, image_key: str ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): - path = f"/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" + path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" tags, headers = await registry_request(app, path=path) # initial call while True: if "Link" in headers: @@ -336,7 +335,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ - path = f"/{image}/manifests/{tag}" + path = f"{image}/manifests/{tag}" _, headers = await registry_request(app, path=path) headers = headers or {} @@ -349,7 +348,7 @@ async def get_image_labels( """Returns image labels and the image manifest digest""" _logger.debug("getting image labels of %s:%s", image, tag) - path = f"/{image}/manifests/{tag}" + path = f"{image}/manifests/{tag}" request_result, headers = await registry_request(app, path=path) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( From 3e3d2f119fb8625a37bf01eeb7195388fa51efa7 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:23:51 +0100 Subject: [PATCH 26/29] fix encoding --- .../director/src/simcore_service_director/registry_proxy.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index cf2d68f3a35..9dd6b880bee 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -68,7 +68,9 @@ async def _basic_auth_registry_request( else None ) - request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}") / path + request_url = URL(f"{app_settings.DIRECTOR_REGISTRY.api_url}").joinpath( + path, encoded=True + ) session = get_client_session(app) response = await session.request( From a5946137bc4b2953cd32263662971093be59d15c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:57:05 +0100 Subject: [PATCH 27/29] fix required registry URL --- services/director-v2/tests/conftest.py | 1 + services/dynamic-sidecar/.env-devel | 2 +- services/dynamic-sidecar/tests/conftest.py | 2 ++ .../tests/unit/test_core_external_dependencies.py | 9 +++++---- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index d0a70389caa..fcc0db6dbf1 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -186,6 +186,7 @@ def mock_env( "REGISTRY_PW": "test", "REGISTRY_SSL": "false", "REGISTRY_USER": "test", + "REGISTRY_URL": faker.url(), "SC_BOOT_MODE": "production", "SIMCORE_SERVICES_NETWORK_NAME": "test_network_name", "SWARM_STACK_NAME": "pytest-simcore", diff --git a/services/dynamic-sidecar/.env-devel b/services/dynamic-sidecar/.env-devel index 9bccac9239e..e1866cb022a 100644 --- a/services/dynamic-sidecar/.env-devel +++ b/services/dynamic-sidecar/.env-devel @@ -22,7 +22,7 @@ DY_SIDECAR_RUN_ID=1689771013_f7c1bd87-4da5-4709-9471-3d60c8a70639 DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS=false # DOCKER REGISTRY -DY_DEPLOYMENT_REGISTRY_SETTINGS='{"REGISTRY_AUTH":"false","REGISTRY_USER":"test","REGISTRY_PW":"test","REGISTRY_SSL":"false"}' +DY_DEPLOYMENT_REGISTRY_SETTINGS='{"REGISTRY_AUTH":"false","REGISTRY_USER":"test","REGISTRY_PW":"test","REGISTRY_SSL":"false", "REGISTRY_URL": "fake.registry.com"}' S3_ENDPOINT=http://111.111.111.111:12345 S3_ACCESS_KEY=mocked diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index d575cdc0db8..62b93daa25c 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -197,6 +197,7 @@ def base_mock_envs( "REGISTRY_USER": "test", "REGISTRY_PW": "test", "REGISTRY_SSL": "false", + "REGISTRY_URL": "registry.pytest.com", } ), "DYNAMIC_SIDECAR_TRACING": "null", @@ -267,6 +268,7 @@ def mock_environment( "REGISTRY_USER": "test", "REGISTRY_PW": "test", "REGISTRY_SSL": "false", + "REGISTRY_URL": "registry.pytest.com", } ), }, diff --git a/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py b/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py index 667aa30f6a4..e741b11189c 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py +++ b/services/dynamic-sidecar/tests/unit/test_core_external_dependencies.py @@ -59,10 +59,11 @@ def mock_environment( "POSTGRES_USER": "test", "POSTGRES_PASSWORD": "test", "POSTGRES_DB": "test", - "REGISTRY_AUTH": "0", - "REGISTRY_USER": "test", - "REGISTRY_PW": "test", - "REGISTRY_SSL": "0", + "REGISTRY_AUTH": f"{faker.pybool()}", + "REGISTRY_USER": faker.user_name(), + "REGISTRY_PW": faker.password(), + "REGISTRY_SSL": f"{faker.pybool()}", + "REGISTRY_URL": faker.url(), **base_mock_envs, }, ) From 1843d947a4bdcf5c12011e5faebb00eaafb7f8e3 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:08:09 +0100 Subject: [PATCH 28/29] fix prefix --- .../src/simcore_service_director/registry_proxy.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 9dd6b880bee..9fa703a24db 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -274,7 +274,9 @@ async def _list_repositories_gen( while True: if "Link" in headers: - next_path = str(headers["Link"]).split(";")[0].strip("<>") + next_path = ( + str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") + ) prefetch_task = asyncio.create_task( registry_request(app, path=next_path) ) @@ -301,7 +303,9 @@ async def list_image_tags_gen( tags, headers = await registry_request(app, path=path) # initial call while True: if "Link" in headers: - next_path = str(headers["Link"]).split(";")[0].strip("<>") + next_path = ( + str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") + ) prefetch_task = asyncio.create_task( registry_request(app, path=next_path) ) From cb215aed29fbdaaa9c8dc95d45b70ff085108706 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:11:03 +0100 Subject: [PATCH 29/29] missing env --- packages/settings-library/tests/test_docker_registry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/settings-library/tests/test_docker_registry.py b/packages/settings-library/tests/test_docker_registry.py index fdaf5a4bc66..0cc513399c8 100644 --- a/packages/settings-library/tests/test_docker_registry.py +++ b/packages/settings-library/tests/test_docker_registry.py @@ -11,6 +11,7 @@ "REGISTRY_USER": "usr", "REGISTRY_PW": "pwd", "REGISTRY_SSL": "False", + "REGISTRY_URL": "pytest.registry.com", }