Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/check gather return exceptions #1360

Merged
5 changes: 4 additions & 1 deletion packages/service-library/src/servicelib/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ async def emit(event: str, *args, **kwargs):

coroutines = [observer(*args, **kwargs) for observer in event_registry[event]]
# all coroutine called in //
await asyncio.gather(*coroutines, return_exceptions=True)
results = await asyncio.gather(*coroutines, return_exceptions=True)
for value in results:
if isinstance(value, Exception):
log.error("Exception occured while emitting event: %s", str(value))


def observe(event: str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ async def list_services(app: web.Application, service_type: ServiceType) -> List
for repo_details in results:
if repo_details and isinstance(repo_details, list):
services.extend(repo_details)
elif isinstance(repo_details, Exception):
_logger.error("Exception occured while listing services %s", repo_details)
return services

async def list_interactive_service_dependencies(app: web.Application, service_key: str, service_tag: str) -> List[Dict]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from servicelib.application_keys import APP_DB_ENGINE_KEY

from .projects import projects_api
from .projects import projects_api, projects_exceptions
from .projects.projects_models import projects, user_to_projects
from .socketio.events import post_messages

Expand Down Expand Up @@ -87,9 +87,15 @@ async def listen(app: web.Application):
)
async for row in conn.execute(query):
user_id = row["user_id"]
node_data = await projects_api.update_project_node_outputs(
app, user_id, project_id, node_id, data=task_output
)
try:
node_data = await projects_api.update_project_node_outputs(
app, user_id, project_id, node_id, data=task_output
)
except projects_exceptions.ProjectNotFoundError:
log.exception("Project %s not found", project_id)
except projects_exceptions.NodeNotFoundError:
log.exception("Node %s ib project %s not found", node_id, project_id)

messages = {"nodeUpdated": {"Node": node_id, "Data": node_data}}
await post_messages(app, user_id, messages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .computation_config import CONFIG_SECTION_NAME as CONFIG_RABBIT_SECTION
from .login.decorators import login_required
from .projects.projects_api import get_project_for_user
from .projects.projects_exceptions import ProjectNotFoundError
from .security_api import check_permission

log = logging.getLogger(__file__)
Expand Down Expand Up @@ -51,8 +52,12 @@ async def update_pipeline(request: web.Request) -> web.Response:

user_id, project_id = await _process_request(request)

project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
try:
project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
except ProjectNotFoundError:
raise web.HTTPNotFound(reason=f"Project {project_id} not found")


raise web.HTTPNoContent()

Expand All @@ -67,8 +72,11 @@ async def start_pipeline(request: web.Request) -> web.Response:

user_id, project_id = await _process_request(request)

project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
try:
project = await get_project_for_user(request.app, project_id, user_id)
await update_pipeline_db(request.app, project_id, project["workbench"])
except ProjectNotFoundError:
raise web.HTTPNotFound(reason=f"Project {project_id} not found")

# commit the tasks to celery
_ = get_celery(request.app).send_task(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# pylint: disable=too-many-arguments

import logging
from asyncio import ensure_future, gather
from asyncio import gather
from pprint import pformat
from typing import Dict, Optional
from uuid import uuid4
Expand All @@ -27,12 +27,12 @@
delete_data_folders_of_project,
delete_data_folders_of_project_node,
)
from ..utils import fire_and_forget_task
from .config import CONFIG_SECTION_NAME
from .projects_db import APP_PROJECT_DBAPI
from .projects_exceptions import NodeNotFoundError, ProjectNotFoundError
from .projects_exceptions import NodeNotFoundError
from .projects_utils import clone_project_document


log = logging.getLogger(__name__)


Expand All @@ -59,23 +59,19 @@ async def get_project_for_user(
:rtype: Dict
"""

try:
db = app[APP_PROJECT_DBAPI]

project = None
if include_templates:
project = await db.get_template_project(project_uuid)
db = app[APP_PROJECT_DBAPI]

if not project:
project = await db.get_user_project(user_id, project_uuid)
project = None
if include_templates:
project = await db.get_template_project(project_uuid)

# TODO: how to handle when database has an invalid project schema???
# Notice that db model does not include a check on project schema.
validate_project(app, project)
return project
if not project:
project = await db.get_user_project(user_id, project_uuid)

except ProjectNotFoundError:
raise web.HTTPNotFound(reason="Project not found")
# TODO: how to handle when database has an invalid project schema???
# Notice that db model does not include a check on project schema.
validate_project(app, project)
return project


async def clone_project(
Expand Down Expand Up @@ -150,7 +146,7 @@ async def remove_services_and_data():
await remove_project_interactive_services(user_id, project_uuid, request.app)
await delete_project_data(request, project_uuid, user_id)

ensure_future(remove_services_and_data())
fire_and_forget_task(remove_services_and_data())


@observe(event="SIGNAL_PROJECT_CLOSE")
Expand All @@ -168,7 +164,10 @@ async def remove_project_interactive_services(
for service in list_of_services
]
if stop_tasks:
await gather(*stop_tasks)
results = await gather(*stop_tasks, return_exceptions=True)
for value in results:
if isinstance(value, Exception):
log.error("Exception occured while stopping service: %s", value)


async def delete_project_data(
Expand All @@ -182,13 +181,8 @@ async def delete_project_from_db(
request: web.Request, project_uuid: str, user_id: int
) -> None:
db = request.config_dict[APP_PROJECT_DBAPI]
try:
await delete_pipeline_db(request.app, project_uuid)
await db.delete_user_project(user_id, project_uuid)
except ProjectNotFoundError:
# TODO: add flag in query to determine whether to respond if error?
raise web.HTTPNotFound

await delete_pipeline_db(request.app, project_uuid)
await db.delete_user_project(user_id, project_uuid)
# requests storage to delete all project's stored data
await delete_data_folders_of_project(request.app, project_uuid, user_id)

Expand Down
Loading