Skip to content

Commit

Permalink
🐛 Fixes cache issue in web-server services i/o model (#6176)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored Aug 13, 2024
1 parent 83fa6fe commit d4e6096
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 96 deletions.
1 change: 0 additions & 1 deletion services/web/server/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ aiohttp-swagger[performance]
aiopg[sa] # db
aiosmtplib # email
asyncpg # db
cachetools # caching for sync functions
captcha
cryptography # security
faker # Only used in dev-mode for proof-of-concepts
Expand Down
2 changes: 0 additions & 2 deletions services/web/server/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ attrs==21.4.0
# openapi-core
bidict==0.22.0
# via python-socketio
cachetools==5.3.2
# via -r requirements/_base.in
captcha==0.5.0
# via -r requirements/_base.in
certifi==2023.7.22
Expand Down
49 changes: 20 additions & 29 deletions services/web/server/src/simcore_service_webserver/catalog/_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import logging
from collections.abc import Iterator
from typing import Any
Expand Down Expand Up @@ -66,8 +65,7 @@ async def _safe_replace_service_input_outputs(
service: dict[str, Any], unit_registry: UnitRegistry
):
try:
await asyncio.to_thread(
replace_service_input_outputs,
await replace_service_input_outputs(
service,
unit_registry=unit_registry,
**RESPONSE_MODEL_POLICY,
Expand Down Expand Up @@ -133,8 +131,7 @@ async def dev_get_service(
)

data = jsonable_encoder(service, exclude_unset=True)
await asyncio.to_thread(
replace_service_input_outputs,
await replace_service_input_outputs(
data,
unit_registry=unit_registry,
**RESPONSE_MODEL_POLICY,
Expand Down Expand Up @@ -163,8 +160,7 @@ async def dev_update_service(
)

data = jsonable_encoder(service, exclude_unset=True)
await asyncio.to_thread(
replace_service_input_outputs,
await replace_service_input_outputs(
data,
unit_registry=unit_registry,
**RESPONSE_MODEL_POLICY,
Expand Down Expand Up @@ -194,8 +190,7 @@ async def get_service(
service = await client.get_service(
ctx.app, ctx.user_id, service_key, service_version, ctx.product_name
)
await asyncio.to_thread(
replace_service_input_outputs,
await replace_service_input_outputs(
service,
unit_registry=ctx.unit_registry,
**RESPONSE_MODEL_POLICY,
Expand All @@ -217,8 +212,7 @@ async def update_service(
ctx.product_name,
update_data,
)
await asyncio.to_thread(
replace_service_input_outputs,
await replace_service_input_outputs(
service,
unit_registry=ctx.unit_registry,
**RESPONSE_MODEL_POLICY,
Expand All @@ -232,13 +226,12 @@ async def list_service_inputs(
service = await client.get_service(
ctx.app, ctx.user_id, service_key, service_version, ctx.product_name
)
inputs = []
for input_key in service["inputs"]:
service_input: ServiceInputGet = (
ServiceInputGetFactory.from_catalog_service_api_model(service, input_key)
return [
await ServiceInputGetFactory.from_catalog_service_api_model(
service=service, input_key=input_key
)
inputs.append(service_input)
return inputs
for input_key in service["inputs"]
]


async def get_service_input(
Expand All @@ -251,7 +244,9 @@ async def get_service_input(
ctx.app, ctx.user_id, service_key, service_version, ctx.product_name
)
service_input: ServiceInputGet = (
ServiceInputGetFactory.from_catalog_service_api_model(service, input_key)
await ServiceInputGetFactory.from_catalog_service_api_model(
service=service, input_key=input_key
)
)

return service_input
Expand Down Expand Up @@ -307,14 +302,12 @@ async def list_service_outputs(
service = await client.get_service(
ctx.app, ctx.user_id, service_key, service_version, ctx.product_name
)

outputs = []
for output_key in service["outputs"]:
service_output = ServiceOutputGetFactory.from_catalog_service_api_model(
service, output_key, None
return [
await ServiceOutputGetFactory.from_catalog_service_api_model(
service=service, output_key=output_key, ureg=None
)
outputs.append(service_output)
return outputs
for output_key in service["outputs"]
]


async def get_service_output(
Expand All @@ -326,12 +319,10 @@ async def get_service_output(
service = await client.get_service(
ctx.app, ctx.user_id, service_key, service_version, ctx.product_name
)
service_output: ServiceOutputGet = (
ServiceOutputGetFactory.from_catalog_service_api_model(service, output_key)
return await ServiceOutputGetFactory.from_catalog_service_api_model(
service=service, output_key=output_key
)

return service_output


async def get_compatible_outputs_given_target_input(
service_key: ServiceKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from typing import Any

from models_library.api_schemas_webserver.catalog import (
ServiceInputGet,
ServiceOutputGet,
)
from models_library.services import BaseServiceIOModel, ServiceInput, ServiceOutput
from pint import PintError, UnitRegistry

Expand Down Expand Up @@ -37,28 +33,33 @@ def _can_convert_units(from_unit: str, to_unit: str, ureg: UnitRegistry) -> bool
return can


def replace_service_input_outputs(
async def replace_service_input_outputs(
service: dict[str, Any],
*,
unit_registry: UnitRegistry | None = None,
**export_options,
):
"""Thin wrapper to replace i/o ports in returned service model"""
# This is a fast solution until proper models are available for the web API
for input_key in service["inputs"]:
new_input: ServiceInputGet = (
ServiceInputGetFactory.from_catalog_service_api_model(
service, input_key, unit_registry
)
new_inputs = [
await ServiceInputGetFactory.from_catalog_service_api_model(
service=service, input_key=input_key, ureg=unit_registry
)
service["inputs"][input_key] = new_input.dict(**export_options)
for input_key in service["inputs"]
]

for output_key in service["outputs"]:
new_output: ServiceOutputGet = (
ServiceOutputGetFactory.from_catalog_service_api_model(
service, output_key, unit_registry
)
new_outputs = [
await ServiceOutputGetFactory.from_catalog_service_api_model(
service=service, output_key=output_key, ureg=unit_registry
)
for output_key in service["outputs"]
]

# replace if above is successful
for input_key, new_input in zip(service["inputs"], new_inputs, strict=True):
service["inputs"][input_key] = new_input.dict(**export_options)

for output_key, new_output in zip(service["outputs"], new_outputs, strict=True):
service["outputs"][output_key] = new_output.dict(**export_options)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
import os
from dataclasses import dataclass
from typing import Any, Final
from typing import Any, Callable, Final

import cachetools
from aiocache import cached
from models_library.api_schemas_webserver.catalog import (
ServiceInputGet,
ServiceInputKey,
Expand Down Expand Up @@ -54,43 +53,28 @@ def get_html_formatted_unit(
#
# Transforms from catalog api models -> webserver api models
#


# Caching: https://cachetools.readthedocs.io/en/latest/index.html#cachetools.TTLCache
# - the least recently used items will be discarded first to make space when necessary.
# Uses aiocache (async) instead of cachetools (sync) in order to handle concurrency better
# SEE https://github.com/ITISFoundation/osparc-simcore/pull/6169
#
_SECOND = 1 # in seconds
_MINUTE = 60 * _SECOND
_CACHE_TTL: Final = 1 * _MINUTE

_CACHE_MAXSIZE: Final = int(
os.getenv("CACHETOOLS_CACHE_MAXSIZE", "100")
) # number of items i.e. ServiceInputGet/ServiceOutputGet instances
_CACHE_TTL: Final = int(os.getenv("CACHETOOLS_CACHE_TTL_SECS", "60")) # secs


def _hash_inputs(
service: dict[str, Any],
input_key: str,
*args, # noqa: ARG001 # pylint: disable=unused-argument
**kwargs, # noqa: ARG001 # pylint: disable=unused-argument
):
return f"{service['key']}/{service['version']}/{input_key}"


def _cachetools_cached(*args, **kwargs):
def decorator(func):
if os.getenv("CACHETOOLS_DISABLE", "0") == "0":
return cachetools.cached(*args, **kwargs)(func)
_logger.warning("cachetools disabled")
return func

return decorator
def _hash_inputs(_f: Callable[..., Any], *_args, **kw):
assert not _args # nosec
service: dict[str, Any] = kw["service"]
return f"ServiceInputGetFactory_{service['key']}_{service['version']}_{kw['input_key']}"


class ServiceInputGetFactory:
@staticmethod
@_cachetools_cached(
cachetools.TTLCache(ttl=_CACHE_TTL, maxsize=_CACHE_MAXSIZE), key=_hash_inputs
@cached(
ttl=_CACHE_TTL,
key_builder=_hash_inputs,
)
def from_catalog_service_api_model(
async def from_catalog_service_api_model(
*,
service: dict[str, Any],
input_key: ServiceInputKey,
ureg: UnitRegistry | None = None,
Expand All @@ -110,29 +94,27 @@ def from_catalog_service_api_model(
return port


def _hash_outputs(
service: dict[str, Any],
output_key: str,
*args, # noqa: ARG001 # pylint: disable=unused-argument
**kwargs, # noqa: ARG001 # pylint: disable=unused-argument
):
return f"{service['key']}/{service['version']}/{output_key}"
def _hash_outputs(_f: Callable[..., Any], *_args, **kw):
assert not _args # nosec
service: dict[str, Any] = kw["service"]
return f"ServiceOutputGetFactory_{service['key']}/{service['version']}/{kw['output_key']}"


class ServiceOutputGetFactory:
@staticmethod
@_cachetools_cached(
cachetools.TTLCache(ttl=_CACHE_TTL, maxsize=_CACHE_MAXSIZE), key=_hash_outputs
@cached(
ttl=_CACHE_TTL,
key_builder=_hash_outputs,
)
def from_catalog_service_api_model(
async def from_catalog_service_api_model(
*,
service: dict[str, Any],
output_key: ServiceOutputKey,
ureg: UnitRegistry | None = None,
) -> ServiceOutputGet:
data = service["outputs"][output_key]
# NOTE: prunes invalid field that might have remained in database
if "defaultValue" in data:
data.pop("defaultValue")
data.pop("defaultValue", None)

# NOTE: this call must be validated if port property type is "ref_contentSchema"
port = ServiceOutputGet(key_id=output_key, **data)
Expand Down
11 changes: 7 additions & 4 deletions services/web/server/tests/unit/isolated/test_catalog_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import asyncio
import json
from copy import deepcopy

Expand Down Expand Up @@ -69,14 +70,16 @@ def test_from_catalog_to_webapi_service(
"owner": "[email protected]",
}

def _run():
def _run_async_test():
s = deepcopy(catalog_service)
replace_service_input_outputs(
s, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY
asyncio.get_event_loop().run_until_complete(
replace_service_input_outputs(
s, unit_registry=unit_registry, **RESPONSE_MODEL_POLICY
)
)
return s

result = benchmark(_run)
result = benchmark(_run_async_test)

# check result
got = json.dumps(result, indent=1)
Expand Down

0 comments on commit d4e6096

Please sign in to comment.