Skip to content

Commit

Permalink
Merge pull request #824 from lanl/archive-failed-checkpoint-workflows
Browse files Browse the repository at this point in the history
Archive failed checkpoint restart workflows
  • Loading branch information
pagrubel authored Apr 18, 2024
2 parents e90da17 + ae3deb7 commit 54a6747
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def archive_workflow(db, wf_id, final_state=None):
subprocess.call(['tar', '-czf', archive_path, wf_id], cwd=workflows_dir)


def archive_fail_workflow(db, wf_id):
    """Archive a workflow in the 'Failed' state and shut down its GDB.

    Delegates the archiving to ``archive_workflow`` with
    ``final_state='Failed'``, then looks up the GDB process ID recorded
    for the workflow and hands it to ``dep_manager.kill_gdb``.

    :param db: workflow database handle; must expose
        ``workflows.get_gdb_pid``
    :param wf_id: ID of the workflow to archive and fail
    """
    archive_workflow(db, wf_id, final_state='Failed')
    # The PID is fetched after archiving, same order as the call sites
    # this helper replaced.
    gdb_pid = db.workflows.get_gdb_pid(wf_id)
    dep_manager.kill_gdb(gdb_pid)


def set_dependent_tasks_dep_fail(db, wfi, wf_id, task):
"""Recursively set all dependent task states of this task to DEP_FAIL."""
# List of tasks whose states have already been updated
Expand Down Expand Up @@ -111,10 +118,7 @@ def put(self):
new_task = wfi.restart_task(task, checkpoint_file)
if new_task is None:
log.info('No more restarts')
wf_state = wfi.get_workflow_state()
wfi.set_workflow_state('Failed')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
archive_fail_workflow(db, wf_id)
return make_response(jsonify(status=f'Task {task_id} set to {job_state}'))
db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING")
# Submit the restart task
Expand Down Expand Up @@ -148,15 +152,11 @@ def put(self):
log.info("Workflow failed")
log.info("Shutting down GDB")
wf_id = wfi.workflow_id
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
archive_fail_workflow(db, wf_id)

if job_state == 'BUILD_FAIL':
log.error(f'Workflow failed due to failed container build for task {task.name}')
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
archive_fail_workflow(db, wf_id)

resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
f'{job_state}')), 200)
Expand Down

0 comments on commit 54a6747

Please sign in to comment.