Bugfix/check gather return exceptions #1360

Merged
5 changes: 3 additions & 2 deletions packages/service-library/src/servicelib/observer.py
@@ -3,11 +3,12 @@

"""

import asyncio
import logging
from collections import defaultdict
from functools import wraps

from .utils import logged_gather

log = logging.getLogger(__name__)

event_registry = defaultdict(list)
@@ -19,7 +20,7 @@ async def emit(event: str, *args, **kwargs):

coroutines = [observer(*args, **kwargs) for observer in event_registry[event]]
    # all coroutines are called in parallel
await asyncio.gather(*coroutines, return_exceptions=True)
await logged_gather(*coroutines)


def observe(event: str):
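For context, here is a minimal sketch of how the observer registry is typically used together with the new logged_gather call in emit. The handler name and its arguments are illustrative only and not part of this diff; note that with logged_gather's default reraise=True, the first exception raised by an observer now propagates out of emit instead of being silently swallowed by asyncio.gather(..., return_exceptions=True).

from servicelib.observer import emit, observe


@observe(event="SIGNAL_PROJECT_CLOSE")
async def on_project_close(project_uuid: str, user_id: int) -> None:
    # illustrative observer, registered under event_registry["SIGNAL_PROJECT_CLOSE"]
    ...


async def close_project(project_uuid: str, user_id: int) -> None:
    # runs every registered observer concurrently; an observer failure is
    # re-raised here by logged_gather (default reraise=True)
    await emit("SIGNAL_PROJECT_CLOSE", project_uuid, user_id)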
36 changes: 35 additions & 1 deletion packages/service-library/src/servicelib/utils.py
@@ -4,7 +4,12 @@
In order to avoid cyclic dependencies, please
DO NOT IMPORT ANYTHING from .
"""
import asyncio
import logging
from pathlib import Path
from typing import Any, Coroutine, List, Optional, Union

logger = logging.getLogger(__name__)


def is_osparc_repo_dir(path: Path) -> bool:
@@ -14,7 +19,7 @@ def is_osparc_repo_dir(path: Path) -> bool:
return all(d in got for d in expected)


def search_osparc_repo_dir(start, max_iterations=8):
def search_osparc_repo_dir(start: Union[str, Path], max_iterations=8) -> Optional[Path]:
""" Returns path to root repo dir or None if it does not exists

    NOTE: assumes start is a path within the repo
@@ -27,3 +32,32 @@ def search_osparc_repo_dir(start, max_iterations=8):
iteration_number += 1

return root_dir if is_osparc_repo_dir(root_dir) else None


# FUTURES
def fire_and_forget_task(obj: Union[Coroutine, asyncio.Future]) -> None:
future = asyncio.ensure_future(obj)

def log_exception_callback(fut: asyncio.Future):
try:
fut.result()
except Exception: # pylint: disable=broad-except
logger.exception("Error occured while running task!")

future.add_done_callback(log_exception_callback)


# // tasks
async def logged_gather(*tasks, reraise: bool = True) -> List[Any]:
    # all coroutines run in parallel; exceptions are captured and handled here
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for index, value in enumerate(results):
        if isinstance(value, Exception):
            if reraise:
                raise value
            logger.error(
                "Exception occurred while running %s: %s",
                str(tasks[index]),
                str(value),
            )
    return results
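For reference, a hedged usage sketch of the two new helpers; the fetch coroutine and the logging setup below are made up for illustration and are not part of the package.

import asyncio
import logging

from servicelib.utils import fire_and_forget_task, logged_gather

logging.basicConfig(level=logging.INFO)


async def fetch(name: str) -> str:
    # illustrative coroutine that may fail
    if name == "bad":
        raise RuntimeError(f"{name} failed")
    return name


async def main() -> None:
    # reraise=False: every coroutine completes, failures are logged and kept
    # in the result list, e.g. ["a", RuntimeError("bad failed")]
    results = await logged_gather(fetch("a"), fetch("bad"), reraise=False)
    print(results)

    # reraise=True (the default): the first captured exception is re-raised
    try:
        await logged_gather(fetch("a"), fetch("bad"))
    except RuntimeError as err:
        print(f"re-raised: {err}")

    # background task: any exception is logged by the done-callback
    fire_and_forget_task(fetch("bad"))
    await asyncio.sleep(0.1)  # give the background task time to finish


asyncio.run(main())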
@@ -195,6 +195,8 @@ async def list_services(app: web.Application, service_type: ServiceType) -> List
for repo_details in results:
if repo_details and isinstance(repo_details, list):
services.extend(repo_details)
elif isinstance(repo_details, Exception):
_logger.error("Exception occured while listing services %s", repo_details)
return services

async def list_interactive_service_dependencies(app: web.Application, service_key: str, service_tag: str) -> List[Dict]:
@@ -13,7 +13,7 @@

from servicelib.application_keys import APP_DB_ENGINE_KEY

from .projects import projects_api
from .projects import projects_api, projects_exceptions
from .projects.projects_models import projects, user_to_projects
from .socketio.events import post_messages

@@ -87,9 +87,15 @@ async def listen(app: web.Application):
)
async for row in conn.execute(query):
user_id = row["user_id"]
node_data = await projects_api.update_project_node_outputs(
app, user_id, project_id, node_id, data=task_output
)
try:
node_data = await projects_api.update_project_node_outputs(
app, user_id, project_id, node_id, data=task_output
)
            except projects_exceptions.ProjectNotFoundError:
                log.exception("Project %s not found", project_id)
                continue  # node_data is undefined here, skip the update message
            except projects_exceptions.NodeNotFoundError:
                log.exception("Node %s in project %s not found", node_id, project_id)
                continue

messages = {"nodeUpdated": {"Node": node_id, "Data": node_data}}
await post_messages(app, user_id, messages)

@@ -15,6 +15,7 @@
from .computation_config import CONFIG_SECTION_NAME as CONFIG_RABBIT_SECTION
from .login.decorators import login_required
from .projects.projects_api import get_project_for_user
from .projects.projects_exceptions import ProjectNotFoundError
from .security_api import check_permission

log = logging.getLogger(__file__)
@@ -51,8 +52,12 @@ async def update_pipeline(request: web.Request) -> web.Response:

user_id, project_id = await _process_request(request)

project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
try:
project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
except ProjectNotFoundError:
raise web.HTTPNotFound(reason=f"Project {project_id} not found")


raise web.HTTPNoContent()

@@ -67,8 +72,11 @@ async def start_pipeline(request: web.Request) -> web.Response:

user_id, project_id = await _process_request(request)

project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
try:
project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
except ProjectNotFoundError:
raise web.HTTPNotFound(reason=f"Project {project_id} not found")

# commit the tasks to celery
_ = get_celery(request.app).send_task(
@@ -1,15 +1,16 @@
# pylint: disable=too-many-arguments

import asyncio
import logging
import urllib
from typing import Dict, List, Optional

from aiohttp import web
from yarl import URL

from .config import get_client_session, get_config
from servicelib.utils import logged_gather

from . import director_exceptions
from .config import get_client_session, get_config

log = logging.getLogger(__name__)

@@ -99,7 +100,7 @@ async def stop_services(
app, user_id=user_id, project_id=project_id
)
stop_tasks = [stop_service(app, service_uuid) for service_uuid in services]
await asyncio.gather(*stop_tasks)
await logged_gather(*stop_tasks, reraise=False)


async def get_service_by_key_version(
@@ -9,7 +9,6 @@
# pylint: disable=too-many-arguments

import logging
from asyncio import ensure_future, gather
from pprint import pformat
from typing import Dict, Optional
from uuid import uuid4
@@ -19,20 +18,19 @@
from servicelib.application_keys import APP_JSONSCHEMA_SPECS_KEY
from servicelib.jsonschema_validation import validate_instance
from servicelib.observer import observe
from servicelib.utils import fire_and_forget_task, logged_gather

from ..computation_api import delete_pipeline_db
from ..director import director_api
from ..storage_api import copy_data_folders_from_project # mocked in unit-tests
from ..storage_api import (
delete_data_folders_of_project,
delete_data_folders_of_project_node,
)
from ..storage_api import \
copy_data_folders_from_project # mocked in unit-tests
from ..storage_api import (delete_data_folders_of_project,
delete_data_folders_of_project_node)
from .config import CONFIG_SECTION_NAME
from .projects_db import APP_PROJECT_DBAPI
from .projects_exceptions import NodeNotFoundError, ProjectNotFoundError
from .projects_exceptions import NodeNotFoundError
from .projects_utils import clone_project_document


log = logging.getLogger(__name__)


@@ -59,23 +57,19 @@ async def get_project_for_user(
:rtype: Dict
"""

try:
db = app[APP_PROJECT_DBAPI]

project = None
if include_templates:
project = await db.get_template_project(project_uuid)
db = app[APP_PROJECT_DBAPI]

if not project:
project = await db.get_user_project(user_id, project_uuid)
project = None
if include_templates:
project = await db.get_template_project(project_uuid)

# TODO: how to handle when database has an invalid project schema???
# Notice that db model does not include a check on project schema.
validate_project(app, project)
return project
if not project:
project = await db.get_user_project(user_id, project_uuid)

except ProjectNotFoundError:
raise web.HTTPNotFound(reason="Project not found")
# TODO: how to handle when database has an invalid project schema???
# Notice that db model does not include a check on project schema.
validate_project(app, project)
return project


async def clone_project(
Expand Down Expand Up @@ -140,7 +134,7 @@ async def start_project_interactive_services(
)
for service_uuid, service in project_needed_services.items()
]
await gather(*start_service_tasks)
await logged_gather(*start_service_tasks, reraise=True)


async def delete_project(request: web.Request, project_uuid: str, user_id: int) -> None:
@@ -150,7 +144,7 @@ async def remove_services_and_data():
await remove_project_interactive_services(user_id, project_uuid, request.app)
await delete_project_data(request, project_uuid, user_id)

ensure_future(remove_services_and_data())
fire_and_forget_task(remove_services_and_data())


@observe(event="SIGNAL_PROJECT_CLOSE")
@@ -168,7 +162,7 @@ async def remove_project_interactive_services(
for service in list_of_services
]
if stop_tasks:
await gather(*stop_tasks)
await logged_gather(*stop_tasks, reraise=False)


async def delete_project_data(
@@ -182,13 +176,8 @@ async def delete_project_from_db(
request: web.Request, project_uuid: str, user_id: int
) -> None:
db = request.config_dict[APP_PROJECT_DBAPI]
try:
await delete_pipeline_db(request.app, project_uuid)
await db.delete_user_project(user_id, project_uuid)
except ProjectNotFoundError:
# TODO: add flag in query to determine whether to respond if error?
raise web.HTTPNotFound

await delete_pipeline_db(request.app, project_uuid)
await db.delete_user_project(user_id, project_uuid)
# requests storage to delete all project's stored data
await delete_data_folders_of_project(request.app, project_uuid, user_id)
