Skip to content

Commit

Permalink
Archive failed workflows
Browse files (browse the repository at this point in the history)
Failed workflows, including those that failed due to job errors and
container build errors, will now be archived with a state of
`Archived/Failed`.
  • Loading branch information
jtronge committed Apr 15, 2024
1 parent 2d82d99 commit 27d53b2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
2 changes: 1 addition & 1 deletion beeflow/common/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,5 +248,5 @@ def check_completed(workflow):

def check_workflow_failed(workflow):
"""Ensure that the workflow completed in a Failed state."""
ci_assert(workflow.status == 'Failed',
ci_assert(workflow.status == 'Archived/Failed',
f'workflow did not fail as expected (final status: {workflow.status})')
19 changes: 8 additions & 11 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
db_path = wf_utils.get_db_path()


def archive_workflow(db, wf_id):
def archive_workflow(db, wf_id, final_state=None):
"""Archive a workflow after completion."""
# Archive Config
workflow_dir = wf_utils.get_workflow_dir(wf_id)
shutil.copyfile(os.path.expanduser("~") + '/.config/beeflow/bee.conf',
workflow_dir + '/' + 'bee.conf')

db.workflows.update_workflow_state(wf_id, 'Archived')
wf_utils.update_wf_status(wf_id, 'Archived')
wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
wf_utils.update_wf_status(wf_id, wf_state)

bee_workdir = wf_utils.get_bee_workdir()
archive_dir = os.path.join(bee_workdir, 'archives')
Expand Down Expand Up @@ -143,23 +144,19 @@ def put(self):
# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
if job_state == 'FAILED':
wfi.set_workflow_state('FAILED')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
set_dependent_tasks_dep_fail(db, wfi, wf_id, task)
log.info("Workflow failed")
log.info("Shutting down GDB")
wf_id = wfi.workflow_id
# Should failed workflows be archived?
# archive_workflow(db, wf_id)
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

if job_state == 'BUILD_FAIL':
log.error(f'Workflow failed due to failed container build for task {task.name}')
wfi.set_workflow_state('Failed')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
f'{job_state}')), 200)
Expand Down

0 comments on commit 27d53b2

Please sign in to comment.