diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py
index b17bc1f2b..e868a8bde 100644
--- a/beeflow/client/bee_client.py
+++ b/beeflow/client/bee_client.py
@@ -459,11 +459,9 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):
     tasks_status = resp.json()['tasks_status']
     wf_status = resp.json()['wf_status']
-    if tasks_status == 'Unavailable':
-        typer.echo(wf_status)
-    else:
-        typer.echo(wf_status)
-        typer.echo(tasks_status)
+    typer.echo(wf_status)
+    for _task_id, task_name, task_state in tasks_status:
+        typer.echo(f'{task_name}--{task_state}')
     logging.info('Query workflow: {resp.text}')
     return wf_status, tasks_status
diff --git a/beeflow/common/build/build_driver.py b/beeflow/common/build/build_driver.py
index b76a64136..84ae5abb7 100644
--- a/beeflow/common/build/build_driver.py
+++ b/beeflow/common/build/build_driver.py
@@ -1,27 +1,6 @@
 """Abstract base class for the handling build systems."""
 from abc import ABC, abstractmethod
-import jsonpickle
-
-
-def arg2task(task_arg):
-    """Convert JSON encoded task to Task object.
-
-    The build driver will expect a Task object, and the build
-    interface starts with a JSON representation of the Task object.
-    """
-    return jsonpickle.decode(task_arg)
-
-
-def task2arg(task):
-    """Convert Task object to JSON encoded string.
-
-    The build interface needs to pass Task data on the command line,
-    because each compute node needs to understand the Task description.
-    JSON format is a convenient way to describe the Task object at the
-    command line.
-    """
-    return jsonpickle.encode(task)
 
 
 class BuildDriver(ABC):
diff --git a/beeflow/common/build/container_drivers.py b/beeflow/common/build/container_drivers.py
index 06f53423d..555356dac 100644
--- a/beeflow/common/build/container_drivers.py
+++ b/beeflow/common/build/container_drivers.py
@@ -9,6 +9,7 @@
 import tempfile
 from beeflow.common.config_driver import BeeConfig as bc
 from beeflow.common import log as bee_logging
+from beeflow.common.build.utils import ContainerBuildError
 from beeflow.common.build.build_driver import BuildDriver
 from beeflow.common.crt.charliecloud_driver import CharliecloudDriver as crt_driver
@@ -125,8 +126,7 @@ def process_docker_pull(self, addr=None, force=False):
         # If Requirement is set but not specified, and param empty, do nothing and error.
         if self.task.requirements == {} and not addr:
-            log.error("dockerPull set but no image path specified.")
-            return 1
+            raise ContainerBuildError("dockerPull set but no image path specified.")
         # If no image specified and no image required, nothing to do.
         if not task_addr and not addr:
             log.info('No image specified and no image required, nothing to do.')
@@ -170,8 +170,7 @@ def process_docker_load(self):
         log.warning('Charliecloud does not have the concept of a layered image tarball.')
         log.warning('Did you mean to use dockerImport?')
         if req_dockerload:
-            log.warning('dockerLoad specified as requirement.')
-            return 1
+            raise ContainerBuildError('dockerLoad is not supported')
         return 0
@@ -185,14 +184,14 @@ def process_docker_file(self, task_dockerfile=None, force=False):
         # beeflow:containerName is always processed before dockerFile, so safe to assume it exists
         # otherwise, raise an error.
         if self.container_name is None:
-            log.error("dockerFile may not be specified without beeflow:containerName")
-            return 1
+            raise ContainerBuildError(
+                "dockerFile may not be specified without beeflow:containerName"
+            )
 
         # Need dockerfile in order to build, else fail
         task_dockerfile = self.get_docker_req('dockerFile')
         if not task_dockerfile:
-            log.error("dockerFile not specified as task attribute or parameter.")
-            return 1
+            raise ContainerBuildError("dockerFile not specified as task attribute or parameter.")
 
         # Create context directory to use as Dockerfile context, use container name so user
         # can prep the directory with COPY sources as needed.
@@ -281,7 +280,7 @@ def process_docker_image_id(self, param_imageid=None):
         # If task and parameter still doesn't specify ImageId, consider this an error.
         if not self.docker_image_id:
             return 0
-        return 1
+        raise ContainerBuildError('dockerImageId is required but not found')
 
     def process_docker_output_directory(self, param_output_directory=None):
         """Get and process the CWL compliant dockerOutputDirectory dockerRequirement.
@@ -308,8 +307,9 @@ def process_copy_container(self, force=False):
         # Need container_path to know how dockerfile should be named, else fail
         task_container_path = self.get_docker_req('beeflow:copyContainer')
         if not task_container_path:
-            log.error("beeflow:copyContainer: You must specify the path to an existing container.")
-            return 1
+            raise ContainerBuildError(
+                "beeflow:copyContainer: You must specify the path to an existing container."
+            )
 
         if self.container_name:
             copy_target = '/'.join([self.container_archive, self.container_name + '.tar.gz'])
@@ -345,8 +345,9 @@ def process_container_name(self):
         """
         task_container_name = self.get_docker_req('beeflow:containerName')
         if not task_container_name and self.docker_image_id is None:
-            log.error("beeflow:containerName: You must specify the containerName or dockerImageId")
-            return 1
+            raise ContainerBuildError(
+                "beeflow:containerName: You must specify the containerName or dockerImageId"
+            )
         self.container_name = task_container_name
         log.info(f'Setting container_name to: {self.container_name}')
         return 0
diff --git a/beeflow/common/build/utils.py b/beeflow/common/build/utils.py
new file mode 100644
index 000000000..1cd32eba4
--- /dev/null
+++ b/beeflow/common/build/utils.py
@@ -0,0 +1,26 @@
+"""Container build utility code."""
+import jsonpickle
+
+
+def arg2task(task_arg):
+    """Convert JSON encoded task to Task object.
+
+    The build driver will expect a Task object, and the build
+    interface starts with a JSON representation of the Task object.
+    """
+    return jsonpickle.decode(task_arg)
+
+
+def task2arg(task):
+    """Convert Task object to JSON encoded string.
+
+    The build interface needs to pass Task data on the command line,
+    because each compute node needs to understand the Task description.
+    JSON format is a convenient way to describe the Task object at the
+    command line.
+ """ + return jsonpickle.encode(task) + + +class ContainerBuildError(Exception): + """Cotnainer build error class.""" diff --git a/beeflow/common/build_interfaces.py b/beeflow/common/build_interfaces.py index 95288009f..1aa1944a7 100644 --- a/beeflow/common/build_interfaces.py +++ b/beeflow/common/build_interfaces.py @@ -14,7 +14,7 @@ from beeflow.common.build.container_drivers import CharliecloudBuildDriver from beeflow.common.config_driver import BeeConfig as bc from beeflow.common import log as bee_logging -from beeflow.common.build.build_driver import arg2task +from beeflow.common.build.utils import arg2task, ContainerBuildError log = bee_logging.setup(__name__) @@ -47,9 +47,11 @@ def build_main(task): return_code = return_obj.returncode except AttributeError: return_code = int(return_obj) - except CalledProcessError: + except CalledProcessError as error: return_code = 1 - log.warning(f'There was a problem executing {op_dict["op_name"]}!') + raise ContainerBuildError( + f'There was a problem executing {op_dict["op_name"]}!' + ) from error # Case 1: Not the last operation spec'd, but is a terminal operation. if op_dict["op_terminal"] and return_code == 0: op_values = [None, None, None, True] diff --git a/beeflow/common/crt/charliecloud_driver.py b/beeflow/common/crt/charliecloud_driver.py index 19d0fafe9..c9218d230 100644 --- a/beeflow/common/crt/charliecloud_driver.py +++ b/beeflow/common/crt/charliecloud_driver.py @@ -8,7 +8,7 @@ from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult, Command, CommandType) from beeflow.common.config_driver import BeeConfig as bc -from beeflow.common.build.build_driver import task2arg +from beeflow.common.build.utils import task2arg from beeflow.common.container_path import convert_path from beeflow.common import log as bee_logging diff --git a/beeflow/common/crt/singularity_driver.py b/beeflow/common/crt/singularity_driver.py index 829a66d0c..7461f44a0 100644 --- a/beeflow/common/crt/singularity_driver.py +++ b/beeflow/common/crt/singularity_driver.py @@ -4,7 +4,7 @@ """ from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult, Command) -from beeflow.common.build.build_driver import task2arg +from beeflow.common.build.utils import task2arg class SingularityDriver(ContainerRuntimeDriver): diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py index 2bf4435e1..981e0b97a 100644 --- a/beeflow/common/integration/utils.py +++ b/beeflow/common/integration/utils.py @@ -91,6 +91,11 @@ def status(self): """Get the status of the workflow.""" return bee_client.query(self.wf_id)[0] + @property + def task_states(self): + """Get the task states of the workflow.""" + return bee_client.query(self.wf_id)[1] + def cleanup(self): """Clean up any leftover workflow data.""" # Remove the generated tarball @@ -233,4 +238,10 @@ def check_path_exists(path): def check_completed(workflow): """Ensure the workflow has a completed status.""" - ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') + ci_assert(workflow.status == 'Archived', f'bad workflow status {workflow.status}') + + +def check_workflow_failed(workflow): + """Ensure that the workflow completed in a Failed state.""" + ci_assert(workflow.status == 'Failed', + f'workflow did not fail as expected (final status: {workflow.status})') diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py index 704616c66..1dbc5b65b 100644 --- a/beeflow/common/integration_test.py 
+++ b/beeflow/common/integration_test.py
@@ -194,6 +194,22 @@ def multiple_workflows(outer_workdir):
         utils.check_path_exists(path)
 
 
+@TEST_RUNNER.add()
+def build_failure(outer_workdir):
+    """Test running a workflow with a bad container."""
+    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+    os.makedirs(workdir)
+    workflow = utils.Workflow('build-failure', 'ci/test_workflows/build-failure',
+                              main_cwl='workflow.cwl', job_file='input.yml',
+                              workdir=workdir, containers=[])
+    yield [workflow]
+    utils.check_workflow_failed(workflow)
+    # Only one task
+    task_state = workflow.task_states[0][2]
+    utils.ci_assert(task_state == 'BUILD_FAIL',
+                    f'task was not in state BUILD_FAIL as expected: {task_state}')
+
+
 @TEST_RUNNER.add(ignore=True)
 def checkpoint_restart(outer_workdir):
     """Test the clamr-ffmpeg checkpoint restart workflow."""
@@ -220,8 +236,7 @@ def checkpoint_restart_failure(outer_workdir):
                               main_cwl='workflow.cwl', job_file='input.yml',
                               workdir=workdir, containers=[])
     yield [workflow]
-    utils.ci_assert(workflow.status == 'Failed',
-                    f'Workflow did not fail as expected (final status: {workflow.status})')
+    utils.check_workflow_failed(workflow)
 
 
 def test_input_callback(arg):
diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py
index c07a4ca35..2c1360fbd 100644
--- a/beeflow/task_manager/background.py
+++ b/beeflow/task_manager/background.py
@@ -7,6 +7,7 @@
 import jsonpickle
 from beeflow.task_manager import utils
 from beeflow.common import log as bee_logging
+from beeflow.common.build.utils import ContainerBuildError
 from beeflow.common.build_interfaces import build_main
 
@@ -38,6 +39,33 @@ def resolve_environment(task):
     build_main(task)
 
 
+def submit_task(db, worker, task):
+    """Submit (or resubmit) a task."""
+    try:
+        log.info(f'Resolving environment for task {task.name}')
+        resolve_environment(task)
+        log.info(f'Environment preparation complete for task {task.name}')
+        job_id, job_state = worker.submit_task(task)
+        log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}')
+        # place job in queue to monitor
+        db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
+        # update_task_metadata(task.id, task_metadata)
+    except ContainerBuildError as err:
+        job_state = 'BUILD_FAIL'
+        log.error(f'Failed to build container for {task.name}: {err}')
+        log.error(f'{task.name} state: {job_state}')
+    except Exception as err:  # noqa (we have to catch everything here)
+        # Set job state to failed
+        job_state = 'SUBMIT_FAIL'
+        log.error(f'Task Manager submit task {task.name} failed! \n {err}')
+        log.error(f'{task.name} state: {job_state}')
+        # Log the traceback information as well
+        log.error(traceback.format_exc())
+    # Send the initial state to WFM
+    # update_task_state(task.id, job_state, metadata=task_metadata)
+    return job_state
+
+
 def submit_jobs():
     """Submit all jobs currently in submit queue to the workload scheduler."""
     db = utils.connect_db()
@@ -45,26 +73,8 @@
     while db.submit_queue.count() >= 1:
         # Single value dictionary
         task = db.submit_queue.pop()
-        try:
-            log.info(f'Resolving environment for task {task.name}')
-            resolve_environment(task)
-            log.info(f'Environment preparation complete for task {task.name}')
-            job_id, job_state = worker.submit_task(task)
-            log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}')
-            # place job in queue to monitor
-            db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
-            # update_task_metadata(task.id, task_metadata)
-        except Exception as err:  # noqa (we have to catch everything here)
-            # Set job state to failed
-            job_state = 'SUBMIT_FAIL'
-            log.error(f'Task Manager submit task {task.name} failed! \n {err}')
-            log.error(f'{task.name} state: {job_state}')
-            # Log the traceback information as well
-            log.error(traceback.format_exc())
-        finally:
-            # Send the initial state to WFM
-            # update_task_state(task.id, job_state, metadata=task_metadata)
-            update_task_state(task.workflow_id, task.id, job_state)
+        job_state = submit_task(db, worker, task)
+        update_task_state(task.workflow_id, task.id, job_state)
 
 
 def update_jobs():
@@ -99,6 +109,14 @@
                     update_task_state(task.workflow_id, task.id, 'FAILED')
                 else:
                     update_task_state(task.workflow_id, task.id, new_job_state)
+            # States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
+            elif new_job_state in ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'):
+                # Don't update wfm, just resubmit
+                log.info(f'Task {task.name} in state {new_job_state}')
+                log.info(f'Resubmitting task {task.name}')
+                db.job_queue.remove_by_id(id_)
+                job_state = submit_task(db, worker, task)
+                update_task_state(task.workflow_id, task.id, job_state)
             else:
                 update_task_state(task.workflow_id, task.id, new_job_state)
diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py
index 7d238e874..0abc0443f 100644
--- a/beeflow/tests/test_wf_manager.py
+++ b/beeflow/tests/test_wf_manager.py
@@ -190,7 +190,8 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db):
     temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")
 
     resp = client().get(f'/bee_wfm/v1/jobs/{WF_ID}')
-    assert 'RUNNING' in resp.json['tasks_status']
+    tasks_status = resp.json['tasks_status']
+    assert tasks_status[0][2] == 'RUNNING' or tasks_status[1][2] == 'RUNNING'
 
 
 def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py
index 49b555483..d19e54ea7 100644
--- a/beeflow/wf_manager/resources/wf_actions.py
+++ b/beeflow/wf_manager/resources/wf_actions.py
@@ -42,13 +42,11 @@ def get(wf_id):
         if not tasks:
             log.info(f"Bad query for wf {wf_id}.")
             wf_status = 'No workflow with that ID is currently loaded'
-            tasks_status.append('Unavailable')
             resp = make_response(jsonify(tasks_status=tasks_status, wf_status=wf_status,
                                          status='not found'), 404)
         for task in tasks:
-            tasks_status.append(f"{task.name}--{task.state}")
-        tasks_status = '\n'.join(tasks_status)
+            tasks_status.append((task.id, task.name, task.state))
         wf_status = db.workflows.get_workflow_state(wf_id)
 
         resp = make_response(jsonify(tasks_status=tasks_status,
diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py
index 5fe70ed32..8e557a163 100644
--- a/beeflow/wf_manager/resources/wf_update.py
+++ b/beeflow/wf_manager/resources/wf_update.py
@@ -99,6 +99,7 @@
             if new_task is None:
                 log.info('No more restarts')
                 wf_state = wfi.get_workflow_state()
+                wfi.set_workflow_state('Failed')
                 wf_utils.update_wf_status(wf_id, 'Failed')
                 db.workflows.update_workflow_state(wf_id, 'Failed')
                 return make_response(jsonify(status=f'Task {task_id} set to {job_state}'))
@@ -133,6 +134,12 @@
             pid = db.workflows.get_gdb_pid(wf_id)
             dep_manager.kill_gdb(pid)
 
+        if job_state == 'BUILD_FAIL':
+            log.error(f'Workflow failed due to failed container build for task {task.name}')
+            wfi.set_workflow_state('Failed')
+            wf_utils.update_wf_status(wf_id, 'Failed')
+            db.workflows.update_workflow_state(wf_id, 'Failed')
+
         resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
                                              f'{job_state}')), 200)
         return resp
diff --git a/ci/test_workflows/build-failure/Dockerfile.build-failure b/ci/test_workflows/build-failure/Dockerfile.build-failure
new file mode 100644
index 000000000..745fdb930
--- /dev/null
+++ b/ci/test_workflows/build-failure/Dockerfile.build-failure
@@ -0,0 +1,3 @@
+FROM some_nonexistent_container
+
+RUN touch /file
diff --git a/ci/test_workflows/build-failure/input.yml b/ci/test_workflows/build-failure/input.yml
new file mode 100644
index 000000000..633cf8836
--- /dev/null
+++ b/ci/test_workflows/build-failure/input.yml
@@ -0,0 +1 @@
+fname: /file
diff --git a/ci/test_workflows/build-failure/workflow.cwl b/ci/test_workflows/build-failure/workflow.cwl
new file mode 100644
index 000000000..b71987602
--- /dev/null
+++ b/ci/test_workflows/build-failure/workflow.cwl
@@ -0,0 +1,32 @@
+# Dummy workflow designed to fail at the container build stage
+class: Workflow
+cwlVersion: v1.2
+
+inputs:
+  fname: string
+
+outputs:
+  step0_stdout:
+    type: File
+    outputSource: step0/step0_stdout
+
+steps:
+  step0:
+    run:
+      class: CommandLineTool
+      baseCommand: ls
+      stdout: step0_stdout.txt
+      inputs:
+        fname:
+          type: string
+          inputBinding: {}
+      outputs:
+        step0_stdout:
+          type: stdout
+    in:
+      fname: fname
+    out: [step0_stdout]
+    hints:
+      DockerRequirement:
+        dockerFile: "Dockerfile.build-failure"
+        beeflow:containerName: "build-failure"