Skip to content

Commit

Permalink
Merge pull request #806 from lanl/issue688/failed-dependent-steps
Browse files Browse the repository at this point in the history
Ensure failed tasks cause dependent tasks to fail
  • Loading branch information
pagrubel authored Apr 15, 2024
2 parents 7c50b74 + 6cc0322 commit 46d2e40
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 16 deletions.
7 changes: 6 additions & 1 deletion beeflow/common/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ def task_states(self):
"""Get the task states of the workflow."""
return bee_client.query(self.wf_id)[1]

def get_task_state_by_name(self, name):
    """Return the state of the task named *name*.

    :param name: name of the task to look up
    :rtype: str
    :raises ValueError: if the workflow has no task with that name
        (previously this crashed with an opaque IndexError from
        indexing an empty list)
    """
    # task_states rows are (task_id, task_name, task_state) triples
    for _, task_name, task_state in self.task_states:
        if task_name == name:
            return task_state
    raise ValueError(f'no task named "{name}" in workflow')

def cleanup(self):
"""Clean up any leftover workflow data."""
# Remove the generated tarball
Expand Down Expand Up @@ -243,5 +248,5 @@ def check_completed(workflow):

def check_workflow_failed(workflow):
    """Ensure that the workflow completed in a Failed state."""
    # A failed workflow is archived with a combined status marker.
    failed = workflow.status == 'Archived/Failed'
    ci_assert(failed,
              f'workflow did not fail as expected (final status: {workflow.status})')
21 changes: 21 additions & 0 deletions beeflow/common/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ def build_failure(outer_workdir):
f'task was not in state BUILD_FAIL as expected: {task_state}')


@TEST_RUNNER.add()
def dependent_tasks_fail(outer_workdir):
    """Test that dependent tasks don't run after a failure."""
    # Give this run its own unique working directory.
    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
    os.makedirs(workdir)
    workflow = utils.Workflow(
        'failure-dependent-tasks',
        'ci/test_workflows/failure-dependent-tasks',
        main_cwl='workflow.cwl',
        job_file='input.yml',
        workdir=workdir,
        containers=[],
    )
    yield [workflow]
    utils.check_workflow_failed(workflow)
    # The failing task itself must end in FAILED...
    fail_state = workflow.get_task_state_by_name('fail')
    utils.ci_assert(fail_state == 'FAILED',
                    f'task fail did not fail as expected: {fail_state}')
    # ...and every downstream task must have been marked DEP_FAIL.
    for task in ('dependent0', 'dependent1', 'dependent2'):
        task_state = workflow.get_task_state_by_name(task)
        utils.ci_assert(task_state == 'DEP_FAIL',
                        f'task {task} did not get state DEP_FAIL as expected: {task_state}')


@TEST_RUNNER.add(ignore=True)
def checkpoint_restart(outer_workdir):
"""Test the clamr-ffmpeg checkpoint restart workflow."""
Expand Down
5 changes: 4 additions & 1 deletion beeflow/common/wf_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,10 @@ def command(self):
nonpositional_inputs = []
for input_ in self.inputs:
if input_.value is None:
raise ValueError("trying to construct command for task with missing input value")
raise ValueError(
("trying to construct command for task with missing input value "
f"(id: {input_.id})")
)

if input_.position is not None:
positional_inputs.append(input_)
Expand Down
46 changes: 32 additions & 14 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
db_path = wf_utils.get_db_path()


def archive_workflow(db, wf_id):
def archive_workflow(db, wf_id, final_state=None):
"""Archive a workflow after completion."""
# Archive Config
workflow_dir = wf_utils.get_workflow_dir(wf_id)
shutil.copyfile(os.path.expanduser("~") + '/.config/beeflow/bee.conf',
workflow_dir + '/' + 'bee.conf')

db.workflows.update_workflow_state(wf_id, 'Archived')
wf_utils.update_wf_status(wf_id, 'Archived')
wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
wf_utils.update_wf_status(wf_id, wf_state)

bee_workdir = wf_utils.get_bee_workdir()
archive_dir = os.path.join(bee_workdir, 'archives')
Expand All @@ -40,6 +41,18 @@ def archive_workflow(db, wf_id):
subprocess.call(['tar', '-czf', archive_path, wf_id], cwd=workflows_dir)


def set_dependent_tasks_dep_fail(db, wfi, wf_id, task):
    """Set the state of every task downstream of *task* to DEP_FAIL.

    Walks the dependency graph from the failed task, updating both the
    workflow interface and the workflow DB for each reachable task.

    :param db: workflow database handle (uses db.workflows.update_task_state)
    :param wfi: workflow interface (uses get_dependent_tasks / set_task_state)
    :param wf_id: ID of the workflow the tasks belong to
    :param task: the task that failed; its own state is left untouched

    A visited set keyed on task ID prevents re-processing tasks reachable
    through multiple paths (diamond dependencies) and guards against an
    infinite loop should the graph ever contain a cycle.
    """
    visited = {task.id}
    # Worklist of tasks whose dependents still need to be processed
    worklist = [task]
    while worklist:
        for dep_task in wfi.get_dependent_tasks(worklist.pop()):
            if dep_task.id in visited:
                continue
            visited.add(dep_task.id)
            wfi.set_task_state(dep_task, 'DEP_FAIL')
            db.workflows.update_task_state(dep_task.id, wf_id, 'DEP_FAIL')
            worklist.append(dep_task)


class WFUpdate(Resource):
"""Class to interact with an existing workflow."""

Expand Down Expand Up @@ -109,13 +122,14 @@ def put(self):
wf_utils.schedule_submit_tasks(wf_id, tasks)
return make_response(jsonify(status='Task {task_id} restarted'))

if job_state in ('COMPLETED', 'FAILED'):
if job_state == 'COMPLETED':
for output in task.outputs:
if output.glob is not None:
wfi.set_task_output(task, output.id, output.glob)
else:
wfi.set_task_output(task, output.id, "temp")
tasks = wfi.finalize_task(task)
log.info(f'next tasks to run: {tasks}')
wf_state = wfi.get_workflow_state()
if tasks and wf_state != 'PAUSED':
wf_utils.schedule_submit_tasks(wf_id, tasks)
Expand All @@ -126,19 +140,23 @@ def put(self):
archive_workflow(db, wf_id)
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
if wf_state == 'FAILED':
log.info("Workflow failed")
log.info("Shutting down GDB")
wf_id = wfi.workflow_id
archive_workflow(db, wf_id)
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
if job_state == 'FAILED':
set_dependent_tasks_dep_fail(db, wfi, wf_id, task)
log.info("Workflow failed")
log.info("Shutting down GDB")
wf_id = wfi.workflow_id
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

if job_state == 'BUILD_FAIL':
log.error(f'Workflow failed due to failed container build for task {task.name}')
wfi.set_workflow_state('Failed')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
f'{job_state}')), 200)
Expand Down
2 changes: 2 additions & 0 deletions ci/test_workflows/failure-dependent-tasks/input.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
fname: some_file_that_doesnt_exist
cat_argument: -n
105 changes: 105 additions & 0 deletions ci/test_workflows/failure-dependent-tasks/workflow.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Integration-test workflow: the `fail` step runs `ls` on a file that does
# not exist, so it fails; the three dependent steps must then be marked
# DEP_FAIL instead of running.
cwlVersion: v1.2
class: Workflow

inputs:
  fname: File
  cat_argument: string

outputs:
  fail_stdout:
    type: File
    outputSource: fail/fail_stdout
  dependent0_stdout:
    type: File
    outputSource: dependent0/dependent_stdout
  dependent1_stdout:
    type: File
    outputSource: dependent1/dependent_stdout
  dependent2_stdout:
    type: File
    outputSource: dependent2/dependent_stdout

steps:
  # This step is expected to fail: `fname` names a nonexistent file.
  fail:
    run:
      class: CommandLineTool
      baseCommand: [ls]
      stdout: fail.txt
      inputs:
        fname:
          type: File
          inputBinding:
            position: 1
      outputs:
        fail_stdout:
          type: stdout
    in:
      fname: fname
    out: [fail_stdout]
  # Two duplicate tasks that depend on the task above, which should fail and cause these to not run.
  dependent0:
    run:
      # cwlVersion added to match the sibling steps' nested tools
      cwlVersion: v1.2
      class: CommandLineTool
      baseCommand: [cat]
      stdout: dependent.txt
      inputs:
        cat_argument:
          type: string
          inputBinding:
            position: 1
        file_to_cat:
          type: File
          inputBinding:
            # was 1, duplicating cat_argument's position; 2 matches the
            # sibling steps and makes the argument order deterministic
            position: 2
      outputs:
        dependent_stdout:
          type: stdout
    in:
      file_to_cat: fail/fail_stdout
      cat_argument: cat_argument
    out: [dependent_stdout]
  dependent1:
    run:
      cwlVersion: v1.2
      class: CommandLineTool
      baseCommand: [cat]
      stdout: dependent1.txt
      inputs:
        cat_argument:
          type: string
          inputBinding:
            position: 1
        file_to_cat:
          type: File
          inputBinding:
            position: 2
      outputs:
        dependent_stdout:
          type: stdout
    in:
      cat_argument: cat_argument
      file_to_cat: fail/fail_stdout
    out: [dependent_stdout]
  # Depends on fail only transitively, through dependent1.
  dependent2:
    run:
      cwlVersion: v1.2
      class: CommandLineTool
      baseCommand: [cat]
      # was dependent1.txt, colliding with dependent1's output filename
      stdout: dependent2.txt
      inputs:
        cat_argument:
          type: string
          inputBinding:
            position: 1
        file_to_cat:
          type: File
          inputBinding:
            position: 2
      outputs:
        dependent_stdout:
          type: stdout
    in:
      cat_argument: cat_argument
      file_to_cat: dependent1/dependent_stdout
    out: [dependent_stdout]

0 comments on commit 46d2e40

Please sign in to comment.