diff --git a/api/specs/web-server/_auth.py b/api/specs/web-server/_auth.py index 6d2b714f393..d922ce41228 100644 --- a/api/specs/web-server/_auth.py +++ b/api/specs/web-server/_auth.py @@ -266,7 +266,7 @@ async def email_confirmation(code: str): }, }, ) -async def list_api_keys(code: str): +async def list_api_keys(): """lists display names of API keys by this user""" diff --git a/packages/models-library/src/models_library/api_schemas_api_server/__init__.py b/packages/models-library/src/models_library/api_schemas_api_server/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/services/api-server/src/simcore_service_api_server/models/domain/api_keys.py b/packages/models-library/src/models_library/api_schemas_api_server/api_keys.py similarity index 79% rename from services/api-server/src/simcore_service_api_server/models/domain/api_keys.py rename to packages/models-library/src/models_library/api_schemas_api_server/api_keys.py index 467b885cb82..d828fc6507d 100644 --- a/services/api-server/src/simcore_service_api_server/models/domain/api_keys.py +++ b/packages/models-library/src/models_library/api_schemas_api_server/api_keys.py @@ -6,7 +6,10 @@ class ApiKey(BaseModel): api_secret: SecretStr -class ApiKeyInDB(ApiKey): +class ApiKeyInDB(BaseModel): + api_key: str + api_secret: str + id_: int = Field(0, alias="id") display_name: str user_id: int diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index 369e0a8bf63..b9ebbb549c3 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -1580,6 +1580,7 @@ "Profile": { "title": "Profile", "required": [ + "id", "login", "role" ], @@ -1595,6 +1596,12 @@ "type": "string", "example": "Maxwell" }, + "id": { + "title": "Id", + "exclusiveMinimum": true, + "type": "integer", + "minimum": 0 + }, "login": { "title": "Login", "type": "string", @@ -1614,6 +1621,7 @@ } }, "example": { + "id": "20", "first_name": "James", "last_name": "Maxwell", "login": 
"james-maxwell@itis.swiss", diff --git a/services/api-server/tests/unit/_with_db/conftest.py b/services/api-server/tests/unit/_with_db/conftest.py index ee13aabcc72..f9a8815bff6 100644 --- a/services/api-server/tests/unit/_with_db/conftest.py +++ b/services/api-server/tests/unit/_with_db/conftest.py @@ -22,6 +22,7 @@ import yaml from aiopg.sa.connection import SAConnection from fastapi import FastAPI +from models_library.api_schemas_api_server.api_keys import ApiKeyInDB from pydantic import PositiveInt from pytest_simcore.helpers.rawdata_fakers import ( random_api_key, @@ -36,7 +37,6 @@ from simcore_postgres_database.models.users import users from simcore_service_api_server.core.application import init_app from simcore_service_api_server.core.settings import PostgresSettings -from simcore_service_api_server.models.domain.api_keys import ApiKeyInDB ## POSTGRES ----- @@ -269,5 +269,5 @@ async def auth( ) -> httpx.BasicAuth: """overrides auth and uses access to real repositories instead of mocks""" async for key in create_fake_api_keys(1): - return httpx.BasicAuth(key.api_key, key.api_secret.get_secret_value()) - assert False, "Did not generate authentication" + return httpx.BasicAuth(key.api_key, key.api_secret) + pytest.fail("Did not generate authentication") diff --git a/services/api-server/tests/unit/_with_db/test_product.py b/services/api-server/tests/unit/_with_db/test_product.py index a5b721be3f7..af0fb995ffd 100644 --- a/services/api-server/tests/unit/_with_db/test_product.py +++ b/services/api-server/tests/unit/_with_db/test_product.py @@ -13,13 +13,13 @@ from faker import Faker from fastapi import status from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_api_server.api_keys import ApiKeyInDB from models_library.api_schemas_webserver.wallets import WalletGetWithAvailableCredits from models_library.generics import Envelope from models_library.users import UserID from models_library.wallets import WalletStatus from pydantic import 
PositiveInt from simcore_service_api_server._meta import API_VTAG -from simcore_service_api_server.models.domain.api_keys import ApiKeyInDB async def test_product_webserver( @@ -70,7 +70,7 @@ def _check_key_product_compatibility(request: httpx.Request, **kwargs): key = keys[wallet_id] response = await client.get( f"{API_VTAG}/wallets/{wallet_id}", - auth=httpx.BasicAuth(key.api_key, key.api_secret.get_secret_value()), + auth=httpx.BasicAuth(key.api_key, key.api_secret), ) assert response.status_code == status.HTTP_200_OK assert wallet_get_mock.call_count == len(keys) @@ -104,7 +104,7 @@ def _get_service_side_effect(request: httpx.Request, **kwargs): for key in keys: response = await client.get( f"{API_VTAG}/solvers/simcore/services/comp/isolve/releases/2.0.24", - auth=httpx.BasicAuth(key.api_key, key.api_secret.get_secret_value()), + auth=httpx.BasicAuth(key.api_key, key.api_secret), ) assert respx_mock.call_count == len(keys) diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index 9ad93646615..55a02280053 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -17,6 +17,7 @@ from ..api.errors.validation_error import http422_error_handler from ..meta import API_VERSION, API_VTAG, PROJECT_NAME, SUMMARY from ..modules import ( + api_keys_manager, catalog, comp_scheduler, dask_clients_pool, @@ -166,6 +167,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: if dynamic_scheduler_enabled: dynamic_sidecar.setup(app) + api_keys_manager.setup(app) if ( settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py new file mode 100644 index 
00000000000..0fa6f457fe8 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -0,0 +1,106 @@ +from uuid import uuid5 + +from fastapi import FastAPI +from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE +from models_library.api_schemas_webserver.auth import ApiKeyCreate, ApiKeyGet +from models_library.products import ProductName +from models_library.projects_nodes_io import NodeID +from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.users import UserID +from pydantic import parse_obj_as +from servicelib.rabbitmq import RabbitMQRPCClient + +from ..utils.distributed_identifer import BaseDistributedIdentifierManager +from .rabbitmq import get_rabbitmq_rpc_client + + +class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet]): + def __init__(self, app: FastAPI) -> None: + self.GET_OR_CREATE_INJECTS_IDENTIFIER = True + self.app = app + + @property + def rpc_client(self) -> RabbitMQRPCClient: + return get_rabbitmq_rpc_client(self.app) + + # pylint:disable=arguments-differ + + async def get( # type:ignore [override] + self, identifier: str, product_name: ProductName, user_id: UserID + ) -> ApiKeyGet | None: + result = await self.rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "api_key_get"), + product_name=product_name, + user_id=user_id, + name=identifier, + ) + return parse_obj_as(ApiKeyGet | None, result) + + async def create( # type:ignore [override] + self, identifier: str, product_name: ProductName, user_id: UserID + ) -> tuple[str, ApiKeyGet]: + result = await self.rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "create_api_keys"), + product_name=product_name, + user_id=user_id, + new=ApiKeyCreate(display_name=identifier, expiration=None), + ) + return identifier, ApiKeyGet.parse_obj(result) + + async def destroy( # type:ignore [override] + self, identifier: str, product_name: ProductName, 
user_id: UserID + ) -> None: + await self.rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "delete_api_keys"), + product_name=product_name, + user_id=user_id, + name=identifier, + ) + + +async def get_or_create_api_key( + app: FastAPI, *, product_name: ProductName, user_id: UserID, node_id: NodeID +) -> ApiKeyGet: + api_keys_manager = _get_api_keys_manager(app) + display_name = _get_api_key_name(node_id) + + key_data: ApiKeyGet | None = await api_keys_manager.get( + identifier=display_name, product_name=product_name, user_id=user_id + ) + if key_data is None: + _, key_data = await api_keys_manager.create( + identifier=display_name, product_name=product_name, user_id=user_id + ) + + return key_data + + +async def safe_remove( + app: FastAPI, *, node_id: NodeID, product_name: ProductName, user_id: UserID +) -> None: + api_keys_manager = _get_api_keys_manager(app) + display_name = _get_api_key_name(node_id) + + await api_keys_manager.remove( + identifier=display_name, product_name=product_name, user_id=user_id + ) + + +def _get_api_key_name(node_id: NodeID) -> str: + obfuscated_node_id = uuid5(node_id, f"{node_id}") + return f"_auto_{obfuscated_node_id}" + + +def _get_api_keys_manager(app: FastAPI) -> APIKeysManager: + api_keys_manager: APIKeysManager = app.state.api_keys_manager + return api_keys_manager + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.api_keys_manager = APIKeysManager(app) + + app.add_event_handler("startup", on_startup) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py index 54e837b8f5d..e67c3c45f64 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py @@ 
-28,6 +28,7 @@ from ...core.dynamic_services_settings.egress_proxy import EgressProxySettings from ...modules.osparc_variables_substitutions import ( + resolve_and_substitute_service_lifetime_variables_in_specs, resolve_and_substitute_session_variables_in_model, resolve_and_substitute_session_variables_in_specs, substitute_vendor_secrets_in_model, @@ -376,11 +377,19 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 app=app, specs=service_spec, user_id=user_id, + safe=True, + product_name=product_name, + project_id=project_id, + node_id=node_id, + ) + service_spec = await resolve_and_substitute_service_lifetime_variables_in_specs( + app=app, + specs=service_spec, # NOTE: at this point all OsparcIdentifiers have to be replaced # an error will be raised otherwise - safe=False, + safe=True, product_name=product_name, - project_id=project_id, + user_id=user_id, node_id=node_id, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index d46811a3aab..e5c83523f6c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -2,8 +2,7 @@ import json import logging -from collections import deque -from typing import Any, Deque, Final +from typing import Any, Final from fastapi import FastAPI from models_library.projects_networks import ProjectsNetworks @@ -39,6 +38,7 @@ SchedulerData, ) from .....utils.db import get_repository +from ....api_keys_manager import safe_remove from ....db.repositories.projects import ProjectsRepository from ....db.repositories.projects_networks import ProjectsNetworksRepository from ....director_v0 import DirectorV0Client @@ -64,7 +64,7 @@ from ...errors 
import EntrypointContainerNotFoundError from ...volumes import DY_SIDECAR_SHARED_STORE_PATH, DynamicSidecarVolumesPathsResolver -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # Used to ensure no more that X services per node pull or push data @@ -75,22 +75,19 @@ def get_director_v0_client(app: FastAPI) -> DirectorV0Client: - client = DirectorV0Client.instance(app) - return client + return DirectorV0Client.instance(app) def parse_containers_inspect( containers_inspect: dict[str, Any] | None ) -> list[DockerContainerInspect]: - results: Deque[DockerContainerInspect] = deque() - if containers_inspect is None: return [] - for container_id in containers_inspect: - container_inspect_data = containers_inspect[container_id] - results.append(DockerContainerInspect.from_container(container_inspect_data)) - return list(results) + return [ + DockerContainerInspect.from_container(containers_inspect[container_id]) + for container_id in containers_inspect + ] def are_all_user_services_containers_running( @@ -107,7 +104,9 @@ def _get_scheduler_data(app: FastAPI, node_uuid: NodeID) -> SchedulerData: ) # pylint: disable=protected-access scheduler_data: SchedulerData = ( - dynamic_sidecars_scheduler._scheduler.get_scheduler_data(node_uuid) + dynamic_sidecars_scheduler._scheduler.get_scheduler_data( # noqa: SLF001 + node_uuid + ) ) return scheduler_data @@ -125,7 +124,7 @@ async def service_remove_containers( scheduler_data.endpoint, progress_callback=progress_callback ) except (BaseClientHTTPError, TaskClientResultError) as e: - logger.warning( + _logger.warning( ( "Could not remove service containers for " "%s\n%s. Will continue to save the data from the service!" 
@@ -193,7 +192,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( if scheduler_data.dynamic_sidecar.were_state_and_outputs_saved: if scheduler_data.dynamic_sidecar.docker_node_id is None: - logger.warning( + _logger.warning( "Skipped volume removal for %s, since a docker_node_id was not found.", scheduler_data.node_uuid, ) @@ -210,11 +209,11 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( DY_SIDECAR_SHARED_STORE_PATH, scheduler_data.paths_mapping.inputs_path, scheduler_data.paths_mapping.outputs_path, + *scheduler_data.paths_mapping.state_paths, ] - + scheduler_data.paths_mapping.state_paths ] with log_context( - logger, logging.DEBUG, f"removing volumes via service for {node_uuid}" + _logger, logging.DEBUG, f"removing volumes via service for {node_uuid}" ): await remove_volumes_from_node( swarm_stack_name=swarm_stack_name, @@ -225,7 +224,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( node_uuid=scheduler_data.node_uuid, ) - logger.debug( + _logger.debug( "Removed dynamic-sidecar services and crated container for '%s'", scheduler_data.service_name, ) @@ -245,7 +244,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( # pylint: disable=protected-access scheduler_data.dynamic_sidecar.service_removal_state.mark_removed() await ( - app.state.dynamic_sidecar_scheduler._scheduler.remove_service_from_observation( + app.state.dynamic_sidecar_scheduler._scheduler.remove_service_from_observation( # noqa: SLF001 scheduler_data.node_uuid ) ) @@ -287,7 +286,7 @@ async def _remove_containers_save_state_and_outputs() -> None: ) if can_really_save and scheduler_data.dynamic_sidecar.were_containers_created: - logger.info("Calling into dynamic-sidecar to save: state and output ports") + _logger.info("Calling into dynamic-sidecar to save: state and output ports") try: tasks = [ service_push_outputs(app, scheduler_data.node_uuid, sidecars_client) @@ -305,9 +304,9 @@ async def 
_remove_containers_save_state_and_outputs() -> None: await logged_gather(*tasks, max_concurrency=2) scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True - logger.info("dynamic-sidecar saved: state and output ports") + _logger.info("dynamic-sidecar saved: state and output ports") except (BaseClientHTTPError, TaskClientResultError) as e: - logger.error( + _logger.error( # noqa: TRY400 ( "Could not contact dynamic-sidecar to save service " "state or output ports %s\n%s" @@ -322,7 +321,7 @@ async def _remove_containers_save_state_and_outputs() -> None: scheduler_data.dynamic_sidecar.wait_for_manual_intervention_after_error = ( True ) - raise e + raise if node_resource_limits_enabled(app): node_rights_manager = await NodeRightsManager.instance(app) @@ -335,7 +334,7 @@ async def _remove_containers_save_state_and_outputs() -> None: await _remove_containers_save_state_and_outputs() except NodeRightsAcquireError: # Next observation cycle, the service will try again - logger.debug( + _logger.debug( "Skip saving service state for %s. Docker node %s is busy. 
Will try later.", scheduler_data.node_uuid, scheduler_data.dynamic_sidecar.docker_node_id, @@ -348,6 +347,13 @@ async def _remove_containers_save_state_and_outputs() -> None: TaskProgress.create(), app, scheduler_data.node_uuid, settings.SWARM_STACK_NAME ) + await safe_remove( + app, + node_id=scheduler_data.node_uuid, + product_name=scheduler_data.product_name, + user_id=scheduler_data.user_id, + ) + # remove sidecar's api client remove_sidecars_client(app, scheduler_data.node_uuid) @@ -368,7 +374,7 @@ async def _remove_containers_save_state_and_outputs() -> None: async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None: - logger.debug("Attaching project networks for %s", scheduler_data.service_name) + _logger.debug("Attaching project networks for %s", scheduler_data.service_name) sidecars_client = get_sidecars_client(app, scheduler_data.node_uuid) dynamic_sidecar_endpoint = scheduler_data.endpoint @@ -410,13 +416,13 @@ async def wait_for_sidecar_api(app: FastAPI, scheduler_data: SchedulerData) -> N ), wait=wait_fixed(1), retry_error_cls=EntrypointContainerNotFoundError, - before_sleep=before_sleep_log(logger, logging.DEBUG), + before_sleep=before_sleep_log(_logger, logging.DEBUG), ): with attempt: if not await get_dynamic_sidecar_service_health( app, scheduler_data, with_retry=False ): - raise TryAgain() + raise TryAgain scheduler_data.dynamic_sidecar.is_healthy = True @@ -473,7 +479,7 @@ async def _pull_outputs_and_state(): service_outputs_labels = json.loads( simcore_service_labels.dict().get("io.simcore.outputs", "{}") ).get("outputs", {}) - logger.debug( + _logger.debug( "Creating dirs from service outputs labels: %s", service_outputs_labels, ) @@ -494,7 +500,7 @@ async def _pull_outputs_and_state(): await _pull_outputs_and_state() except NodeRightsAcquireError: # Next observation cycle, the service will try again - logger.debug( + _logger.debug( "Skip saving service state for %s. Docker node %s is busy. 
Will try later.", scheduler_data.node_uuid, scheduler_data.dynamic_sidecar.docker_node_id, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py index 8a4dfcd390e..078c6f9eae2 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py @@ -2,7 +2,6 @@ """ import logging -from collections.abc import Callable, Mapping from copy import deepcopy from typing import Any @@ -27,6 +26,7 @@ OsparcVariablesTable, resolve_variables_from_context, ) +from .api_keys_manager import get_or_create_api_key from .db.repositories.services_environments import ServicesEnvironmentsRepository _logger = logging.getLogger(__name__) @@ -156,10 +156,14 @@ async def resolve_and_substitute_session_variables_in_specs( if requested := set(resolver.get_identifiers()): available = set(table.variables_names()) - - if identifiers := available.intersection(requested): + identifiers_to_replace = available.intersection(requested) + _logger.debug( + "resolve_and_substitute_session_variables_in_specs identifiers_to_replace=%s", + identifiers_to_replace, + ) + if identifiers_to_replace: environs = await resolve_variables_from_context( - table.copy(include=identifiers), + table.copy(include=identifiers_to_replace), context=ContextDict( app=app, user_id=user_id, @@ -176,14 +180,76 @@ async def resolve_and_substitute_session_variables_in_specs( return deepcopy(specs) -async def resolve_and_substitute_lifespan_variables_in_specs( - _app: FastAPI, - _specs: dict[str, Any], +async def resolve_and_substitute_service_lifetime_variables_in_specs( + app: FastAPI, + specs: dict[str, Any], *, - _callbacks_registry: Mapping[str, Callable], + product_name: ProductName, + user_id: UserID, + node_id: NodeID, safe: bool = True, 
-): - raise NotImplementedError +) -> dict[str, Any]: + registry: OsparcVariablesTable = app.state.lifespan_osparc_variables_table + + resolver = SpecsSubstitutionsResolver(specs, upgrade=False) + + if requested := set(resolver.get_identifiers()): + available = set(registry.variables_names()) + + if identifiers := available.intersection(requested): + environs = await resolve_variables_from_context( + registry.copy(include=identifiers), + context=ContextDict( + app=app, + product_name=product_name, + user_id=user_id, + node_id=node_id, + ), + # NOTE: the api key and secret cannot be resolved in parallel + # due to race conditions + resolve_in_parallel=False, + ) + + resolver.set_substitutions(mappings=environs) + new_specs: dict[str, Any] = resolver.run(safe=safe) + return new_specs + + return deepcopy(specs) + + +async def _get_or_create_api_key( + app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID +) -> str: + key_data = await get_or_create_api_key( + app, + product_name=product_name, + user_id=user_id, + node_id=node_id, + ) + return key_data.api_key # type:ignore [no-any-return] + + +async def _get_or_create_api_secret( + app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID +) -> str: + key_data = await get_or_create_api_key( + app, + product_name=product_name, + user_id=user_id, + node_id=node_id, + ) + return key_data.api_secret # type:ignore [no-any-return] + + +def _setup_lifespan_osparc_variables_table(app: FastAPI): + app.state.lifespan_osparc_variables_table = table = OsparcVariablesTable() + + table.register_from_handler("OSPARC_VARIABLE_API_KEY")(_get_or_create_api_key) + table.register_from_handler("OSPARC_VARIABLE_API_SECRET")(_get_or_create_api_secret) + + _logger.debug( + "Registered lifespan_osparc_variables_table=%s", sorted(table.variables_names()) + ) async def _request_user_email(app: FastAPI, user_id: UserID) -> EmailStr: @@ -205,6 +271,7 @@ def _setup_session_osparc_variables(app: FastAPI): 
("OSPARC_VARIABLE_PRODUCT_NAME", "product_name"), ("OSPARC_VARIABLE_STUDY_UUID", "project_id"), ("OSPARC_VARIABLE_NODE_ID", "node_id"), + # ANE -> PC: why not register the user_id as well at this point? ]: table.register_from_context(name, context_name) @@ -226,5 +293,6 @@ def setup(app: FastAPI): def on_startup() -> None: _setup_session_osparc_variables(app) + _setup_lifespan_osparc_variables_table(app) app.add_event_handler("startup", on_startup) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py new file mode 100644 index 00000000000..05d84a72f08 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py @@ -0,0 +1,69 @@ +import logging +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from servicelib.logging_utils import log_catch, log_context + +_logger = logging.getLogger(__name__) + +Ident = TypeVar("Ident") +Res = TypeVar("Res") + + +class BaseDistributedIdentifierManager(ABC, Generic[Ident, Res]): + """Common interface used to manage the lifecycle of osparc resources. + + An osparc resource can be anything that needs to be created and then removed + during the runtime of the platform. + + Safe remove the resource. + + For usage check ``packages/service-library/tests/test_osparc_generic_resource.py`` + """ + + @abstractmethod + async def get(self, identifier: Ident, **extra_kwargs) -> Res | None: + """Returns a resource if exists. 
+ + Arguments: + identifier -- user chosen identifier for the resource + **extra_kwargs -- can be overloaded by the user + + Returns: + None if the resource does not exist + """ + + @abstractmethod + async def create(self, **extra_kwargs) -> tuple[Ident, Res]: + """Used for creating the resources + + Arguments: + **extra_kwargs -- can be overloaded by the user + + Returns: + tuple[identifier for the resource, resource object] + """ + + @abstractmethod + async def destroy(self, identifier: Ident, **extra_kwargs) -> None: + """Used to destroy an existing resource + + Arguments: + identifier -- user chosen identifier for the resource + **extra_kwargs -- can be overloaded by the user + """ + + async def remove( + self, identifier: Ident, *, reraise: bool = False, **extra_kwargs + ) -> None: + """Attempts to remove the resource; if an error occurs, it is logged. + + Arguments: + identifier -- user chosen identifier for the resource + reraise -- when True raises any exception raised by ``destroy`` (default: {False}) + **extra_kwargs -- can be overloaded by the user + """ + with log_context( + _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}" + ), log_catch(_logger, reraise=reraise): + await self.destroy(identifier, **extra_kwargs) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py b/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py index 5e783a807a9..94eadd9fb47 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/osparc_variables.py @@ -5,6 +5,7 @@ from models_library.utils.specs_substitution import SubstitutionValue from pydantic import NonNegativeInt, parse_obj_as +from servicelib.utils import logged_gather ContextDict: TypeAlias = dict[str, Any] ContextGetter: TypeAlias = Callable[[ContextDict], Any] @@ -94,7 +95,21 @@ def copy( async def resolve_variables_from_context(
variables_getters: dict[str, ContextGetter], context: ContextDict, + *, + resolve_in_parallel: bool = True, ) -> dict[str, SubstitutionValue]: + """Resolves variables given a list of handlers and a context + containing vars which can be used by the handlers. + + Arguments: + variables_getters -- mapping of awaitables which resolve the value + context -- variables which can be passed to the awaitables + + Keyword Arguments: + resolve_in_parallel -- sometimes the variable_getters cannot be ran in parallel, + for example due to race conditions, + for those situations set to False (default: {True}) + """ # evaluate getters from context values pre_environs: dict[str, SubstitutionValue | RequestTuple] = { key: fun(context) for key, fun in variables_getters.items() @@ -113,7 +128,10 @@ async def resolve_variables_from_context( environs[key] = value # evaluates handlers - values = await asyncio.gather(*coros.values()) + values = await logged_gather( + *coros.values(), + max_concurrency=0 if resolve_in_parallel else 1, + ) for key, value in zip(coros.keys(), values, strict=True): environs[key] = value diff --git a/services/director-v2/tests/unit/test_modules_api_keys_manager.py b/services/director-v2/tests/unit/test_modules_api_keys_manager.py new file mode 100644 index 00000000000..3c56a05936c --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_api_keys_manager.py @@ -0,0 +1,100 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from collections.abc import Awaitable, Callable + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE +from models_library.api_schemas_webserver.auth import ApiKeyCreate, ApiKeyGet +from models_library.products import ProductName +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from pytest_mock import MockerFixture +from servicelib.rabbitmq import RabbitMQRPCClient, 
RPCRouter +from simcore_service_director_v2.modules.api_keys_manager import ( + APIKeysManager, + _get_api_key_name, +) + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +def test_get_api_key_name_is_not_randomly_generated(node_id: NodeID): + api_key_names = {_get_api_key_name(node_id) for x in range(1000)} + assert len(api_key_names) == 1 + + +@pytest.fixture +async def mock_rpc_server( + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], + mocker: MockerFixture, +) -> RabbitMQRPCClient: + rpc_client = await rabbitmq_rpc_client("client") + rpc_server = await rabbitmq_rpc_client("mock_server") + + router = RPCRouter() + + # mocks the interface defined in the webserver + + @router.expose() + async def api_key_get( + product_name: ProductName, user_id: UserID, name: str + ) -> ApiKeyGet: + return ApiKeyGet.parse_obj(ApiKeyGet.Config.schema_extra["examples"][0]) + + @router.expose() + async def create_api_keys( + product_name: ProductName, user_id: UserID, new: ApiKeyCreate + ) -> ApiKeyGet: + return ApiKeyGet.parse_obj(ApiKeyGet.Config.schema_extra["examples"][0]) + + @router.expose() + async def delete_api_keys( + product_name: ProductName, user_id: UserID, name: str + ) -> None: + ... 
+ + await rpc_server.register_router(router, namespace=WEBSERVER_RPC_NAMESPACE) + + # mock returned client + mocker.patch( + "simcore_service_director_v2.modules.api_keys_manager.get_rabbitmq_rpc_client", + return_value=rpc_client, + ) + + return rpc_client + + +async def test_rpc_endpoints( + mock_rpc_server: RabbitMQRPCClient, + faker: Faker, +): + manager = APIKeysManager(FastAPI()) + + identifier = faker.pystr() + product_name = faker.pystr() + user_id = faker.pyint() + + api_key = await manager.get( + identifier=identifier, product_name=product_name, user_id=user_id + ) + assert isinstance(api_key, ApiKeyGet) + + identifier, api_key = await manager.create( + identifier=identifier, product_name=product_name, user_id=user_id + ) + assert isinstance(identifier, str) + assert isinstance(api_key, ApiKeyGet) + + await manager.destroy( + identifier=identifier, product_name=product_name, user_id=user_id + ) diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py new file mode 100644 index 00000000000..9d645e7b1da --- /dev/null +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -0,0 +1,137 @@ +# pylint:disable=redefined-outer-name + +import random +import string +from typing import Any +from uuid import UUID, uuid4 + +import pytest +from pytest_mock import MockerFixture +from simcore_service_director_v2.utils.distributed_identifer import ( + BaseDistributedIdentifierManager, +) + + +# define a custom type of ID for the API +class UserDefinedID: + def __init__(self, uuid: UUID | None = None) -> None: + self._id = uuid if uuid else uuid4() + + def __eq__(self, other: "UserDefinedID") -> bool: + return self._id == other._id + + def __hash__(self): + return hash(str(self._id)) + + +# mocked api interface +class RandomTextAPI: + def __init__(self) -> None: + self._created: dict[UserDefinedID, Any] = {} + + @staticmethod + def _random_string(length: 
int) -> str: + letters_and_digits = string.ascii_letters + string.digits + return "".join( + random.choice(letters_and_digits) for _ in range(length) # noqa: S311 + ) + + def create(self, length: int) -> tuple[UserDefinedID, Any]: + identifier = UserDefinedID(uuid4()) + self._created[identifier] = self._random_string(length) + return identifier, self._created[identifier] + + def delete(self, identifier: UserDefinedID) -> None: + del self._created[identifier] + + def get(self, identifier: UserDefinedID) -> Any | None: + return self._created.get(identifier, None) + + +# define a custom manager using the custom user defined identifiers +# NOTE: note that the generic uses `[UserDefinedID, Any]` +# which enforces typing constraints on the overloaded abstract methods +class RandomTextResourcesManager(BaseDistributedIdentifierManager[UserDefinedID, Any]): + # pylint:disable=arguments-differ + + def __init__(self) -> None: + self.api = RandomTextAPI() + + async def get(self, identifier: UserDefinedID, **_) -> Any | None: + return self.api.get(identifier) + + async def create(self, length: int) -> tuple[UserDefinedID, Any]: + return self.api.create(length) + + async def destroy(self, identifier: UserDefinedID) -> None: + self.api.delete(identifier) + + +@pytest.fixture +def manager() -> RandomTextResourcesManager: + return RandomTextResourcesManager() + + +async def test_resource_is_missing(manager: RandomTextResourcesManager): + missing_identifier = UserDefinedID() + assert await manager.get(missing_identifier) is None + + +async def test_manual_workflow(manager: RandomTextResourcesManager): + # creation + identifier, _ = await manager.create(length=1) + assert await manager.get(identifier) is not None + + # removal + await manager.destroy(identifier) + + # resource no longer exists + assert await manager.get(identifier) is None + + +@pytest.mark.parametrize("delete_before_removal", [True, False]) +async def test_automatic_cleanup_workflow( + manager: 
RandomTextResourcesManager, delete_before_removal: bool +): + # creation + identifier, _ = await manager.create(length=1) + assert await manager.get(identifier) is not None + + # optional removal + if delete_before_removal: + await manager.destroy(identifier) + + is_still_present = not delete_before_removal + assert (await manager.get(identifier) is not None) is is_still_present + + # safe remove the resource + await manager.remove(identifier) + + # resource no longer exists + assert await manager.get(identifier) is None + + +@pytest.mark.parametrize("reraise", [True, False]) +async def test_remove_raises_error( + mocker: MockerFixture, + manager: RandomTextResourcesManager, + caplog: pytest.LogCaptureFixture, + reraise: bool, +): + caplog.clear() + + error_message = "mock error during resource destroy" + mocker.patch.object(manager, "destroy", side_effect=RuntimeError(error_message)) + + # after creation object is present + identifier, _ = await manager.create(length=1) + assert await manager.get(identifier) is not None + + if reraise: + with pytest.raises(RuntimeError): + await manager.remove(identifier, reraise=reraise) + else: + await manager.remove(identifier, reraise=reraise) + # check logs in case of error + assert "Unhandled exception:" in caplog.text + assert error_message in caplog.text diff --git a/services/director-v2/tests/unit/test_utils_osparc_variables.py b/services/director-v2/tests/unit/test_utils_osparc_variables.py index 84bf7ce2e40..17c3b4d9899 100644 --- a/services/director-v2/tests/unit/test_utils_osparc_variables.py +++ b/services/director-v2/tests/unit/test_utils_osparc_variables.py @@ -18,7 +18,7 @@ from pytest_simcore.helpers.faker_compose_specs import generate_fake_docker_compose from simcore_postgres_database.models.users import UserRole from simcore_service_director_v2.modules.osparc_variables_substitutions import ( - resolve_and_substitute_lifespan_variables_in_specs, + resolve_and_substitute_service_lifetime_variables_in_specs, 
resolve_and_substitute_session_variables_in_specs, substitute_vendor_secrets_in_specs, ) @@ -49,7 +49,7 @@ def session_context(faker: Faker) -> ContextDict: async def test_resolve_session_environs(faker: Faker, session_context: ContextDict): assert resolve_and_substitute_session_variables_in_specs assert substitute_vendor_secrets_in_specs - assert resolve_and_substitute_lifespan_variables_in_specs + assert resolve_and_substitute_service_lifetime_variables_in_specs async def _request_user_role(app: FastAPI, user_id: UserID) -> SubstitutionValue: print(app, user_id) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 223c4402b0d..d84d9c55f10 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -445,13 +445,6 @@ paths: summary: List Api Keys description: lists display names of API keys by this user operationId: list_api_keys - parameters: - - required: true - schema: - title: Code - type: string - name: code - in: query responses: '200': description: returns the display names of API keys diff --git a/services/web/server/src/simcore_service_webserver/api_keys/_api.py b/services/web/server/src/simcore_service_webserver/api_keys/_api.py index 9ce15603ee4..6768ce40fcb 100644 --- a/services/web/server/src/simcore_service_webserver/api_keys/_api.py +++ b/services/web/server/src/simcore_service_webserver/api_keys/_api.py @@ -1,4 +1,3 @@ -import logging import re import string from typing import Final @@ -11,9 +10,6 @@ from ..login.utils import get_random_string from ._db import ApiKeyRepo -_logger = logging.getLogger(__name__) - - _PUNCTUATION_REGEX = re.compile( pattern="[" + re.escape(string.punctuation.replace("_", "")) + "]" ) @@ -62,6 +58,14 @@ async def create_api_key( ) +async def get( + app: web.Application, *, name: str, user_id: UserID, product_name: 
ProductName
+) -> ApiKeyGet | None:
+    repo = ApiKeyRepo.create_from_app(app)
+    row = await repo.get(display_name=name, user_id=user_id, product_name=product_name)
+    return ApiKeyGet.parse_obj(row) if row else None
+
+
 async def delete_api_key(
     app: web.Application,
     *,
diff --git a/services/web/server/src/simcore_service_webserver/api_keys/_db.py b/services/web/server/src/simcore_service_webserver/api_keys/_db.py
index 8cb12f07172..abbbf05e52b 100644
--- a/services/web/server/src/simcore_service_webserver/api_keys/_db.py
+++ b/services/web/server/src/simcore_service_webserver/api_keys/_db.py
@@ -5,7 +5,8 @@
 import sqlalchemy as sa
 from aiohttp import web
 from aiopg.sa.engine import Engine
-from aiopg.sa.result import ResultProxy
+from aiopg.sa.result import ResultProxy, RowProxy
+from models_library.api_schemas_api_server.api_keys import ApiKeyInDB
 from models_library.basic_types import IdInt
 from models_library.products import ProductName
 from models_library.users import UserID
@@ -27,7 +28,7 @@ async def list_names(
         self, *, user_id: UserID, product_name: ProductName
     ) -> list[str]:
         async with self.engine.acquire() as conn:
-            stmt = sa.select(api_keys.c.display_name,).where(
+            stmt = sa.select(api_keys.c.display_name).where(
                 (api_keys.c.user_id == user_id)
                 & (api_keys.c.product_name == product_name)
             )
@@ -64,9 +65,23 @@ async def create(
         rows = await result.fetchall() or []
         return [r.id for r in rows]
 
+    async def get(
+        self, *, display_name: str, user_id: UserID, product_name: ProductName
+    ) -> ApiKeyInDB | None:
+        async with self.engine.acquire() as conn:
+            stmt = sa.select(api_keys).where(
+                (api_keys.c.user_id == user_id)
+                & (api_keys.c.display_name == display_name)
+                & (api_keys.c.product_name == product_name)
+            )
+
+            result: ResultProxy = await conn.execute(stmt)
+            row: RowProxy | None = await result.fetchone()
+            return ApiKeyInDB.from_orm(row) if row else None
+
     async def delete_by_name(
         self, *, display_name: str, 
user_id: UserID, product_name: ProductName - ): + ) -> None: async with self.engine.acquire() as conn: stmt = api_keys.delete().where( (api_keys.c.user_id == user_id) @@ -77,7 +93,7 @@ async def delete_by_name( async def delete_by_key( self, *, api_key: str, user_id: UserID, product_name: ProductName - ): + ) -> None: async with self.engine.acquire() as conn: stmt = api_keys.delete().where( (api_keys.c.user_id == user_id) diff --git a/services/web/server/src/simcore_service_webserver/api_keys/_rpc.py b/services/web/server/src/simcore_service_webserver/api_keys/_rpc.py index 8c94ed271e8..ef7fb9b303c 100644 --- a/services/web/server/src/simcore_service_webserver/api_keys/_rpc.py +++ b/services/web/server/src/simcore_service_webserver/api_keys/_rpc.py @@ -31,12 +31,23 @@ async def delete_api_keys( product_name: ProductName, user_id: UserID, name: str, -): - return await _api.delete_api_key( +) -> None: + await _api.delete_api_key( app, name=name, user_id=user_id, product_name=product_name ) +@router.expose() +async def api_key_get( + app: web.Application, + *, + product_name: ProductName, + user_id: UserID, + name: str, +) -> ApiKeyGet | None: + return await _api.get(app, name=name, user_id=user_id, product_name=product_name) + + async def register_rpc_routes_on_startup(app: web.Application): rpc_server = get_rabbitmq_rpc_server(app) await rpc_server.register_router(router, WEBSERVER_RPC_NAMESPACE, app) diff --git a/services/web/server/tests/unit/with_dbs/01/test_api_keys.py b/services/web/server/tests/unit/with_dbs/01/test_api_keys.py index ef218513b36..1b4eeb7c257 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_api_keys.py +++ b/services/web/server/tests/unit/with_dbs/01/test_api_keys.py @@ -80,20 +80,19 @@ async def test_create_api_keys( expected: type[web.HTTPException], disable_gc_manual_guest_users: None, ): - resp = await client.post("/v0/auth/api-keys", json={"display_name": "foo"}) + display_name = "foo" + resp = await 
client.post("/v0/auth/api-keys", json={"display_name": display_name}) data, errors = await assert_status(resp, expected) if not errors: - assert data["display_name"] == "foo" + assert data["display_name"] == display_name assert "api_key" in data assert "api_secret" in data resp = await client.get("/v0/auth/api-keys") data, _ = await assert_status(resp, expected) - assert sorted(data) == [ - "foo", - ] + assert sorted(data) == [display_name] @pytest.mark.parametrize( diff --git a/services/web/server/tests/unit/with_dbs/01/test_api_keys_rpc.py b/services/web/server/tests/unit/with_dbs/01/test_api_keys_rpc.py new file mode 100644 index 00000000000..b70812f09ae --- /dev/null +++ b/services/web/server/tests/unit/with_dbs/01/test_api_keys_rpc.py @@ -0,0 +1,164 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +from collections.abc import AsyncIterable, Awaitable, Callable + +import pytest +from aiohttp.test_utils import TestServer +from faker import Faker +from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE +from models_library.api_schemas_webserver.auth import ApiKeyCreate +from models_library.products import ProductName +from models_library.rabbitmq_basic_types import RPCMethodName +from pydantic import parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.utils_envs import setenvs_from_dict +from pytest_simcore.helpers.utils_login import UserInfoDict +from servicelib.rabbitmq import RabbitMQRPCClient +from settings_library.rabbit import RabbitSettings +from simcore_postgres_database.models.users import UserRole +from simcore_service_webserver.api_keys._db import ApiKeyRepo +from simcore_service_webserver.application_settings import ApplicationSettings + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def app_environment( + rabbit_service: RabbitSettings, + 
app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +): + new_envs = setenvs_from_dict( + monkeypatch, + { + **app_environment, + "RABBIT_HOST": rabbit_service.RABBIT_HOST, + "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}", + "RABBIT_USER": rabbit_service.RABBIT_USER, + "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}", + "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(), + }, + ) + + settings = ApplicationSettings.create_from_envs() + assert settings.WEBSERVER_RABBITMQ + + return new_envs + + +@pytest.fixture +def user_role() -> UserRole: + return UserRole.USER + + +@pytest.fixture +async def fake_user_api_keys( + user_role: UserRole, + web_server: TestServer, + logged_user: UserInfoDict, + osparc_product_name: ProductName, +) -> AsyncIterable[list[str]]: + names = ["foo", "bar", "beta", "alpha"] + repo = ApiKeyRepo.create_from_app(app=web_server.app) + + for name in names: + await repo.create( + user_id=logged_user["id"], + product_name=osparc_product_name, + display_name=name, + expiration=None, + api_key=f"{name}-key", + api_secret=f"{name}-secret", + ) + + yield names + + for name in names: + await repo.delete_by_name( + display_name=name, + user_id=logged_user["id"], + product_name=osparc_product_name, + ) + + +@pytest.fixture +async def rpc_client( + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], + mocker: MockerFixture, +) -> RabbitMQRPCClient: + return await rabbitmq_rpc_client("client") + + +async def test_api_key_get( + fake_user_api_keys: list[str], + rpc_client: RabbitMQRPCClient, + osparc_product_name: ProductName, + logged_user: UserInfoDict, +): + for api_key_name in fake_user_api_keys: + result = await rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "api_key_get"), + product_name=osparc_product_name, + user_id=logged_user["id"], + name=api_key_name, + ) + assert result.display_name == api_key_name + + +async def test_api_keys_workflow( + web_server: TestServer, + 
rpc_client: RabbitMQRPCClient, + osparc_product_name: ProductName, + logged_user: UserInfoDict, + faker: Faker, +): + key_name = faker.pystr() + + # creating a key + created_api_key = await rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "create_api_keys"), + product_name=osparc_product_name, + user_id=logged_user["id"], + new=ApiKeyCreate(display_name=key_name, expiration=None), + ) + assert created_api_key.display_name == key_name + + # query the key is still present + queried_api_key = await rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "api_key_get"), + product_name=osparc_product_name, + user_id=logged_user["id"], + name=key_name, + ) + assert queried_api_key.display_name == key_name + + assert created_api_key == queried_api_key + + # remove the key + delete_key_result = await rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "delete_api_keys"), + product_name=osparc_product_name, + user_id=logged_user["id"], + name=key_name, + ) + assert delete_key_result is None + + # key no longer present + query_missing_query = await rpc_client.request( + WEBSERVER_RPC_NAMESPACE, + parse_obj_as(RPCMethodName, "api_key_get"), + product_name=osparc_product_name, + user_id=logged_user["id"], + name=key_name, + ) + assert query_missing_query is None