Initial task manager resiliency and error handling #789

Merged: 3 commits, Mar 26, 2024
8 changes: 3 additions & 5 deletions beeflow/client/bee_client.py
@@ -459,11 +459,9 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):

tasks_status = resp.json()['tasks_status']
wf_status = resp.json()['wf_status']
if tasks_status == 'Unavailable':
typer.echo(wf_status)
else:
typer.echo(wf_status)
typer.echo(tasks_status)
typer.echo(wf_status)
for _task_id, task_name, task_state in tasks_status:
typer.echo(f'{task_name}--{task_state}')

logging.info(f'Query workflow: {resp.text}')
return wf_status, tasks_status
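
The query endpoint now returns tasks_status as a list of (task_id, task_name, task_state) entries instead of a preformatted string, so the client iterates over it and formats each task itself. A minimal sketch of that payload shape and the client-side loop; the literal values below are illustrative only:

resp_json = {
    'wf_status': 'Running',
    'tasks_status': [
        [124, 'task1', 'RUNNING'],
        [125, 'task2', 'PENDING'],
    ],
}

print(resp_json['wf_status'])
for _task_id, task_name, task_state in resp_json['tasks_status']:
    print(f'{task_name}--{task_state}')
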
21 changes: 0 additions & 21 deletions beeflow/common/build/build_driver.py
@@ -1,27 +1,6 @@
"""Abstract base class for the handling build systems."""

from abc import ABC, abstractmethod
import jsonpickle


def arg2task(task_arg):
"""Convert JSON encoded task to Task object.

The build driver will expect a Task object, and the build
interface starts with a JSON representation of the Task object.
"""
return jsonpickle.decode(task_arg)


def task2arg(task):
"""Convert Task object to JSON encoded string.

The build interface needs to pass Task data on the command line,
because each compute node needs to understand the Task description.
JSON format is a convenient way to describe the Task object at the
command line.
"""
return jsonpickle.encode(task)


class BuildDriver(ABC):
27 changes: 14 additions & 13 deletions beeflow/common/build/container_drivers.py
@@ -9,6 +9,7 @@
import tempfile
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import log as bee_logging
from beeflow.common.build.utils import ContainerBuildError
from beeflow.common.build.build_driver import BuildDriver
from beeflow.common.crt.charliecloud_driver import CharliecloudDriver as crt_driver

@@ -125,8 +126,7 @@ def process_docker_pull(self, addr=None, force=False):

# If Requirement is set but not specified, and param empty, do nothing and error.
if self.task.requirements == {} and not addr:
log.error("dockerPull set but no image path specified.")
return 1
raise ContainerBuildError("dockerPull set but no image path specified.")
# If no image specified and no image required, nothing to do.
if not task_addr and not addr:
log.info('No image specified and no image required, nothing to do.')
@@ -170,8 +170,7 @@ def process_docker_load(self):
log.warning('Charliecloud does not have the concept of a layered image tarball.')
log.warning('Did you mean to use dockerImport?')
if req_dockerload:
log.warning('dockerLoad specified as requirement.')
return 1
raise ContainerBuildError('dockerLoad is not supported')
return 0

def process_docker_file(self, task_dockerfile=None, force=False):
@@ -185,14 +184,14 @@ def process_docker_file(self, task_dockerfile=None, force=False):
# beeflow:containerName is always processed before dockerFile, so safe to assume it exists
# otherwise, raise an error.
if self.container_name is None:
log.error("dockerFile may not be specified without beeflow:containerName")
return 1
raise ContainerBuildError(
"dockerFile may not be specified without beeflow:containerName"
)

# Need dockerfile in order to build, else fail
task_dockerfile = self.get_docker_req('dockerFile')
if not task_dockerfile:
log.error("dockerFile not specified as task attribute or parameter.")
return 1
raise ContainerBuildError("dockerFile not specified as task attribute or parameter.")

# Create context directory to use as Dockerfile context, use container name so user
# can prep the directory with COPY sources as needed.
@@ -281,7 +280,7 @@ def process_docker_image_id(self, param_imageid=None):
# If task and parameter still doesn't specify ImageId, consider this an error.
if not self.docker_image_id:
return 0
return 1
raise ContainerBuildError('dockerImageId is required but not found')

def process_docker_output_directory(self, param_output_directory=None):
"""Get and process the CWL compliant dockerOutputDirectory dockerRequirement.
@@ -308,8 +307,9 @@ def process_copy_container(self, force=False):
# Need container_path to know how dockerfile should be named, else fail
task_container_path = self.get_docker_req('beeflow:copyContainer')
if not task_container_path:
log.error("beeflow:copyContainer: You must specify the path to an existing container.")
return 1
raise ContainerBuildError(
"beeflow:copyContainer: You must specify the path to an existing container."
)

if self.container_name:
copy_target = '/'.join([self.container_archive, self.container_name + '.tar.gz'])
@@ -345,8 +345,9 @@ def process_container_name(self):
"""
task_container_name = self.get_docker_req('beeflow:containerName')
if not task_container_name and self.docker_image_id is None:
log.error("beeflow:containerName: You must specify the containerName or dockerImageId")
return 1
raise ContainerBuildError(
"beeflow:containerName: You must specify the containerName or dockerImageId"
)
self.container_name = task_container_name
log.info(f'Setting container_name to: {self.container_name}')
return 0
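
The process_* handlers above previously signalled failure by returning 1; they now raise ContainerBuildError, so a caller can distinguish a container build problem from other failures by exception type instead of inspecting integer return codes. A minimal sketch of the calling pattern this enables (the wrapper function is illustrative, not part of the driver):

from beeflow.common.build.utils import ContainerBuildError

def run_build_step(step):
    """Run one build handler and map a build failure to a BUILD_FAIL state (sketch)."""
    try:
        step()  # e.g. driver.process_docker_pull or driver.process_container_name
    except ContainerBuildError as err:
        return 'BUILD_FAIL', str(err)
    return 'OK', None
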
26 changes: 26 additions & 0 deletions beeflow/common/build/utils.py
@@ -0,0 +1,26 @@
"""Container build utility code."""
import jsonpickle


def arg2task(task_arg):
"""Convert JSON encoded task to Task object.

The build driver will expect a Task object, and the build
interface starts with a JSON representation of the Task object.
"""
return jsonpickle.decode(task_arg)


def task2arg(task):
"""Convert Task object to JSON encoded string.

The build interface needs to pass Task data on the command line,
because each compute node needs to understand the Task description.
JSON format is a convenient way to describe the Task object at the
command line.
"""
return jsonpickle.encode(task)


class ContainerBuildError(Exception):
"""Cotnainer build error class."""
8 changes: 5 additions & 3 deletions beeflow/common/build_interfaces.py
@@ -14,7 +14,7 @@
from beeflow.common.build.container_drivers import CharliecloudBuildDriver
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import log as bee_logging
from beeflow.common.build.build_driver import arg2task
from beeflow.common.build.utils import arg2task, ContainerBuildError


log = bee_logging.setup(__name__)
@@ -47,9 +47,11 @@ def build_main(task):
return_code = return_obj.returncode
except AttributeError:
return_code = int(return_obj)
except CalledProcessError:
except CalledProcessError as error:
return_code = 1
log.warning(f'There was a problem executing {op_dict["op_name"]}!')
raise ContainerBuildError(
f'There was a problem executing {op_dict["op_name"]}!'
) from error
# Case 1: Not the last operation spec'd, but is a terminal operation.
if op_dict["op_terminal"] and return_code == 0:
op_values = [None, None, None, True]
2 changes: 1 addition & 1 deletion beeflow/common/crt/charliecloud_driver.py
@@ -8,7 +8,7 @@
from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult,
Command, CommandType)
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.build.build_driver import task2arg
from beeflow.common.build.utils import task2arg
from beeflow.common.container_path import convert_path
from beeflow.common import log as bee_logging

2 changes: 1 addition & 1 deletion beeflow/common/crt/singularity_driver.py
@@ -4,7 +4,7 @@
"""

from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult, Command)
from beeflow.common.build.build_driver import task2arg
from beeflow.common.build.utils import task2arg


class SingularityDriver(ContainerRuntimeDriver):
13 changes: 12 additions & 1 deletion beeflow/common/integration/utils.py
@@ -91,6 +91,11 @@ def status(self):
"""Get the status of the workflow."""
return bee_client.query(self.wf_id)[0]

@property
def task_states(self):
"""Get the task states of the workflow."""
return bee_client.query(self.wf_id)[1]

def cleanup(self):
"""Clean up any leftover workflow data."""
# Remove the generated tarball
@@ -233,4 +238,10 @@ def check_path_exists(path):

def check_completed(workflow):
"""Ensure the workflow has a completed status."""
ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}')
ci_assert(workflow.status == 'Archived', f'bad workflow status {workflow.status}')


def check_workflow_failed(workflow):
"""Ensure that the workflow completed in a Failed state."""
ci_assert(workflow.status == 'Failed',
f'workflow did not fail as expected (final status: {workflow.status})')
19 changes: 17 additions & 2 deletions beeflow/common/integration_test.py
@@ -194,6 +194,22 @@ def multiple_workflows(outer_workdir):
utils.check_path_exists(path)


@TEST_RUNNER.add()
def build_failure(outer_workdir):
"""Test running a workflow with a bad container."""
workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
os.makedirs(workdir)
workflow = utils.Workflow('build-failure', 'ci/test_workflows/build-failure',
main_cwl='workflow.cwl', job_file='input.yml',
workdir=workdir, containers=[])
yield [workflow]
utils.check_workflow_failed(workflow)
# Only one task
task_state = workflow.task_states[0][2]
utils.ci_assert(task_state == 'BUILD_FAIL',
f'task was not in state BUILD_FAIL as expected: {task_state}')


@TEST_RUNNER.add(ignore=True)
def checkpoint_restart(outer_workdir):
"""Test the clamr-ffmpeg checkpoint restart workflow."""
@@ -220,8 +236,7 @@ def checkpoint_restart_failure(outer_workdir):
main_cwl='workflow.cwl', job_file='input.yml',
workdir=workdir, containers=[])
yield [workflow]
utils.ci_assert(workflow.status == 'Failed',
f'Workflow did not fail as expected (final status: {workflow.status})')
utils.check_workflow_failed(workflow)


def test_input_callback(arg):
58 changes: 38 additions & 20 deletions beeflow/task_manager/background.py
@@ -7,6 +7,7 @@
import jsonpickle
from beeflow.task_manager import utils
from beeflow.common import log as bee_logging
from beeflow.common.build.utils import ContainerBuildError
from beeflow.common.build_interfaces import build_main


@@ -38,33 +39,42 @@ def resolve_environment(task):
build_main(task)


def submit_task(db, worker, task):
"""Submit (or resubmit) a task."""
try:
log.info(f'Resolving environment for task {task.name}')
resolve_environment(task)
log.info(f'Environment preparation complete for task {task.name}')
job_id, job_state = worker.submit_task(task)
log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}')
# place job in queue to monitor
db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
# update_task_metadata(task.id, task_metadata)
except ContainerBuildError as err:
job_state = 'BUILD_FAIL'
log.error(f'Failed to build container for {task.name}: {err}')
log.error(f'{task.name} state: {job_state}')
except Exception as err: # noqa (we have to catch everything here)
# Set job state to failed
job_state = 'SUBMIT_FAIL'
log.error(f'Task Manager submit task {task.name} failed! \n {err}')
log.error(f'{task.name} state: {job_state}')
# Log the traceback information as well
log.error(traceback.format_exc())
# Send the initial state to WFM
# update_task_state(task.id, job_state, metadata=task_metadata)
return job_state


def submit_jobs():
"""Submit all jobs currently in submit queue to the workload scheduler."""
db = utils.connect_db()
worker = utils.worker_interface()
while db.submit_queue.count() >= 1:
# Single value dictionary
task = db.submit_queue.pop()
try:
log.info(f'Resolving environment for task {task.name}')
resolve_environment(task)
log.info(f'Environment preparation complete for task {task.name}')
job_id, job_state = worker.submit_task(task)
log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}')
# place job in queue to monitor
db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
# update_task_metadata(task.id, task_metadata)
except Exception as err: # noqa (we have to catch everything here)
# Set job state to failed
job_state = 'SUBMIT_FAIL'
log.error(f'Task Manager submit task {task.name} failed! \n {err}')
log.error(f'{task.name} state: {job_state}')
# Log the traceback information as well
log.error(traceback.format_exc())
finally:
# Send the initial state to WFM
# update_task_state(task.id, job_state, metadata=task_metadata)
update_task_state(task.workflow_id, task.id, job_state)
job_state = submit_task(db, worker, task)
update_task_state(task.workflow_id, task.id, job_state)


def update_jobs():
@@ -99,6 +109,14 @@ def update_jobs():
update_task_state(task.workflow_id, task.id, 'FAILED')
else:
update_task_state(task.workflow_id, task.id, new_job_state)
# States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
elif new_job_state in ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'):
# Don't update wfm, just resubmit
log.info(f'Task {task.name} in state {new_job_state}')
log.info(f'Resubmitting task {task.name}')
db.job_queue.remove_by_id(id_)
job_state = submit_task(db, worker, task)
update_task_state(task.workflow_id, task.id, job_state)
else:
update_task_state(task.workflow_id, task.id, new_job_state)

3 changes: 2 additions & 1 deletion beeflow/tests/test_wf_manager.py
@@ -190,7 +190,8 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db):
temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")

resp = client().get(f'/bee_wfm/v1/jobs/{WF_ID}')
assert 'RUNNING' in resp.json['tasks_status']
tasks_status = resp.json['tasks_status']
assert tasks_status[0][2] == 'RUNNING' or tasks_status[1][2] == 'RUNNING'
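
Since tasks_status is now a list of (task_id, task_name, task_state) entries, the assertion checks the state field by position. An equivalent, order-independent check, shown only as a sketch:

assert any(state == 'RUNNING' for _task_id, _name, state in tasks_status)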


def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
4 changes: 1 addition & 3 deletions beeflow/wf_manager/resources/wf_actions.py
@@ -42,13 +42,11 @@ def get(wf_id):
if not tasks:
log.info(f"Bad query for wf {wf_id}.")
wf_status = 'No workflow with that ID is currently loaded'
tasks_status.append('Unavailable')
resp = make_response(jsonify(tasks_status=tasks_status,
wf_status=wf_status, status='not found'), 404)

for task in tasks:
tasks_status.append(f"{task.name}--{task.state}")
tasks_status = '\n'.join(tasks_status)
tasks_status.append((task.id, task.name, task.state))
wf_status = db.workflows.get_workflow_state(wf_id)

resp = make_response(jsonify(tasks_status=tasks_status,
7 changes: 7 additions & 0 deletions beeflow/wf_manager/resources/wf_update.py
@@ -99,6 +99,7 @@ def put(self):
if new_task is None:
log.info('No more restarts')
wf_state = wfi.get_workflow_state()
wfi.set_workflow_state('Failed')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
return make_response(jsonify(status=f'Task {task_id} set to {job_state}'))
@@ -133,6 +134,12 @@ def put(self):
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)

if job_state == 'BUILD_FAIL':
log.error(f'Workflow failed due to failed container build for task {task.name}')
wfi.set_workflow_state('Failed')
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')

resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
f'{job_state}')), 200)
return resp
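
A failed container build now fails the whole workflow, recording the Failed state in the workflow interface, the status file, and the database, just as the exhausted-restart branch above does. Because the same three updates appear in both branches, a small helper could consolidate them; a sketch assuming wfi, db, and the module-level wf_utils already used elsewhere in this resource:

def fail_workflow(wfi, db, wf_id):
    """Illustrative helper: record the Failed workflow state in all three places."""
    wfi.set_workflow_state('Failed')
    wf_utils.update_wf_status(wf_id, 'Failed')
    db.workflows.update_workflow_state(wf_id, 'Failed')
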
3 changes: 3 additions & 0 deletions ci/test_workflows/build-failure/Dockerfile.build-failure
@@ -0,0 +1,3 @@
FROM some_nonexistent_container

RUN touch /file
1 change: 1 addition & 0 deletions ci/test_workflows/build-failure/input.yml
@@ -0,0 +1 @@
fname: /file