diff --git a/packages/service-library/src/servicelib/observer.py b/packages/service-library/src/servicelib/observer.py index 5eb47c1ecd3..a7e73188a9b 100644 --- a/packages/service-library/src/servicelib/observer.py +++ b/packages/service-library/src/servicelib/observer.py @@ -3,11 +3,12 @@ """ -import asyncio import logging from collections import defaultdict from functools import wraps +from .utils import logged_gather + log = logging.getLogger(__name__) event_registry = defaultdict(list) @@ -19,7 +20,7 @@ async def emit(event: str, *args, **kwargs): coroutines = [observer(*args, **kwargs) for observer in event_registry[event]] # all coroutine called in // - await asyncio.gather(*coroutines, return_exceptions=True) + await logged_gather(*coroutines) def observe(event: str): diff --git a/packages/service-library/src/servicelib/utils.py b/packages/service-library/src/servicelib/utils.py index b1abb8e53a0..07763aad9f7 100644 --- a/packages/service-library/src/servicelib/utils.py +++ b/packages/service-library/src/servicelib/utils.py @@ -4,7 +4,12 @@ I order to avoid cyclic dependences, please DO NOT IMPORT ANYTHING from . """ +import asyncio +import logging from pathlib import Path +from typing import Any, Coroutine, List, Optional, Union + +logger = logging.getLogger(__name__) def is_osparc_repo_dir(path: Path) -> bool: @@ -14,7 +19,7 @@ def is_osparc_repo_dir(path: Path) -> bool: return all(d in got for d in expected) -def search_osparc_repo_dir(start, max_iterations=8): +def search_osparc_repo_dir(start: Union[str, Path], max_iterations=8) -> Optional[Path]: """ Returns path to root repo dir or None if it does not exists NOTE: assumes starts is a path within repo @@ -27,3 +32,32 @@ def search_osparc_repo_dir(start, max_iterations=8): iteration_number += 1 return root_dir if is_osparc_repo_dir(root_dir) else None + + +# FUTURES +def fire_and_forget_task(obj: Union[Coroutine, asyncio.Future]) -> None: + future = asyncio.ensure_future(obj) + + def log_exception_callback(fut: asyncio.Future): + try: + fut.result() + except Exception: # pylint: disable=broad-except + logger.exception("Error occured while running task!") + + future.add_done_callback(log_exception_callback) + + +# // tasks +async def logged_gather(*tasks, reraise: bool = True) -> List[Any]: + # all coroutine called in // and we take care of returning the exceptions + results = await asyncio.gather(*tasks, return_exceptions=True) + for value in results: + if isinstance(value, Exception): + if reraise: + raise value + logger.error( + "Exception occured while running %s: %s", + str(tasks[results.index(value)]), + str(value), + ) + return results diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 2e61606840a..746014c80fd 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -195,6 +195,8 @@ async def list_services(app: web.Application, service_type: ServiceType) -> List for repo_details in results: if repo_details and isinstance(repo_details, list): services.extend(repo_details) + elif isinstance(repo_details, Exception): + _logger.error("Exception occured while listing services %s", repo_details) return services async def list_interactive_service_dependencies(app: web.Application, service_key: str, service_tag: str) -> List[Dict]: diff --git a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py index bc312f4fbd5..73798c6dba9 100644 --- a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py +++ b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py @@ -13,7 +13,7 @@ from servicelib.application_keys import APP_DB_ENGINE_KEY -from .projects import projects_api +from .projects import projects_api, projects_exceptions from .projects.projects_models import projects, user_to_projects from .socketio.events import post_messages @@ -87,9 +87,15 @@ async def listen(app: web.Application): ) async for row in conn.execute(query): user_id = row["user_id"] - node_data = await projects_api.update_project_node_outputs( - app, user_id, project_id, node_id, data=task_output - ) + try: + node_data = await projects_api.update_project_node_outputs( + app, user_id, project_id, node_id, data=task_output + ) + except projects_exceptions.ProjectNotFoundError: + log.exception("Project %s not found", project_id) + except projects_exceptions.NodeNotFoundError: + log.exception("Node %s ib project %s not found", node_id, project_id) + messages = {"nodeUpdated": {"Node": node_id, "Data": node_data}} await post_messages(app, user_id, messages) diff --git a/services/web/server/src/simcore_service_webserver/computation_handlers.py b/services/web/server/src/simcore_service_webserver/computation_handlers.py index 8ce9360f63c..ae7f4b41680 100644 --- a/services/web/server/src/simcore_service_webserver/computation_handlers.py +++ b/services/web/server/src/simcore_service_webserver/computation_handlers.py @@ -15,6 +15,7 @@ from .computation_config import CONFIG_SECTION_NAME as CONFIG_RABBIT_SECTION from .login.decorators import login_required from .projects.projects_api import get_project_for_user +from .projects.projects_exceptions import ProjectNotFoundError from .security_api import check_permission log = logging.getLogger(__file__) @@ -51,8 +52,12 @@ async def update_pipeline(request: web.Request) -> web.Response: user_id, project_id = await _process_request(request) - project = await get_project_for_user(request.app, project_id, user_id) - await update_pipeline_db(request.app, project_id, project["workbench"]) + try: + project = await get_project_for_user(request.app, project_id, user_id) + await update_pipeline_db(request.app, project_id, project["workbench"]) + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_id} not found") + raise web.HTTPNoContent() @@ -67,8 +72,11 @@ async def start_pipeline(request: web.Request) -> web.Response: user_id, project_id = await _process_request(request) - project = await get_project_for_user(request.app, project_id, user_id) - await update_pipeline_db(request.app, project_id, project["workbench"]) + try: + project = await get_project_for_user(request.app, project_id, user_id) + await update_pipeline_db(request.app, project_id, project["workbench"]) + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_id} not found") # commit the tasks to celery _ = get_celery(request.app).send_task( diff --git a/services/web/server/src/simcore_service_webserver/director/director_api.py b/services/web/server/src/simcore_service_webserver/director/director_api.py index e1c4035fe1b..c50eb0b2973 100644 --- a/services/web/server/src/simcore_service_webserver/director/director_api.py +++ b/services/web/server/src/simcore_service_webserver/director/director_api.py @@ -1,6 +1,5 @@ # pylint: disable=too-many-arguments -import asyncio import logging import urllib from typing import Dict, List, Optional @@ -8,8 +7,10 @@ from aiohttp import web from yarl import URL -from .config import get_client_session, get_config +from servicelib.utils import logged_gather + from . import director_exceptions +from .config import get_client_session, get_config log = logging.getLogger(__name__) @@ -99,7 +100,7 @@ async def stop_services( app, user_id=user_id, project_id=project_id ) stop_tasks = [stop_service(app, service_uuid) for service_uuid in services] - await asyncio.gather(*stop_tasks) + await logged_gather(*stop_tasks, reraise=False) async def get_service_by_key_version( diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index de0d29f2e59..1f4db02e8f6 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -9,7 +9,6 @@ # pylint: disable=too-many-arguments import logging -from asyncio import ensure_future, gather from pprint import pformat from typing import Dict, Optional from uuid import uuid4 @@ -19,20 +18,19 @@ from servicelib.application_keys import APP_JSONSCHEMA_SPECS_KEY from servicelib.jsonschema_validation import validate_instance from servicelib.observer import observe +from servicelib.utils import fire_and_forget_task, logged_gather from ..computation_api import delete_pipeline_db from ..director import director_api -from ..storage_api import copy_data_folders_from_project # mocked in unit-tests -from ..storage_api import ( - delete_data_folders_of_project, - delete_data_folders_of_project_node, -) +from ..storage_api import \ + copy_data_folders_from_project # mocked in unit-tests +from ..storage_api import (delete_data_folders_of_project, + delete_data_folders_of_project_node) from .config import CONFIG_SECTION_NAME from .projects_db import APP_PROJECT_DBAPI -from .projects_exceptions import NodeNotFoundError, ProjectNotFoundError +from .projects_exceptions import NodeNotFoundError from .projects_utils import clone_project_document - log = logging.getLogger(__name__) @@ -59,23 +57,19 @@ async def get_project_for_user( :rtype: Dict """ - try: - db = app[APP_PROJECT_DBAPI] - - project = None - if include_templates: - project = await db.get_template_project(project_uuid) + db = app[APP_PROJECT_DBAPI] - if not project: - project = await db.get_user_project(user_id, project_uuid) + project = None + if include_templates: + project = await db.get_template_project(project_uuid) - # TODO: how to handle when database has an invalid project schema??? - # Notice that db model does not include a check on project schema. - validate_project(app, project) - return project + if not project: + project = await db.get_user_project(user_id, project_uuid) - except ProjectNotFoundError: - raise web.HTTPNotFound(reason="Project not found") + # TODO: how to handle when database has an invalid project schema??? + # Notice that db model does not include a check on project schema. + validate_project(app, project) + return project async def clone_project( @@ -140,7 +134,7 @@ async def start_project_interactive_services( ) for service_uuid, service in project_needed_services.items() ] - await gather(*start_service_tasks) + await logged_gather(*start_service_tasks, reraise=True) async def delete_project(request: web.Request, project_uuid: str, user_id: int) -> None: @@ -150,7 +144,7 @@ async def remove_services_and_data(): await remove_project_interactive_services(user_id, project_uuid, request.app) await delete_project_data(request, project_uuid, user_id) - ensure_future(remove_services_and_data()) + fire_and_forget_task(remove_services_and_data()) @observe(event="SIGNAL_PROJECT_CLOSE") @@ -168,7 +162,7 @@ async def remove_project_interactive_services( for service in list_of_services ] if stop_tasks: - await gather(*stop_tasks) + await logged_gather(*stop_tasks, reraise=False) async def delete_project_data( @@ -182,13 +176,8 @@ async def delete_project_from_db( request: web.Request, project_uuid: str, user_id: int ) -> None: db = request.config_dict[APP_PROJECT_DBAPI] - try: - await delete_pipeline_db(request.app, project_uuid) - await db.delete_user_project(user_id, project_uuid) - except ProjectNotFoundError: - # TODO: add flag in query to determine whether to respond if error? - raise web.HTTPNotFound - + await delete_pipeline_db(request.app, project_uuid) + await db.delete_user_project(user_id, project_uuid) # requests storage to delete all project's stored data await delete_data_folders_of_project(request.app, project_uuid, user_id) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py index cd01997fc5c..0ae3f78a3ef 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py @@ -1,20 +1,22 @@ """ Handlers for CRUD operations on /projects/ """ -import asyncio import json import logging from aiohttp import web from jsonschema import ValidationError +from servicelib.utils import fire_and_forget_task + from ..computation_api import update_pipeline_db from ..login.decorators import RQT_USERID_KEY, login_required from ..resource_manager.websocket_manager import managed_resource from ..security_api import check_permission from . import projects_api from .projects_db import APP_PROJECT_DBAPI -from .projects_exceptions import ProjectInvalidRightsError, ProjectNotFoundError +from .projects_exceptions import (ProjectInvalidRightsError, + ProjectNotFoundError) OVERRIDABLE_DOCUMENT_KEYS = ["name", "description", "thumbnail", "prjOwner"] # TODO: validate these against api/specs/webserver/v0/components/schemas/project-v0.0.1.json @@ -90,7 +92,8 @@ async def create_projects(request: web.Request): except ValidationError: raise web.HTTPBadRequest(reason="Invalid project data") - + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project not found") except ProjectInvalidRightsError: raise web.HTTPUnauthorized @@ -147,15 +150,18 @@ async def get_project(request: web.Request): from .projects_api import get_project_for_user project_uuid = request.match_info.get("project_id") + try: + project = await get_project_for_user( + request.app, + project_uuid=project_uuid, + user_id=request[RQT_USERID_KEY], + include_templates=True, + ) - project = await get_project_for_user( - request.app, - project_uuid=project_uuid, - user_id=request[RQT_USERID_KEY], - include_templates=True, - ) - - return {"data": project} + return {"data": project} + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") + @login_required @@ -220,21 +226,26 @@ async def delete_project(request: web.Request): # first check if the project exists user_id = request[RQT_USERID_KEY] project_uuid = request.match_info.get("project_id") - project = await projects_api.get_project_for_user( - request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True - ) - with managed_resource(user_id, None, request.app) as rt: - other_users = await rt.find_users_of_resource("project_id", project_uuid) - if other_users: - message = "Project is opened by another user. It cannot be deleted." - if user_id in other_users: - message = ( - "Project is still open. It cannot be deleted until it is closed." - ) - # we cannot delete that project - raise web.HTTPForbidden(reason=message) - - await projects_api.delete_project(request, project_uuid, user_id) + try: + project = await projects_api.get_project_for_user( + request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True + ) + with managed_resource(user_id, None, request.app) as rt: + other_users = await rt.find_users_of_resource("project_id", project_uuid) + if other_users: + message = "Project is opened by another user. It cannot be deleted." + if user_id in other_users: + message = ( + "Project is still open. It cannot be deleted until it is closed." + ) + # we cannot delete that project + raise web.HTTPForbidden(reason=message) + + + await projects_api.delete_project(request, project_uuid, user_id) + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") + raise web.HTTPNoContent(content_type="application/json") @@ -244,26 +255,27 @@ async def open_project(request: web.Request) -> web.Response: # TODO: replace by decorator since it checks again authentication await check_permission(request, "project.open") - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user - user_id = request[RQT_USERID_KEY] project_uuid = request.match_info.get("project_id") client_session_id = await request.json() + try: + with managed_resource(user_id, client_session_id, request.app) as rt: + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user + project = await get_project_for_user( + request.app, + project_uuid=project_uuid, + user_id=user_id, + include_templates=True, + ) + await rt.add("project_id", project_uuid) - with managed_resource(user_id, client_session_id, request.app) as rt: - project = await get_project_for_user( - request.app, - project_uuid=project_uuid, - user_id=user_id, - include_templates=True, - ) - await rt.add("project_id", project_uuid) - - # user id opened project uuid - await projects_api.start_project_interactive_services(request, project, user_id) + # user id opened project uuid + await projects_api.start_project_interactive_services(request, project, user_id) - return {"data": project} + return {"data": project} + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") @login_required @@ -275,28 +287,32 @@ async def close_project(request: web.Request) -> web.Response: project_uuid = request.match_info.get("project_id") client_session_id = await request.json() - # ensure the project exists - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user + try: + # ensure the project exists + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user - with managed_resource(user_id, client_session_id, request.app) as rt: - project = await get_project_for_user( - request.app, - project_uuid=project_uuid, - user_id=user_id, - include_templates=True, - ) - await rt.remove("project_id") - other_users = await rt.find_users_of_resource("project_id", project_uuid) - if not other_users: - # only remove the services if no one else is using them now - asyncio.ensure_future( - projects_api.remove_project_interactive_services( - user_id, project_uuid, request.app - ) + with managed_resource(user_id, client_session_id, request.app) as rt: + project = await get_project_for_user( + request.app, + project_uuid=project_uuid, + user_id=user_id, + include_templates=True, ) + await rt.remove("project_id") + other_users = await rt.find_users_of_resource("project_id", project_uuid) + if not other_users: + # only remove the services if no one else is using them now + fire_and_forget_task( + projects_api.remove_project_interactive_services( + user_id, project_uuid, request.app + ) + ) - raise web.HTTPNoContent(content_type="application/json") + raise web.HTTPNoContent(content_type="application/json") + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") + @login_required @@ -304,22 +320,26 @@ async def get_active_project(request: web.Request) -> web.Response: await check_permission(request, "project.read") user_id = request[RQT_USERID_KEY] client_session_id = request.query["client_session_id"] - project = None - with managed_resource(user_id, client_session_id, request.app) as rt: - # get user's projects - list_project_ids = await rt.find("project_id") - if list_project_ids: - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user - project = await get_project_for_user( - request.app, - project_uuid=list_project_ids[0], - user_id=user_id, - include_templates=True, - ) + try: + project = None + with managed_resource(user_id, client_session_id, request.app) as rt: + # get user's projects + list_project_ids = await rt.find("project_id") + if list_project_ids: + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user + + project = await get_project_for_user( + request.app, + project_uuid=list_project_ids[0], + user_id=user_id, + include_templates=True, + ) - return {"data": project} + return {"data": project} + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project not found") @login_required @@ -330,24 +350,28 @@ async def create_node(request: web.Request) -> web.Response: project_uuid = request.match_info.get("project_id") body = await request.json() - # ensure the project exists - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user + try: + # ensure the project exists + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user - await get_project_for_user( - request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True - ) - data = { - "node_id": await projects_api.add_project_node( - request, - project_uuid, - user_id, - body["service_key"], - body["service_version"], - body["service_id"] if "service_id" in body else None, + await get_project_for_user( + request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True ) - } - return web.json_response({"data": data}, status=web.HTTPCreated.status_code) + data = { + "node_id": await projects_api.add_project_node( + request, + project_uuid, + user_id, + body["service_key"], + body["service_version"], + body["service_id"] if "service_id" in body else None, + ) + } + return web.json_response({"data": data}, status=web.HTTPCreated.status_code) + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") + @login_required @@ -357,18 +381,22 @@ async def get_node(request: web.Request) -> web.Response: user_id = request[RQT_USERID_KEY] project_uuid = request.match_info.get("project_id") node_uuid = request.match_info.get("node_id") - # ensure the project exists - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user + try: + # ensure the project exists + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user - await get_project_for_user( - request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True - ) + await get_project_for_user( + request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True + ) - node_details = await projects_api.get_project_node( - request, project_uuid, user_id, node_uuid - ) - return {"data": node_details} + node_details = await projects_api.get_project_node( + request, project_uuid, user_id, node_uuid + ) + return {"data": node_details} + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") + @login_required @@ -378,17 +406,20 @@ async def delete_node(request: web.Request) -> web.Response: user_id = request[RQT_USERID_KEY] project_uuid = request.match_info.get("project_id") node_uuid = request.match_info.get("node_id") - # ensure the project exists - # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! - from .projects_api import get_project_for_user + try: + # ensure the project exists + # TODO: temporary hidden until get_handlers_from_namespace refactor to seek marked functions instead! + from .projects_api import get_project_for_user - await get_project_for_user( - request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True - ) + await get_project_for_user( + request.app, project_uuid=project_uuid, user_id=user_id, include_templates=True + ) - await projects_api.delete_project_node(request, project_uuid, user_id, node_uuid) + await projects_api.delete_project_node(request, project_uuid, user_id, node_uuid) - raise web.HTTPNoContent(content_type="application/json") + raise web.HTTPNoContent(content_type="application/json") + except ProjectNotFoundError: + raise web.HTTPNotFound(reason=f"Project {project_uuid} not found") @login_required diff --git a/services/web/server/src/simcore_service_webserver/resource_manager/garbage_collector.py b/services/web/server/src/simcore_service_webserver/resource_manager/garbage_collector.py index a7911f39af2..3fe25762d70 100644 --- a/services/web/server/src/simcore_service_webserver/resource_manager/garbage_collector.py +++ b/services/web/server/src/simcore_service_webserver/resource_manager/garbage_collector.py @@ -12,6 +12,7 @@ from aiohttp import web from servicelib.observer import emit +from servicelib.utils import logged_gather from .config import APP_GARBAGE_COLLECTOR_KEY, get_garbage_collector_interval from .registry import RedisResourceRegistry, get_registry @@ -53,7 +54,8 @@ async def collect_garbage(registry: RedisResourceRegistry, app: web.Application) logger.debug( "removing resource entry: %s: %s", other_keys, resources ) - await asyncio.gather(*remove_tasks) + await logged_gather(*remove_tasks, reraise=False) + logger.debug( "the resources %s:%s of %s may be now safely closed", resource_name, diff --git a/services/web/server/src/simcore_service_webserver/socketio/events.py b/services/web/server/src/simcore_service_webserver/socketio/events.py index a07e00d5111..d7477aca481 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/events.py +++ b/services/web/server/src/simcore_service_webserver/socketio/events.py @@ -2,15 +2,19 @@ This module takes care of sending events to the connected webclient through the socket.io interface """ -import asyncio import json +import logging from typing import Any, Dict, List from aiohttp.web import Application +from servicelib.utils import fire_and_forget_task + from ..resource_manager.websocket_manager import managed_resource from .config import AsyncServer, get_socket_server +log = logging.getLogger(__name__) + async def post_messages( app: Application, user_id: str, messages: Dict[str, Any] @@ -22,30 +26,5 @@ async def post_messages( for sid in socket_ids: # We only send the data to the right sockets # Notice that there might be several tabs open - tasks = [ - sio.emit(event_name, json.dumps(data), room=sid) - for event_name, data in messages.items() - ] - asyncio.ensure_future( - asyncio.gather( - *tasks - ) # TODO: PC->SAN??, return_exceptions=True othewise will error '_GatheringFuture exception was never retrieved' - ) - - -# FIXME: PC->SAN: I wonder if here is the reason for this unhandled -# -# { -# "txt": " exception=OSError()>", -# "type": "", -# "done": true, -# "cancelled": false, -# "stack": null, -# "exception": ": " -# }, -# and https://github.com/miguelgrinberg/python-engineio/blob/master/engineio/async_drivers/aiohttp.py#L114) shows that ``IOError = OSError`` is raised -# when received data is corrupted!! -# -# -# It might be that sio.emit raise exception, which propagates throw gather -# + for event_name, data in messages.items(): + fire_and_forget_task(sio.emit(event_name, json.dumps(data), room=sid)) diff --git a/services/web/server/src/simcore_service_webserver/socketio/handlers.py b/services/web/server/src/simcore_service_webserver/socketio/handlers.py index 2fb99cf8edc..4e28450a6e2 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/handlers.py +++ b/services/web/server/src/simcore_service_webserver/socketio/handlers.py @@ -11,9 +11,10 @@ from typing import Dict, List, Optional from aiohttp import web +from socketio.exceptions import ConnectionRefusedError as socket_io_connection_error + from servicelib.observer import observe -from socketio.exceptions import \ - ConnectionRefusedError as socket_io_connection_error +from servicelib.utils import fire_and_forget_task, logged_gather from ..login.decorators import RQT_USERID_KEY, login_required from ..resource_manager.websocket_manager import managed_resource @@ -76,12 +77,13 @@ async def disconnect_other_sockets(sio, sockets: List[str]) -> None: sio.emit("logout", to=sid, data={"reason": "user logged out"}) for sid in sockets ] - await asyncio.gather(*logout_tasks, return_exceptions=True) + await logged_gather(*logout_tasks, reraise=False) + # let the client react await asyncio.sleep(3) # ensure disconnection is effective disconnect_tasks = [sio.disconnect(sid=sid) for sid in sockets] - await asyncio.gather(*disconnect_tasks, return_exceptions=True) + await logged_gather(*disconnect_tasks) @observe(event="SIGNAL_USER_LOGOUT") @@ -102,7 +104,7 @@ async def user_logged_out( sockets = await rt.find_socket_ids() if sockets: # let's do it as a task so it does not block us here - asyncio.ensure_future(disconnect_other_sockets(sio, sockets)) + fire_and_forget_task(disconnect_other_sockets(sio, sockets)) async def disconnect(sid: str, app: web.Application) -> None: @@ -123,4 +125,8 @@ async def disconnect(sid: str, app: web.Application) -> None: await rt.remove_socket_id() else: # this should not happen!! - log.error("Unknown client diconnected sid: %s, session %s", sid, str(socketio_session)) + log.error( + "Unknown client diconnected sid: %s, session %s", + sid, + str(socketio_session), + ) diff --git a/services/web/server/src/simcore_service_webserver/storage_api.py b/services/web/server/src/simcore_service_webserver/storage_api.py index fb969adc196..cac23f492cb 100644 --- a/services/web/server/src/simcore_service_webserver/storage_api.py +++ b/services/web/server/src/simcore_service_webserver/storage_api.py @@ -70,9 +70,6 @@ async def delete_data_folders_of_project(app, project_id, user_id): ) await _delete(session, url) - # asyncio.ensure_future(_delete(session, url)) - # loop = asyncio.get_event_loop() - # loop.run_until_complete(_delete(session, url)) async def delete_data_folders_of_project_node( diff --git a/services/web/server/tests/unit/with_dbs/test_projects.py b/services/web/server/tests/unit/with_dbs/test_projects.py index f482783f7e1..29a11190736 100644 --- a/services/web/server/tests/unit/with_dbs/test_projects.py +++ b/services/web/server/tests/unit/with_dbs/test_projects.py @@ -41,8 +41,33 @@ API_PREFIX = "/" + API_VERSION +def future_with_result(result) -> Future: + f = Future() + f.set_result(result) + return f + + +@pytest.fixture +def mocked_director_subsystem(mocker): + mock_director_api = { + "get_running_interactive_services": mocker.patch( + "simcore_service_webserver.director.director_api.get_running_interactive_services", + return_value=future_with_result(""), + ), + "start_service": mocker.patch( + "simcore_service_webserver.director.director_api.start_service", + return_value=future_with_result(""), + ), + "stop_service": mocker.patch( + "simcore_service_webserver.director.director_api.stop_service", + return_value=future_with_result(""), + ), + } + return mock_director_api + + @pytest.fixture -def client(loop, aiohttp_client, app_cfg, postgres_service): +def client(loop, aiohttp_client, app_cfg, postgres_service, mocked_director_subsystem): # def client(loop, aiohttp_client, app_cfg): # <<<< FOR DEVELOPMENT. DO NOT REMOVE. # config app @@ -567,32 +592,26 @@ async def test_delete_project( user_project, expected, storage_subsystem_mock, - mocker, + mocked_director_subsystem, fake_services, ): # DELETE /v0/projects/{project_id} fakes = fake_services(5) - mock_director_api = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api.return_value.set_result(fakes) - - mock_director_api_stop_services = mocker.patch( - "simcore_service_webserver.director.director_api.stop_service", - return_value=Future(), - ) - mock_director_api_stop_services.return_value.set_result("") + mocked_director_subsystem[ + "get_running_interactive_services" + ].return_value = future_with_result(fakes) url = client.app.router["delete_project"].url_for(project_id=user_project["uuid"]) resp = await client.delete(url) await assert_status(resp, expected) if resp.status == web.HTTPNoContent.status_code: - mock_director_api.assert_called_once() + mocked_director_subsystem[ + "get_running_interactive_services" + ].assert_called_once() calls = [call(client.server.app, service["service_uuid"]) for service in fakes] - mock_director_api_stop_services.has_calls(calls) + mocked_director_subsystem["stop_service"].has_calls(calls) # wait for the fire&forget to run await sleep(2) # check if database entries are correctly removed, there should be no project available here @@ -612,21 +631,15 @@ async def test_delete_project( ], ) async def test_open_project( - client, logged_user, user_project, client_session_id, expected, mocker + client, + logged_user, + user_project, + client_session_id, + expected, + mocked_director_subsystem, ): # POST /v0/projects/{project_id}:open # open project - mock_director_api = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api.return_value.set_result("") - - mock_director_api_start_service = mocker.patch( - "simcore_service_webserver.director.director_api.start_service", - return_value=Future(), - ) - mock_director_api_start_service.return_value.set_result("") url = client.app.router["open_project"].url_for(project_id=user_project["uuid"]) resp = await client.post(url, json=client_session_id()) @@ -649,7 +662,7 @@ async def test_open_project( user_id=logged_user["id"], ) ) - mock_director_api_start_service.assert_has_calls(calls) + mocked_director_subsystem["start_service"].assert_has_calls(calls) @pytest.mark.parametrize( @@ -667,48 +680,43 @@ async def test_close_project( user_project, client_session_id, expected, - mocker, + mocked_director_subsystem, fake_services, ): # POST /v0/projects/{project_id}:close fakes = fake_services(5) assert len(fakes) == 5 - mock_director_api = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api.return_value.set_result(fakes) - - mock_director_api_start_service = mocker.patch( - "simcore_service_webserver.director.director_api.start_service", - return_value=Future(), - ) - mock_director_api_start_service.return_value.set_result("") + mocked_director_subsystem[ + "get_running_interactive_services" + ].return_value = future_with_result(fakes) - mock_director_api_stop_services = mocker.patch( - "simcore_service_webserver.director.director_api.stop_service", - return_value=Future(), - ) - mock_director_api_stop_services.return_value.set_result("") # open project client_id = client_session_id() url = client.app.router["open_project"].url_for(project_id=user_project["uuid"]) resp = await client.post(url, json=client_id) + if resp.status == web.HTTPOk.status_code: - mock_director_api.assert_called_once() - mock_director_api.reset_mock() - else: - mock_director_api.assert_not_called() + calls = [ + call(client.server.app, user_project["uuid"], logged_user["id"]), + ] + mocked_director_subsystem[ + "get_running_interactive_services" + ].has_calls(calls) + mocked_director_subsystem["get_running_interactive_services"].reset_mock() + # close project url = client.app.router["close_project"].url_for(project_id=user_project["uuid"]) resp = await client.post(url, json=client_id) await assert_status(resp, expected) if resp.status == web.HTTPNoContent.status_code: - mock_director_api.assert_called_once() + calls = [ + call(client.server.app, user_project["uuid"], None), + call(client.server.app, user_project["uuid"], logged_user["id"]), + ] + mocked_director_subsystem["get_running_interactive_services"].has_calls(calls) calls = [call(client.server.app, service["service_uuid"]) for service in fakes] - mock_director_api_stop_services.has_calls(calls) - else: - mock_director_api.assert_not_called() + mocked_director_subsystem["stop_service"].has_calls(calls) + @pytest.mark.parametrize( @@ -727,25 +735,8 @@ async def test_get_active_project( client_session_id, expected, socketio_client, - mocker, + mocked_director_subsystem, ): - mock_director_api = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api.return_value.set_result("") - - mock_director_api_start_service = mocker.patch( - "simcore_service_webserver.director.director_api.start_service", - return_value=Future(), - ) - mock_director_api_start_service.return_value.set_result("") - - mock_director_api_stop_services = mocker.patch( - "simcore_service_webserver.director.director_api.stop_service", - return_value=Future(), - ) - mock_director_api_stop_services.return_value.set_result("") # login with socket using client session id client_id1 = client_session_id() sio = await socketio_client(client_id1) @@ -810,26 +801,8 @@ async def test_delete_shared_project_forbidden( socketio_client, client_session_id, expected, - mocker, + mocked_director_subsystem, ): - mock_director_api = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api.return_value.set_result("") - - mock_director_api_start_service = mocker.patch( - "simcore_service_webserver.director.director_api.start_service", - return_value=Future(), - ) - mock_director_api_start_service.return_value.set_result("") - - mock_director_api_stop_services = mocker.patch( - "simcore_service_webserver.director.director_api.stop_service", - return_value=Future(), - ) - mock_director_api_stop_services.return_value.set_result("") - # service in project = await mocked_dynamic_service(logged_user["id"], empty_user_project["uuid"]) service = await mocked_dynamic_service(logged_user["id"], user_project["uuid"]) # open project in tab1 @@ -868,24 +841,11 @@ async def test_project_node_lifetime( create_exp, get_exp, deletion_exp, - mocker, + mocked_director_subsystem, storage_subsystem_mock, + mocker, ): - mock_director_api_get_running_services = mocker.patch( - "simcore_service_webserver.director.director_api.get_running_interactive_services", - return_value=Future(), - ) - mock_director_api_start_service = mocker.patch( - "simcore_service_webserver.director.director_api.start_service", - return_value=Future(), - ) - mock_director_api_start_service.return_value.set_result("") - mock_director_api_stop_services = mocker.patch( - "simcore_service_webserver.director.director_api.stop_service", - return_value=Future(), - ) - mock_director_api_stop_services.return_value.set_result("") mock_storage_api_delete_data_folders_of_project_node = mocker.patch( "simcore_service_webserver.projects.projects_handlers.projects_api.delete_data_folders_of_project_node", return_value=Future(), @@ -899,27 +859,29 @@ async def test_project_node_lifetime( data, errors = await assert_status(resp, create_exp) node_id = "wrong_node_id" if resp.status == web.HTTPCreated.status_code: - mock_director_api_start_service.assert_called_once() + mocked_director_subsystem["start_service"].assert_called_once() assert "node_id" in data node_id = data["node_id"] else: - mock_director_api_start_service.assert_not_called() + mocked_director_subsystem["start_service"].assert_not_called() # create a new NOT dynamic node... - mock_director_api_start_service.reset_mock() + mocked_director_subsystem["start_service"].reset_mock() url = client.app.router["create_node"].url_for(project_id=user_project["uuid"]) body = {"service_key": "some/notdynamic/key", "service_version": "1.3.4"} resp = await client.post(url, json=body) data, errors = await assert_status(resp, create_exp) node_id_2 = "wrong_node_id" if resp.status == web.HTTPCreated.status_code: - mock_director_api_start_service.assert_not_called() + mocked_director_subsystem["start_service"].assert_not_called() assert "node_id" in data node_id_2 = data["node_id"] else: - mock_director_api_start_service.assert_not_called() + mocked_director_subsystem["start_service"].assert_not_called() # get the node state - mock_director_api_get_running_services.return_value.set_result( + mocked_director_subsystem[ + "get_running_interactive_services" + ].return_value = future_with_result( [{"service_uuid": node_id, "service_state": "running"}] ) url = client.app.router["get_node"].url_for( @@ -932,8 +894,10 @@ async def test_project_node_lifetime( assert data["service_state"] == "running" # get the NOT dynamic node state - mock_director_api_get_running_services.return_value = Future() - mock_director_api_get_running_services.return_value.set_result("") + mocked_director_subsystem[ + "get_running_interactive_services" + ].return_value = future_with_result("") + url = client.app.router["get_node"].url_for( project_id=user_project["uuid"], node_id=node_id_2 ) @@ -944,24 +908,23 @@ async def test_project_node_lifetime( assert data["service_state"] == "idle" # delete the node - mock_director_api_get_running_services.return_value = Future() - mock_director_api_get_running_services.return_value.set_result( - [{"service_uuid": node_id}] - ) + mocked_director_subsystem[ + "get_running_interactive_services" + ].return_value = future_with_result([{"service_uuid": node_id}]) url = client.app.router["delete_node"].url_for( project_id=user_project["uuid"], node_id=node_id ) resp = await client.delete(url) data, errors = await assert_status(resp, deletion_exp) if resp.status == web.HTTPNoContent.status_code: - mock_director_api_stop_services.assert_called_once() + mocked_director_subsystem["stop_service"].assert_called_once() mock_storage_api_delete_data_folders_of_project_node.assert_called_once() else: - mock_director_api_stop_services.assert_not_called() + mocked_director_subsystem["stop_service"].assert_not_called() mock_storage_api_delete_data_folders_of_project_node.assert_not_called() # delete the NOT dynamic node - mock_director_api_stop_services.reset_mock() + mocked_director_subsystem["stop_service"].reset_mock() mock_storage_api_delete_data_folders_of_project_node.reset_mock() # mock_director_api_get_running_services.return_value.set_result([{"service_uuid": node_id}]) url = client.app.router["delete_node"].url_for( @@ -970,10 +933,10 @@ async def test_project_node_lifetime( resp = await client.delete(url) data, errors = await assert_status(resp, deletion_exp) if resp.status == web.HTTPNoContent.status_code: - mock_director_api_stop_services.assert_not_called() + mocked_director_subsystem["stop_service"].assert_not_called() mock_storage_api_delete_data_folders_of_project_node.assert_called_once() else: - mock_director_api_stop_services.assert_not_called() + mocked_director_subsystem["stop_service"].assert_not_called() mock_storage_api_delete_data_folders_of_project_node.assert_not_called()