From d4e60964a9bcd5139fcf818c9a9a61f00913bea9 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Tue, 13 Aug 2024 17:34:38 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fixes=20cache=20issue=20in=20web?= =?UTF-8?q?-server=20services=20i/o=20model=20(#6176)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/web/server/requirements/_base.in | 1 - services/web/server/requirements/_base.txt | 2 - .../simcore_service_webserver/catalog/_api.py | 49 ++++++------- .../catalog/_api_units.py | 33 ++++----- .../catalog/_models.py | 70 +++++++------------ .../unit/isolated/test_catalog_models.py | 11 +-- 6 files changed, 70 insertions(+), 96 deletions(-) diff --git a/services/web/server/requirements/_base.in b/services/web/server/requirements/_base.in index 0dfa941e076..b0a765fd8ea 100644 --- a/services/web/server/requirements/_base.in +++ b/services/web/server/requirements/_base.in @@ -30,7 +30,6 @@ aiohttp-swagger[performance] aiopg[sa] # db aiosmtplib # email asyncpg # db -cachetools # caching for sync functions captcha cryptography # security faker # Only used in dev-mode for proof-of-concepts diff --git a/services/web/server/requirements/_base.txt b/services/web/server/requirements/_base.txt index fb4b9229c58..f80a025a702 100644 --- a/services/web/server/requirements/_base.txt +++ b/services/web/server/requirements/_base.txt @@ -101,8 +101,6 @@ attrs==21.4.0 # openapi-core bidict==0.22.0 # via python-socketio -cachetools==5.3.2 - # via -r requirements/_base.in captcha==0.5.0 # via -r requirements/_base.in certifi==2023.7.22 diff --git a/services/web/server/src/simcore_service_webserver/catalog/_api.py b/services/web/server/src/simcore_service_webserver/catalog/_api.py index f06e9cc7ec3..b5bffe5a764 100644 --- a/services/web/server/src/simcore_service_webserver/catalog/_api.py +++ b/services/web/server/src/simcore_service_webserver/catalog/_api.py @@ -1,4 +1,3 @@ -import asyncio import logging from collections.abc import Iterator from typing import Any @@ -66,8 +65,7 @@ async def _safe_replace_service_input_outputs( service: dict[str, Any], unit_registry: UnitRegistry ): try: - await asyncio.to_thread( - replace_service_input_outputs, + await replace_service_input_outputs( service, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY, @@ -133,8 +131,7 @@ async def dev_get_service( ) data = jsonable_encoder(service, exclude_unset=True) - await asyncio.to_thread( - replace_service_input_outputs, + await replace_service_input_outputs( data, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY, @@ -163,8 +160,7 @@ async def dev_update_service( ) data = jsonable_encoder(service, exclude_unset=True) - await asyncio.to_thread( - replace_service_input_outputs, + await replace_service_input_outputs( data, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY, @@ -194,8 +190,7 @@ async def get_service( service = await client.get_service( ctx.app, ctx.user_id, service_key, service_version, ctx.product_name ) - await asyncio.to_thread( - replace_service_input_outputs, + await replace_service_input_outputs( service, unit_registry=ctx.unit_registry, **RESPONSE_MODEL_POLICY, @@ -217,8 +212,7 @@ async def update_service( ctx.product_name, update_data, ) - await asyncio.to_thread( - replace_service_input_outputs, + await replace_service_input_outputs( service, unit_registry=ctx.unit_registry, **RESPONSE_MODEL_POLICY, @@ -232,13 +226,12 @@ async def list_service_inputs( service = await client.get_service( ctx.app, ctx.user_id, service_key, service_version, ctx.product_name ) - inputs = [] - for input_key in service["inputs"]: - service_input: ServiceInputGet = ( - ServiceInputGetFactory.from_catalog_service_api_model(service, input_key) + return [ + await ServiceInputGetFactory.from_catalog_service_api_model( + service=service, input_key=input_key ) - inputs.append(service_input) - return inputs + for input_key in service["inputs"] + ] async def get_service_input( @@ -251,7 +244,9 @@ async def get_service_input( ctx.app, ctx.user_id, service_key, service_version, ctx.product_name ) service_input: ServiceInputGet = ( - ServiceInputGetFactory.from_catalog_service_api_model(service, input_key) + await ServiceInputGetFactory.from_catalog_service_api_model( + service=service, input_key=input_key + ) ) return service_input @@ -307,14 +302,12 @@ async def list_service_outputs( service = await client.get_service( ctx.app, ctx.user_id, service_key, service_version, ctx.product_name ) - - outputs = [] - for output_key in service["outputs"]: - service_output = ServiceOutputGetFactory.from_catalog_service_api_model( - service, output_key, None + return [ + await ServiceOutputGetFactory.from_catalog_service_api_model( + service=service, output_key=output_key, ureg=None ) - outputs.append(service_output) - return outputs + for output_key in service["outputs"] + ] async def get_service_output( @@ -326,12 +319,10 @@ async def get_service_output( service = await client.get_service( ctx.app, ctx.user_id, service_key, service_version, ctx.product_name ) - service_output: ServiceOutputGet = ( - ServiceOutputGetFactory.from_catalog_service_api_model(service, output_key) + return await ServiceOutputGetFactory.from_catalog_service_api_model( + service=service, output_key=output_key ) - return service_output - async def get_compatible_outputs_given_target_input( service_key: ServiceKey, diff --git a/services/web/server/src/simcore_service_webserver/catalog/_api_units.py b/services/web/server/src/simcore_service_webserver/catalog/_api_units.py index a0367b60030..a8558e674ec 100644 --- a/services/web/server/src/simcore_service_webserver/catalog/_api_units.py +++ b/services/web/server/src/simcore_service_webserver/catalog/_api_units.py @@ -1,9 +1,5 @@ from typing import Any -from models_library.api_schemas_webserver.catalog import ( - ServiceInputGet, - ServiceOutputGet, -) from models_library.services import BaseServiceIOModel, ServiceInput, ServiceOutput from pint import PintError, UnitRegistry @@ -37,7 +33,7 @@ def _can_convert_units(from_unit: str, to_unit: str, ureg: UnitRegistry) -> bool return can -def replace_service_input_outputs( +async def replace_service_input_outputs( service: dict[str, Any], *, unit_registry: UnitRegistry | None = None, @@ -45,20 +41,25 @@ def replace_service_input_outputs( ): """Thin wrapper to replace i/o ports in returned service model""" # This is a fast solution until proper models are available for the web API - for input_key in service["inputs"]: - new_input: ServiceInputGet = ( - ServiceInputGetFactory.from_catalog_service_api_model( - service, input_key, unit_registry - ) + new_inputs = [ + await ServiceInputGetFactory.from_catalog_service_api_model( + service=service, input_key=input_key, ureg=unit_registry ) - service["inputs"][input_key] = new_input.dict(**export_options) + for input_key in service["inputs"] + ] - for output_key in service["outputs"]: - new_output: ServiceOutputGet = ( - ServiceOutputGetFactory.from_catalog_service_api_model( - service, output_key, unit_registry - ) + new_outputs = [ + await ServiceOutputGetFactory.from_catalog_service_api_model( + service=service, output_key=output_key, ureg=unit_registry ) + for output_key in service["outputs"] + ] + + # replace if above is successful + for input_key, new_input in zip(service["inputs"], new_inputs, strict=True): + service["inputs"][input_key] = new_input.dict(**export_options) + + for output_key, new_output in zip(service["outputs"], new_outputs, strict=True): service["outputs"][output_key] = new_output.dict(**export_options) diff --git a/services/web/server/src/simcore_service_webserver/catalog/_models.py b/services/web/server/src/simcore_service_webserver/catalog/_models.py index 969b9a9f52d..5c4235350a6 100644 --- a/services/web/server/src/simcore_service_webserver/catalog/_models.py +++ b/services/web/server/src/simcore_service_webserver/catalog/_models.py @@ -1,9 +1,8 @@ import logging -import os from dataclasses import dataclass -from typing import Any, Final +from typing import Any, Callable, Final -import cachetools +from aiocache import cached from models_library.api_schemas_webserver.catalog import ( ServiceInputGet, ServiceInputKey, @@ -54,43 +53,28 @@ def get_html_formatted_unit( # # Transforms from catalog api models -> webserver api models # - - -# Caching: https://cachetools.readthedocs.io/en/latest/index.html#cachetools.TTLCache -# - the least recently used items will be discarded first to make space when necessary. +# Uses aiocache (async) instead of cachetools (sync) in order to handle concurrency better +# SEE https://github.com/ITISFoundation/osparc-simcore/pull/6169 # +_SECOND = 1 # in seconds +_MINUTE = 60 * _SECOND +_CACHE_TTL: Final = 1 * _MINUTE -_CACHE_MAXSIZE: Final = int( - os.getenv("CACHETOOLS_CACHE_MAXSIZE", "100") -) # number of items i.e. ServiceInputGet/ServiceOutputGet instances -_CACHE_TTL: Final = int(os.getenv("CACHETOOLS_CACHE_TTL_SECS", "60")) # secs - - -def _hash_inputs( - service: dict[str, Any], - input_key: str, - *args, # noqa: ARG001 # pylint: disable=unused-argument - **kwargs, # noqa: ARG001 # pylint: disable=unused-argument -): - return f"{service['key']}/{service['version']}/{input_key}" - - -def _cachetools_cached(*args, **kwargs): - def decorator(func): - if os.getenv("CACHETOOLS_DISABLE", "0") == "0": - return cachetools.cached(*args, **kwargs)(func) - _logger.warning("cachetools disabled") - return func - return decorator +def _hash_inputs(_f: Callable[..., Any], *_args, **kw): + assert not _args # nosec + service: dict[str, Any] = kw["service"] + return f"ServiceInputGetFactory_{service['key']}_{service['version']}_{kw['input_key']}" class ServiceInputGetFactory: @staticmethod - @_cachetools_cached( - cachetools.TTLCache(ttl=_CACHE_TTL, maxsize=_CACHE_MAXSIZE), key=_hash_inputs + @cached( + ttl=_CACHE_TTL, + key_builder=_hash_inputs, ) - def from_catalog_service_api_model( + async def from_catalog_service_api_model( + *, service: dict[str, Any], input_key: ServiceInputKey, ureg: UnitRegistry | None = None, @@ -110,29 +94,27 @@ def from_catalog_service_api_model( return port -def _hash_outputs( - service: dict[str, Any], - output_key: str, - *args, # noqa: ARG001 # pylint: disable=unused-argument - **kwargs, # noqa: ARG001 # pylint: disable=unused-argument -): - return f"{service['key']}/{service['version']}/{output_key}" +def _hash_outputs(_f: Callable[..., Any], *_args, **kw): + assert not _args # nosec + service: dict[str, Any] = kw["service"] + return f"ServiceOutputGetFactory_{service['key']}/{service['version']}/{kw['output_key']}" class ServiceOutputGetFactory: @staticmethod - @_cachetools_cached( - cachetools.TTLCache(ttl=_CACHE_TTL, maxsize=_CACHE_MAXSIZE), key=_hash_outputs + @cached( + ttl=_CACHE_TTL, + key_builder=_hash_outputs, ) - def from_catalog_service_api_model( + async def from_catalog_service_api_model( + *, service: dict[str, Any], output_key: ServiceOutputKey, ureg: UnitRegistry | None = None, ) -> ServiceOutputGet: data = service["outputs"][output_key] # NOTE: prunes invalid field that might have remained in database - if "defaultValue" in data: - data.pop("defaultValue") + data.pop("defaultValue", None) # NOTE: this call must be validated if port property type is "ref_contentSchema" port = ServiceOutputGet(key_id=output_key, **data) diff --git a/services/web/server/tests/unit/isolated/test_catalog_models.py b/services/web/server/tests/unit/isolated/test_catalog_models.py index d9ba9d75894..ec82b0ab367 100644 --- a/services/web/server/tests/unit/isolated/test_catalog_models.py +++ b/services/web/server/tests/unit/isolated/test_catalog_models.py @@ -2,6 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable +import asyncio import json from copy import deepcopy @@ -69,14 +70,16 @@ def test_from_catalog_to_webapi_service( "owner": "foo@fake.com", } - def _run(): + def _run_async_test(): s = deepcopy(catalog_service) - replace_service_input_outputs( - s, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY + asyncio.get_event_loop().run_until_complete( + replace_service_input_outputs( + s, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY + ) ) return s - result = benchmark(_run) + result = benchmark(_run_async_test) # check result got = json.dumps(result, indent=1)