Skip to content

Commit

Permalink
rest: set workflow status to pending after starting the workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
audrium committed Apr 22, 2021
1 parent 24c3a62 commit 5d6f418
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 10 deletions.
1 change: 1 addition & 0 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def _update_workflow_status(workflow, status, logs):
RunStatus.created,
RunStatus.running,
RunStatus.queued,
RunStatus.pending,
]
if status not in alive_statuses:
_delete_workflow_engine_pod(workflow)
Expand Down
23 changes: 19 additions & 4 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def start_workflow(workflow, parameters):
"""Start a workflow."""

def _start_workflow_db(workflow, parameters):
workflow.status = RunStatus.running
workflow.status = RunStatus.pending
if parameters:
workflow.input_parameters = parameters.get("input_parameters")
workflow.operational_options = parameters.get("operational_options")
Expand All @@ -58,19 +58,21 @@ def _start_workflow_db(workflow, parameters):
current_db_sessions = Session.object_session(workflow)
kwrm = KubernetesWorkflowRunManager(workflow)

workflow_status_name = get_workflow_status_name(workflow)
failure_message = (
"Workflow {id_} could not be started because it {verb} " "already {status}."
).format(
id_=workflow.id_,
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name),
verb=get_workflow_status_change_verb(workflow_status_name),
status=str(workflow_status_name),
)
if "restart" in parameters.keys():
if parameters["restart"]:
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in [RunStatus.created, RunStatus.queued]:
Expand Down Expand Up @@ -125,6 +127,18 @@ def get_workflow_name(workflow):
return workflow.name + "." + str(workflow.run_number)


def get_workflow_status_name(workflow):
"""Return workflow status name
:param workflow: Workflow object which name should be returned.
:type workflow: reana-commons.models.Workflow
"""
status = workflow.status
if status == RunStatus.pending:
status = RunStatus.queued
return status.name


def build_workflow_logs(workflow, steps=None, paginate=None):
"""Return the logs for all jobs of a workflow."""
query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
Expand Down Expand Up @@ -193,6 +207,7 @@ def delete_workflow(workflow, all_runs=False, workspace=False):
RunStatus.deleted,
RunStatus.failed,
RunStatus.queued,
RunStatus.pending,
]:
try:
to_be_deleted = [workflow]
Expand All @@ -217,7 +232,7 @@ def delete_workflow(workflow, all_runs=False, workspace=False):
"message": "Workflow successfully deleted",
"workflow_id": workflow.id_,
"workflow_name": get_workflow_name(workflow),
"status": workflow.status.name,
"status": get_workflow_status_name(workflow),
"user": str(workflow.owner_id),
}
),
Expand Down
5 changes: 4 additions & 1 deletion reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
create_workflow_workspace,
get_specification_diff,
get_workflow_name,
get_workflow_status_name,
get_workflow_progress,
get_workspace_diff,
use_paginate_args,
Expand Down Expand Up @@ -225,6 +226,8 @@ def get_workflows(paginate=None): # noqa
query = query.filter(Workflow.name.ilike("%{}%".format(search)))
if status_list:
workflow_status = [RunStatus[status] for status in status_list.split(",")]
if RunStatus.queued in workflow_status:
workflow_status.append(RunStatus.pending)
query = query.filter(Workflow.status.in_(workflow_status))
if sort not in ["asc", "desc"]:
sort = "desc"
Expand All @@ -234,7 +237,7 @@ def get_workflows(paginate=None): # noqa
workflow_response = {
"id": workflow.id_,
"name": get_workflow_name(workflow),
"status": workflow.status.name,
"status": get_workflow_status_name(workflow),
"user": user_uuid,
"created": workflow.created.strftime(WORKFLOW_TIME_FORMAT),
"progress": get_workflow_progress(workflow),
Expand Down
7 changes: 4 additions & 3 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
delete_workflow,
get_current_job_progress,
get_workflow_name,
get_workflow_status_name,
get_workflow_progress,
start_workflow,
stop_workflow,
Expand Down Expand Up @@ -308,7 +309,7 @@ def get_workflow_status(workflow_id_or_name): # noqa
"id": workflow.id_,
"name": get_workflow_name(workflow),
"created": workflow.created.strftime(WORKFLOW_TIME_FORMAT),
"status": workflow.status.name,
"status": get_workflow_status_name(workflow),
"progress": get_workflow_progress(workflow),
"user": user_uuid,
"logs": json.dumps(workflow_logs),
Expand Down Expand Up @@ -514,7 +515,7 @@ def set_workflow_status(workflow_id_or_name): # noqa
"message": "Workflow successfully launched",
"workflow_id": str(workflow.id_),
"workflow_name": get_workflow_name(workflow),
"status": workflow.status.name,
"status": get_workflow_status_name(workflow),
"user": str(workflow.owner_id),
}
),
Expand All @@ -532,7 +533,7 @@ def set_workflow_status(workflow_id_or_name): # noqa
"message": "Workflow successfully stopped",
"workflow_id": workflow.id_,
"workflow_name": get_workflow_name(workflow),
"status": workflow.status.name,
"status": get_workflow_status_name(workflow),
"user": str(workflow.owner_id),
}
),
Expand Down
1 change: 1 addition & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
RunStatus.created,
RunStatus.failed,
RunStatus.finished,
RunStatus.pending,
RunStatus.stopped,
pytest.param(RunStatus.deleted, marks=pytest.mark.xfail),
pytest.param(RunStatus.running, marks=pytest.mark.xfail),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from reana_workflow_controller.workflow_run_manager import WorkflowRunManager

status_dict = {
START: RunStatus.running,
START: RunStatus.queued,
STOP: RunStatus.finished,
}

Expand Down Expand Up @@ -608,7 +608,7 @@ def test_start_already_started_workflow(
assert res.status_code == 409
expected_message = (
"Workflow {0} could not be started because"
" it is already running."
" it has been already queued."
).format(workflow_created_uuid)
assert json_response.get("message") == expected_message

Expand Down

0 comments on commit 5d6f418

Please sign in to comment.