consumer: daemon to not die on errors
* Currently, if an exception is raised during a workflow status
  update, the changes are not saved in the DB. This causes
  inconsistencies when, for example, calls to Kubernetes fail,
  leaving workflows in the running state forever
  (addresses reanahub/reana#478).
Diego Rodriguez committed Feb 12, 2021
1 parent 345558b commit 956691d
Showing 2 changed files with 98 additions and 33 deletions.
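The idea of the change, as a minimal sketch of the pattern rather than the exact REANA code (the names session, update_status and the rollback call are illustrative assumptions): the handler acknowledges the message, then wraps the whole status update in try/except so a failing Kubernetes or database call is logged instead of crashing the consumer daemon.

import json
import logging
import traceback

from sqlalchemy.exc import SQLAlchemyError


def on_message(body, message, session, update_status):
    """Ack first, then guard the status update so one bad message cannot kill the daemon."""
    message.ack()
    payload = json.loads(body)
    try:
        update_status(payload)  # may talk to Kubernetes and the database
        session.commit()        # commit only if every step succeeded
    except SQLAlchemyError:
        logging.error(traceback.format_exc())
        session.rollback()      # illustrative: keep the session usable for the next message
    except Exception:
        # Kubernetes API errors and anything unexpected are logged, not re-raised,
        # so the consumer keeps processing subsequent messages.
        logging.error(traceback.format_exc())

The actual handler below distinguishes REANAWorkflowControllerError, SQLAlchemyError and generic exceptions so that each failure mode gets its own log message.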
86 changes: 53 additions & 33 deletions reana_workflow_controller/consumer.py
@@ -17,6 +17,7 @@
from datetime import datetime

import requests
from kombu import Queue
from kubernetes.client.rest import ApiException
from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
from reana_commons.consumer import BaseConsumer
@@ -32,6 +33,7 @@
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.attributes import flag_modified

from reana_workflow_controller.config import (
@@ -50,9 +52,11 @@
class JobStatusConsumer(BaseConsumer):
"""Consumer of jobs-status queue."""

def __init__(self):
def __init__(self, connection=None, queue=None):
"""Initialise JobStatusConsumer class."""
super(JobStatusConsumer, self).__init__(queue="jobs-status")
super(JobStatusConsumer, self).__init__(
connection=connection, queue=queue or Queue("jobs-status")
)

def get_consumers(self, Consumer, channel):
"""Implement providing kombu.Consumers with queues/callbacks."""
@@ -70,35 +74,48 @@ def on_message(self, body, message):
body_dict = json.loads(body)
workflow_uuid = body_dict.get("workflow_uuid")
if workflow_uuid:
workflow = (
Session.query(Workflow).filter_by(id_=workflow_uuid).one_or_none()
)
next_status = body_dict.get("status")
if next_status:
next_status = RunStatus(next_status)
print(
" [x] Received workflow_uuid: {0} status: {1}".format(
workflow_uuid, next_status
)
try:
workflow = (
Session.query(Workflow).filter_by(id_=workflow_uuid).one_or_none()
)
logs = body_dict.get("logs") or ""
if workflow.can_transition_to(next_status):
_update_workflow_status(workflow, next_status, logs)
if "message" in body_dict and body_dict.get("message"):
msg = body_dict["message"]
if "progress" in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
# Caching: calculate input hash and store in JobCache
if "caching_info" in msg:
_update_job_cache(msg)
Session.commit()
else:
next_status = body_dict.get("status")
if next_status:
next_status = RunStatus(next_status)
print(
" [x] Received workflow_uuid: {0} status: {1}".format(
workflow_uuid, next_status
)
)
logs = body_dict.get("logs") or ""
if workflow.can_transition_to(next_status):
_update_workflow_status(workflow, next_status, logs)
if "message" in body_dict and body_dict.get("message"):
msg = body_dict["message"]
if "progress" in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
# Caching: calculate input hash and store in JobCache
if "caching_info" in msg:
_update_job_cache(msg)
Session.commit()
else:
logging.error(
f"Cannot transition workflow {workflow.id_}"
f" from status {workflow.status} to"
f" {next_status}."
)
except REANAWorkflowControllerError as rwce:
logging.error(traceback.format_exc())
logging.error(rwce)
except SQLAlchemyError as sae:
logging.error(
f"Cannot transition workflow {workflow.id_}"
f" from status {workflow.status} to"
f" {next_status}."
f"Something went wrong while querying the database for workflow: {workflow.id_}"
)
logging.error(sae)
except Exception as e:
logging.error(traceback.format_exc())
logging.error("Unexpected error while processing workflow")
logging.error(e)


def _update_workflow_status(workflow, status, logs):
@@ -112,8 +129,14 @@ def _update_workflow_status(workflow, status, logs):
RunStatus.running,
RunStatus.queued,
]
if status not in alive_statuses:
_delete_workflow_engine_pod(workflow)
if status not in alive_statuses:
try:
_delete_workflow_engine_pod(workflow)
except REANAWorkflowControllerError as rwce:
logging.error(
f"Could not clean up workflow engine for workflow {workflow.id_}"
)
workflow.logs += "Workflow engine logs could not be retrieved.\n"


def _update_commit_status(workflow, status):
@@ -250,6 +273,3 @@ def _delete_workflow_engine_pod(workflow):
raise REANAWorkflowControllerError(
"Workflow engine pod cound not be deleted {}.".format(e)
)
except Exception as e:
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))
45 changes: 45 additions & 0 deletions tests/test_consumer.py
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANAN Workflow Controller consumer tests."""

import pytest
from kubernetes.client.rest import ApiException
from mock import Mock, patch
from reana_commons.config import MQ_DEFAULT_QUEUES
from reana_commons.consumer import BaseConsumer
from reana_commons.publisher import WorkflowStatusPublisher
from reana_db.models import RunStatus

from reana_workflow_controller.consumer import JobStatusConsumer


def test_workflow_finish_and_kubernetes_not_available(
in_memory_queue_connection, sample_serial_workflow_in_db, consume_queue,
):
"""Test workflow finish with a Kubernetes connection troubles."""
sample_serial_workflow_in_db.status = RunStatus.running
next_status = RunStatus.failed
job_status_consumer = JobStatusConsumer(connection=in_memory_queue_connection)
workflow_status_publisher = WorkflowStatusPublisher(
connection=in_memory_queue_connection, queue=job_status_consumer.queue
)
workflow_status_publisher.publish_workflow_status(
str(sample_serial_workflow_in_db.id_), next_status.value,
)
k8s_corev1_api_client_mock = Mock()
k8s_corev1_api_client_mock.delete_namespaced_job = Mock(
side_effect=ApiException(reason="Could not delete job.", status=404)
)
with patch(
"reana_workflow_controller.consumer.current_k8s_corev1_api_client",
k8s_corev1_api_client_mock,
):
consume_queue(job_status_consumer, limit=1)
assert sample_serial_workflow_in_db.status == next_status
