From c5f8034fba658a757f1b92dfb89d7d228f4e1e7e Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Tue, 16 Jan 2024 18:17:59 -0700 Subject: [PATCH 01/21] Strip step.id from step_input.id for inline step spec --- beeflow/common/parser/parser.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 9c58e8d51..3a4689c17 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -141,6 +141,11 @@ def parse_step(self, step, workflow_id): else: step_cwl = step.run step_id = _shortname(step.id) + # step_input.id needs to have its step.id prefix stripped + for step_input in step_cwl.inputs: + step_shortname = _shortname(step_input.id) + step_input.id = step_input.id.replace(step_shortname, + step_shortname.split("/")[-1]) if step_cwl.class_ != "CommandLineTool": raise CwlParseError(f"Step {step.id} class must be CommandLineTool") From b95b63602dbfb464d545cfdc3bbe26bff68bda33 Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Tue, 16 Jan 2024 18:19:22 -0700 Subject: [PATCH 02/21] Fix CWL no job gold standard task inputs --- beeflow/tests/test_parser.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/beeflow/tests/test_parser.py b/beeflow/tests/test_parser.py index b6a10e0b6..0cf71fc5d 100644 --- a/beeflow/tests/test_parser.py +++ b/beeflow/tests/test_parser.py @@ -164,7 +164,8 @@ def test_parse_workflow_no_job(self): hints=[Hint(class_='DockerRequirement', params={'dockerImageId': '/usr/projects/beedev/clamr/clamr-toss.tar.gz'})], requirements=[], - inputs=[], + inputs=[StepInput(id='infile', type='File', value=None, default='lorem.txt', + source='infile', prefix=None, position=1, value_from=None)], outputs=[StepOutput(id='clamr/outfile', type='stdout', value=None, glob='graphics_output')], stdout='graphics_output', @@ -175,7 +176,8 @@ def test_parse_workflow_no_job(self): base_command='ffmpeg -f image2 -i $HOME/graphics_output/graph%05d.png -r 12 
-s 800x800 -pix_fmt yuv420p $HOME/CLAMR_movie.mp4', # noqa hints=[], requirements=[], - inputs=[], + inputs=[StepInput(id='infile', type='File', value=None, default='graphics_output', + source='clamr/outfile', prefix=None, position=1, value_from=None)], outputs=[StepOutput(id='ffmpeg/outfile', type='stdout', value=None, glob='CLAMR_movie.mp4')], stdout='CLAMR_movie.mp4', From 3e58475d825ee13c0eb2523049fce48084ee24bf Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 17 Jan 2024 11:42:42 -0700 Subject: [PATCH 03/21] Initial Task Manager refactor This refactors the task manager into a subpackage with multiple files, similar to how the Workflow Manager is currently structured. Previously, the Task Manager code was in a single file, making it hard to manage. --- beeflow/client/core.py | 4 +- beeflow/common/wf_data.py | 30 +++ beeflow/task_manager.py | 345 --------------------------- beeflow/task_manager/background.py | 109 +++++++++ beeflow/task_manager/task_actions.py | 35 +++ beeflow/task_manager/task_manager.py | 42 ++++ beeflow/task_manager/task_submit.py | 26 ++ beeflow/task_manager/utils.py | 88 +++++++ beeflow/tests/test_tm.py | 30 +-- 9 files changed, 347 insertions(+), 362 deletions(-) delete mode 100755 beeflow/task_manager.py create mode 100644 beeflow/task_manager/background.py create mode 100644 beeflow/task_manager/task_actions.py create mode 100644 beeflow/task_manager/task_manager.py create mode 100644 beeflow/task_manager/task_submit.py create mode 100644 beeflow/task_manager/utils.py diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 5042dce6e..4c803a3e3 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -171,8 +171,8 @@ def start_wfm(): def start_task_manager(): """Start the TM.""" fp = open_log('task_manager') - return launch_with_gunicorn('beeflow.task_manager:flask_app', paths.tm_socket(), - stdout=fp, stderr=fp) + return launch_with_gunicorn('beeflow.task_manager.task_manager:create_app()', + paths.tm_socket(), 
stdout=fp, stderr=fp) @mgr.component('scheduler', ()) def start_scheduler(): diff --git a/beeflow/common/wf_data.py b/beeflow/common/wf_data.py index 17092d684..0761d8715 100644 --- a/beeflow/common/wf_data.py +++ b/beeflow/common/wf_data.py @@ -197,6 +197,36 @@ def get_requirement(self, req_type, req_param, default=None): requirement = default return requirement + def get_full_requirement(self, req_type): + """Get the full requirement (or hint) for this task, if it has one. + + :param req_type: the type of requirement (e.g. 'DockerRequirement') + :type req_type: str + + This prefers requirements over hints. Returns None if no hint or + requirement found. + """ + result = None + hints = dict(self.hints) + try: + # Try to get Hints + hint = hints[req_type] + except (KeyError, TypeError): + # Task Hints are not mandatory. No task hint specified. + hint = None + try: + # Try to get Requirements + req = self.requirements[req_type] + except (KeyError, TypeError): + # Task Requirements are not mandatory. No task requirement specified. + req = None + # Prefer requirements over hints + if req: + result = req + elif hint: + result = hint + return result + def __eq__(self, other): """Test the equality of two tasks. diff --git a/beeflow/task_manager.py b/beeflow/task_manager.py deleted file mode 100755 index de1614ec9..000000000 --- a/beeflow/task_manager.py +++ /dev/null @@ -1,345 +0,0 @@ -"""Task Manager submits & manages tasks from Work Flow Manager. - -Submits, cancels and monitors states of tasks. -Communicates status to the Work Flow Manager, through RESTful API. 
-""" -import atexit -import sys -import hashlib -import os -from pathlib import Path -import re -import socket -import traceback -import jsonpickle - -from flask import Flask, jsonify, make_response -from flask_restful import Resource, Api, reqparse - -from apscheduler.schedulers.background import BackgroundScheduler - -from beeflow.common.config_driver import BeeConfig as bc -from beeflow.common import log as bee_logging -from beeflow.common.build_interfaces import build_main -from beeflow.common.worker_interface import WorkerInterface -from beeflow.common.connection import Connection -import beeflow.common.worker as worker_pkg -from beeflow.common.db import tm_db -from beeflow.common.db.bdb import connect_db -from beeflow.common import paths - - -log = bee_logging.setup(__name__) - -runtime = bc.get('task_manager', 'container_runtime') - - -flask_app = Flask(__name__) -api = Api(flask_app) - -bee_workdir = bc.get('DEFAULT', 'bee_workdir') -db_path = bee_workdir + '/' + 'tm.db' - - -def _url(): - """Return the url to the WFM.""" - # Saving this for whenever we need to run jobs across different machines - # workflow_manager = 'bee_wfm/v1/jobs/' - # #wfm_listen_port = bc.get('workflow_manager', 'listen_port') - # wfm_listen_port = wf_db.get_wfm_port() - # return f'http://127.0.0.1:{wfm_listen_port}/{workflow_manager}' - return 'bee_wfm/v1/jobs/' - - -def _resource(tag=""): - """Access the WFM.""" - return _url() + str(tag) - - -def _wfm_conn(): - """Get a new connection to the WFM.""" - return Connection(paths.wfm_socket()) - - -def update_task_state(workflow_id, task_id, job_state, **kwargs): - """Informs the workflow manager of the current state of a task.""" - data = {'wf_id': workflow_id, 'task_id': task_id, 'job_state': job_state} - if 'metadata' in kwargs: - kwargs['metadata'] = jsonpickle.encode(kwargs['metadata']) - - if 'task_info' in kwargs: - kwargs['task_info'] = jsonpickle.encode(kwargs['task_info']) - - data.update(kwargs) - conn = _wfm_conn() - resp = 
conn.put(_resource("update/"), json=data) - if resp.status_code != 200: - log.warning("WFM not responding when sending task update.") - - -def update_task_metadata(task_id, metadata): - """Send workflow manager task metadata.""" - log.info(f'Update task metadata for {task_id}:\n {metadata}') - # resp = requests.put(_resource("update/"), json=metadata) - # if resp.status_code != 200: - # log.warning("WFM not responding when sending task metadata.") - - -def gen_task_metadata(task, job_id): - """Generate dictionary of task metadata for the job submitted. - - Includes: - job_id - hostname - container runtime (when task uses a container) - hash of container file (when task uses a container) - """ - hostname = socket.gethostname() - metadata = {'job_id': job_id, 'host': hostname} - for hint in task.hints: - if hint.class_ == "DockerRequirement" and "dockerImageId" in hint.params.keys(): - metadata['container_runtime'] = bc.get('task_manager', 'container_runtime') - container_path = hint.params["dockerImageId"] - with open(container_path, 'rb') as container: - c_hash = hashlib.md5() - chunk = container.read(8192) - while chunk: - c_hash.update(chunk) - chunk = container.read(8192) - container_hash = c_hash.hexdigest() - metadata['container_hash'] = container_hash - return metadata - - -def resolve_environment(task): - """Use build interface to create a valid environment.""" - build_main(task) - - -def submit_jobs(): - """Submit all jobs currently in submit queue to the workload scheduler.""" - db = connect_db(tm_db, db_path) - while db.submit_queue.count() >= 1: - # Single value dictionary - task = db.submit_queue.pop() - try: - log.info(f'Resolving environment for task {task.name}') - resolve_environment(task) - log.info(f'Environment preparation complete for task {task.name}') - job_id, job_state = worker.submit_task(task) - log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}') - # place job in queue to monitor - db.job_queue.push(task=task, 
job_id=job_id, job_state=job_state) - # update_task_metadata(task.id, task_metadata) - except Exception as err: - # Set job state to failed - job_state = 'SUBMIT_FAIL' - log.error(f'Task Manager submit task {task.name} failed! \n {err}') - log.error(f'{task.name} state: {job_state}') - # Log the traceback information as well - log.error(traceback.format_exc()) - finally: - # Send the initial state to WFM - # update_task_state(task.id, job_state, metadata=task_metadata) - update_task_state(task.workflow_id, task.id, job_state) - - -def get_task_checkpoint(task): - """Harvest task checkpoint.""" - task_checkpoint = None - hints = dict(task.hints) - try: - # Try to get Hints - hint_checkpoint = hints['beeflow:CheckpointRequirement'] - except (KeyError, TypeError): - # Task Hints are not mandatory. No task checkpoint hint specified. - hint_checkpoint = None - try: - # Try to get Requirements - req_checkpoint = task.requirements['beeflow:CheckpointRequirement'] - except (KeyError, TypeError): - # Task Requirements are not mandatory. No task checkpoint requirement specified. 
- req_checkpoint = None - # Prefer requirements over hints - if req_checkpoint: - task_checkpoint = req_checkpoint - elif hint_checkpoint: - task_checkpoint = hint_checkpoint - return task_checkpoint - - -def get_restart_file(task_checkpoint, task_workdir): - """Find latest checkpoint file.""" - if 'file_regex' not in task_checkpoint: - raise RuntimeError('file_regex is required for checkpointing') - if 'file_path' not in task_checkpoint: - raise RuntimeError('file_path is required for checkpointing') - file_regex = task_checkpoint['file_regex'] - file_path = Path(task_workdir, task_checkpoint['file_path']) - regex = re.compile(file_regex) - checkpoint_files = [ - Path(file_path, fname) for fname in os.listdir(file_path) - if regex.match(fname) - ] - checkpoint_files.sort(key=os.path.getmtime) - try: - checkpoint_file = checkpoint_files[-1] - return str(checkpoint_file) - except IndexError: - raise RuntimeError('Missing checkpoint file for task') from None - - -def update_jobs(): - """Check and update states of jobs in queue, remove completed jobs.""" - db = connect_db(tm_db, db_path) - # Need to make a copy first - job_q = list(db.job_queue) - for job in job_q: - id_ = job.id - task = job.task - job_id = job.job_id - job_state = job.job_state - new_job_state = worker.query_task(job_id) - - # If state changes update the WFM - if job_state != new_job_state: - db.job_queue.update_job_state(id_, new_job_state) - if new_job_state in ('FAILED', 'TIMELIMIT', 'TIMEOUT'): - # Harvest lastest checkpoint file. 
- task_checkpoint = get_task_checkpoint(task) - log.info(f'state: {new_job_state}') - log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}') - if task_checkpoint: - checkpoint_file = get_restart_file(task_checkpoint, task.workdir) - task_info = {'checkpoint_file': checkpoint_file, 'restart': True} - update_task_state(task.workflow_id, task.id, new_job_state, - task_info=task_info) - else: - update_task_state(task.workflow_id, task.id, new_job_state) - else: - update_task_state(task.workflow_id, task.id, new_job_state) - - if job_state in ('ZOMBIE', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'): - # Remove from the job queue. Our job is finished - db.job_queue.remove_by_id(id_) - - -def process_queues(): - """Look for newly submitted jobs and update status of scheduled jobs.""" - submit_jobs() # noqa - update_jobs() # noqa - - -if "pytest" not in sys.modules: - scheduler = BackgroundScheduler({'apscheduler.timezone': 'UTC'}) - scheduler.add_job(func=process_queues, trigger="interval", - seconds=bc.get('task_manager', 'background_interval')) - scheduler.start() - - # This kills the scheduler when the process terminates - # so we don't accidentally leave a zombie process - atexit.register(scheduler.shutdown) - - -class TaskSubmit(Resource): - """WFM sends tasks to the task manager.""" - - def __init__(self): - """Intialize request.""" - - @staticmethod - def post(): - """Receives task from WFM.""" - db = connect_db(tm_db, db_path) - parser = reqparse.RequestParser() - parser.add_argument('tasks', type=str, location='json') - data = parser.parse_args() - tasks = jsonpickle.decode(data['tasks']) - for task in tasks: - db.submit_queue.push(task) - log.info(f"Added {task.name} task to the submit queue") - resp = make_response(jsonify(msg='Tasks Added!', status='ok'), 200) - return resp - - -class TaskActions(Resource): - """Actions to take for tasks.""" - - @staticmethod - def delete(): - """Cancel received from WFM to cancel job, update queue to 
monitor state.""" - db = connect_db(tm_db, db_path) - cancel_msg = "" - for job in db.job_queue: - task_id = job.task.id - job_id = job.job_id - name = job.task.name - log.info(f"Cancelling {name} with job_id: {job_id}") - try: - job_state = worker.cancel_task(job_id) - except Exception as err: - log.error(err) - log.error(traceback.format_exc()) - job_state = 'ZOMBIE' - cancel_msg += f"{name} {task_id} {job_id} {job_state}" - db.job_queue.clear() - db.submit_queue.clear() - resp = make_response(jsonify(msg=cancel_msg, status='ok'), 200) - return resp - - -# This could probably be in a Resource class, but since its only one route -# it seems to be fine right here -@flask_app.route('/status') -def get_status(): - """Report the current status of the Task Manager.""" - return make_response(jsonify(status='up'), 200) - - -WLS = bc.get('DEFAULT', 'workload_scheduler') -worker_class = worker_pkg.find_worker(WLS) -if worker_class is None: - sys.exit(f'Workload scheduler {WLS}, not supported.\n' - + f'Please check {bc.userconfig_path()} and restart TaskManager.') -# Get the parameters for the worker classes -worker_kwargs = { - 'bee_workdir': bc.get('DEFAULT', 'bee_workdir'), - 'container_runtime': bc.get('task_manager', 'container_runtime'), - # extra options to be passed to the runner (i.e. srun [RUNNER_OPTS] ... 
for Slurm) - 'runner_opts': bc.get('task_manager', 'runner_opts'), -} -# Job defaults -for default_key in ['default_account', 'default_time_limit', 'default_partition']: - worker_kwargs[default_key] = bc.get('job', default_key) -# Special slurm arguments -if WLS == 'Slurm': - worker_kwargs['use_commands'] = bc.get('slurm', 'use_commands') - worker_kwargs['slurm_socket'] = paths.slurm_socket() - worker_kwargs['openapi_version'] = bc.get('slurm', 'openapi_version') -worker = WorkerInterface(worker_class, **worker_kwargs) - -api.add_resource(TaskSubmit, '/bee_tm/v1/task/submit/') -api.add_resource(TaskActions, '/bee_tm/v1/task/') - -# if __name__ == '__main__': -# hostname = socket.gethostname() -# log.info(f'Starting Task Manager on host: {hostname}') -# bee_workdir = bc.get('DEFAULT', 'bee_workdir') -# handler = bee_logging.save_log(bee_workdir=bee_workdir, log=log, logfile='task_manager.log') -# log.info(f'tm_listen_port:{tm_listen_port}') -# container_runtime = bc.get('task_manager', 'container_runtime') -# log.info(f'container_runtime: {container_runtime}') -# -# # Werkzeug logging -# werk_log = logging.getLogger('werkzeug') -# werk_log.setLevel(logging.INFO) -# werk_log.addHandler(handler) -# -# # Flask logging -# flask_app.logger.addHandler(handler) -# flask_app.run(debug=False, port=str(tm_listen_port)) - -# Ignoring W0703: Catching general exception is ok for failed submit and cancel. -# Ignoring E402: "module level import is not at top of file" - this is required -# for the bee config -# pylama:ignore=W0703,E402 diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py new file mode 100644 index 000000000..10ca9de77 --- /dev/null +++ b/beeflow/task_manager/background.py @@ -0,0 +1,109 @@ +"""Background task code. + +This code processes submitted tasks, monitors status, and sends info back to +the Workflow Manager. 
+""" +import traceback +import jsonpickle +from beeflow.task_manager import utils +from beeflow.common import log as bee_logging +from beeflow.common.build_interfaces import build_main + + +log = bee_logging.setup(__name__) + + +def update_task_state(workflow_id, task_id, job_state, **kwargs): + """Informs the workflow manager of the current state of a task.""" + data = {'wf_id': workflow_id, 'task_id': task_id, 'job_state': job_state} + if 'metadata' in kwargs: + kwargs['metadata'] = jsonpickle.encode(kwargs['metadata']) + + if 'task_info' in kwargs: + kwargs['task_info'] = jsonpickle.encode(kwargs['task_info']) + + data.update(kwargs) + conn = utils.wfm_conn() + resp = conn.put(utils.wfm_resource_url("update/"), json=data) + if resp.status_code != 200: + log.warning("WFM not responding when sending task update.") + + +def resolve_environment(task): + """Use build interface to create a valid environment. + + This will build and/or pull containers if necessary; it can take some time + to run this step. + """ + build_main(task) + + +def submit_jobs(): + """Submit all jobs currently in submit queue to the workload scheduler.""" + db = utils.connect_db() + worker = utils.worker_interface() + while db.submit_queue.count() >= 1: + # Single value dictionary + task = db.submit_queue.pop() + try: + log.info(f'Resolving environment for task {task.name}') + resolve_environment(task) + log.info(f'Environment preparation complete for task {task.name}') + job_id, job_state = worker.submit_task(task) + log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}') + # place job in queue to monitor + db.job_queue.push(task=task, job_id=job_id, job_state=job_state) + # update_task_metadata(task.id, task_metadata) + except Exception as err: # noqa (we have to catch everything here) + # Set job state to failed + job_state = 'SUBMIT_FAIL' + log.error(f'Task Manager submit task {task.name} failed! 
\n {err}') + log.error(f'{task.name} state: {job_state}') + # Log the traceback information as well + log.error(traceback.format_exc()) + finally: + # Send the initial state to WFM + # update_task_state(task.id, job_state, metadata=task_metadata) + update_task_state(task.workflow_id, task.id, job_state) + + +def update_jobs(): + """Check and update states of jobs in queue, remove completed jobs.""" + db = utils.connect_db() + worker = utils.worker_interface() + # Need to make a copy first + job_q = list(db.job_queue) + for job in job_q: + id_ = job.id + task = job.task + job_id = job.job_id + job_state = job.job_state + new_job_state = worker.query_task(job_id) + + # If state changes update the WFM + if job_state != new_job_state: + db.job_queue.update_job_state(id_, new_job_state) + if new_job_state in ('FAILED', 'TIMELIMIT', 'TIMEOUT'): + # Harvest latest checkpoint file. + task_checkpoint = task.get_full_requirement('beeflow:CheckpointRequirement') + log.info(f'state: {new_job_state}') + log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}') + if task_checkpoint: + checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) + task_info = {'checkpoint_file': checkpoint_file, 'restart': True} + update_task_state(task.workflow_id, task.id, new_job_state, + task_info=task_info) + else: + update_task_state(task.workflow_id, task.id, new_job_state) + else: + update_task_state(task.workflow_id, task.id, new_job_state) + + if job_state in ('ZOMBIE', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'): + # Remove from the job queue. 
Our job is finished + db.job_queue.remove_by_id(id_) + + +def process_queues(): + """Look for newly submitted jobs and update status of scheduled jobs.""" + submit_jobs() + update_jobs() diff --git a/beeflow/task_manager/task_actions.py b/beeflow/task_manager/task_actions.py new file mode 100644 index 000000000..505f158c0 --- /dev/null +++ b/beeflow/task_manager/task_actions.py @@ -0,0 +1,35 @@ +"""Manage actions for tasks coming from the WFM.""" +import traceback +from flask import jsonify, make_response +from flask_restful import Resource +from beeflow.common import log as bee_logging +from beeflow.task_manager import utils + +log = bee_logging.setup(__name__) + + +class TaskActions(Resource): + """Actions to take for tasks.""" + + @staticmethod + def delete(): + """Cancel received from WFM to cancel job, update queue to monitor state.""" + db = utils.connect_db() + worker = utils.worker_interface() + cancel_msg = "" + for job in db.job_queue: + task_id = job.task.id + job_id = job.job_id + name = job.task.name + log.info(f"Cancelling {name} with job_id: {job_id}") + try: + job_state = worker.cancel_task(job_id) + except Exception as err: # noqa (we have to catch everything here) + log.error(err) + log.error(traceback.format_exc()) + job_state = 'ZOMBIE' + cancel_msg += f"{name} {task_id} {job_id} {job_state}" + db.job_queue.clear() + db.submit_queue.clear() + resp = make_response(jsonify(msg=cancel_msg, status='ok'), 200) + return resp diff --git a/beeflow/task_manager/task_manager.py b/beeflow/task_manager/task_manager.py new file mode 100644 index 000000000..ca20d73d8 --- /dev/null +++ b/beeflow/task_manager/task_manager.py @@ -0,0 +1,42 @@ +"""Task Manager app and api set up code. + +Submits, cancels and monitors states of tasks. +Communicates status to the Work Flow Manager, through RESTful API. 
+""" +import atexit +import sys +from apscheduler.schedulers.background import BackgroundScheduler +from flask import Flask, jsonify, make_response +from beeflow.common.api import BeeApi +from beeflow.task_manager.task_submit import TaskSubmit +from beeflow.task_manager.task_actions import TaskActions +from beeflow.task_manager.background import process_queues +from beeflow.common.config_driver import BeeConfig as bc + + +def create_app(): + """Create the flask app and add the REST endpoints for the TM.""" + app = Flask(__name__) + api = BeeApi(app) + + # Endpoints + api.add_resource(TaskSubmit, '/bee_tm/v1/task/submit/') + api.add_resource(TaskActions, '/bee_tm/v1/task/') + + @app.route('/status') + def get_status(): + """Report the current status of the Task Manager.""" + return make_response(jsonify(status='up'), 200) + + # Start the background scheduler and make sure it gets cleaned up + if "pytest" not in sys.modules: + scheduler = BackgroundScheduler({'apscheduler.timezone': 'UTC'}) + scheduler.add_job(func=process_queues, trigger="interval", + seconds=bc.get('task_manager', 'background_interval')) + scheduler.start() + + # This kills the scheduler when the process terminates + # so we don't accidentally leave a zombie process + atexit.register(scheduler.shutdown) + + return app diff --git a/beeflow/task_manager/task_submit.py b/beeflow/task_manager/task_submit.py new file mode 100644 index 000000000..b7148fafc --- /dev/null +++ b/beeflow/task_manager/task_submit.py @@ -0,0 +1,26 @@ +"""Handle task submission.""" +from flask import jsonify, make_response +from flask_restful import Resource, reqparse +import jsonpickle +from beeflow.common import log as bee_logging +from beeflow.task_manager import utils + +log = bee_logging.setup(__name__) + + +class TaskSubmit(Resource): + """WFM sends tasks to the task manager.""" + + @staticmethod + def post(): + """Receives task from WFM.""" + db = utils.connect_db() + parser = reqparse.RequestParser() + 
parser.add_argument('tasks', type=str, location='json') + data = parser.parse_args() + tasks = jsonpickle.decode(data['tasks']) + for task in tasks: + db.submit_queue.push(task) + log.info(f"Added {task.name} task to the submit queue") + resp = make_response(jsonify(msg='Tasks Added!', status='ok'), 200) + return resp diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py new file mode 100644 index 000000000..d0459e12f --- /dev/null +++ b/beeflow/task_manager/utils.py @@ -0,0 +1,88 @@ +"""Task Manager utility functions.""" +import os +from pathlib import Path +import re +from beeflow.common.config_driver import BeeConfig as bc +from beeflow.common.db import tm_db +from beeflow.common.db import bdb +from beeflow.common import worker +from beeflow.common import paths +from beeflow.common.connection import Connection +from beeflow.common.worker_interface import WorkerInterface + + +def db_path(): + """Return the TM database path.""" + bee_workdir = bc.get('DEFAULT', 'bee_workdir') + return os.path.join(bee_workdir, 'tm.db') + + +def connect_db(): + """Connect to the TM database.""" + return bdb.connect_db(tm_db, db_path()) + + +def worker_interface(): + """Load the worker interface.""" + wls = bc.get('DEFAULT', 'workload_scheduler') + worker_class = worker.find_worker(wls) + if worker_class is None: + raise RuntimeError(f'Workload scheduler {wls}, not supported.\n' + + f'Please check {bc.userconfig_path()} and restart TaskManager.') + # Get the parameters for the worker classes + worker_kwargs = { + 'bee_workdir': bc.get('DEFAULT', 'bee_workdir'), + 'container_runtime': bc.get('task_manager', 'container_runtime'), + # extra options to be passed to the runner (i.e. srun [RUNNER_OPTS] ... 
for Slurm) + 'runner_opts': bc.get('task_manager', 'runner_opts'), + } + # Job defaults + for default_key in ['default_account', 'default_time_limit', 'default_partition']: + worker_kwargs[default_key] = bc.get('job', default_key) + # Special slurm arguments + if wls == 'Slurm': + worker_kwargs['use_commands'] = bc.get('slurm', 'use_commands') + worker_kwargs['slurm_socket'] = paths.slurm_socket() + worker_kwargs['openapi_version'] = bc.get('slurm', 'openapi_version') + return WorkerInterface(worker_class, **worker_kwargs) + + +def wfm_url(): + """Return the url to the WFM.""" + # Saving this for whenever we need to run jobs across different machines + # workflow_manager = 'bee_wfm/v1/jobs/' + # #wfm_listen_port = bc.get('workflow_manager', 'listen_port') + # wfm_listen_port = wf_db.get_wfm_port() + # return f'http://127.0.0.1:{wfm_listen_port}/{workflow_manager}' + return 'bee_wfm/v1/jobs/' + + +def wfm_resource_url(tag=""): + """Get the full resource URL for access to the WFM.""" + return wfm_url() + str(tag) + + +def wfm_conn(): + """Get a new connection to the WFM.""" + return Connection(paths.wfm_socket()) + + +def get_restart_file(task_checkpoint, task_workdir): + """Find latest checkpoint file.""" + if 'file_regex' not in task_checkpoint: + raise RuntimeError('file_regex is required for checkpointing') + if 'file_path' not in task_checkpoint: + raise RuntimeError('file_path is required for checkpointing') + file_regex = task_checkpoint['file_regex'] + file_path = Path(task_workdir, task_checkpoint['file_path']) + regex = re.compile(file_regex) + checkpoint_files = [ + Path(file_path, fname) for fname in os.listdir(file_path) + if regex.match(fname) + ] + checkpoint_files.sort(key=os.path.getmtime) + try: + checkpoint_file = checkpoint_files[-1] + return str(checkpoint_file) + except IndexError: + raise RuntimeError('Missing checkpoint file for task') from None diff --git a/beeflow/tests/test_tm.py b/beeflow/tests/test_tm.py index 96f23666e..35491e99a 100644 
--- a/beeflow/tests/test_tm.py +++ b/beeflow/tests/test_tm.py @@ -10,7 +10,7 @@ from beeflow.common.db.bdb import connect_db from beeflow.common.db import tm_db -import beeflow.task_manager as tm +import beeflow.task_manager.task_manager as tm from beeflow.common.wf_data import Task import beeflow @@ -18,7 +18,7 @@ @pytest.fixture def flask_client(): """Client lets us run flask queries.""" - app = tm.flask_app + app = tm.create_app() client = app.test_client() yield client @@ -45,9 +45,9 @@ def temp_db(): @pytest.mark.usefixtures('flask_client', 'mocker') def test_submit_task(flask_client, mocker, temp_db): # noqa """Create a workflow and get the ID back.""" - mocker.patch('beeflow.task_manager.worker', - new_callable=MockWorkerSubmission) - mocker.patch('beeflow.task_manager.db_path', temp_db.db_file) + mocker.patch('beeflow.task_manager.utils.worker_interface', + MockWorkerSubmission) + mocker.patch('beeflow.task_manager.utils.db_path', lambda: temp_db.db_file) # Generate a task tasks = generate_tasks(1) tasks_json = jsonpickle.encode(tasks) @@ -55,12 +55,12 @@ def test_submit_task(flask_client, mocker, temp_db): # noqa response = flask_client.post('/bee_tm/v1/task/submit/', json={'tasks': tasks_json}) - mocker.patch('beeflow.task_manager.worker', - new_callable=MockWorkerSubmission) + mocker.patch('beeflow.task_manager.utils.worker_interface', + MockWorkerSubmission) # Patch the connection object for WFM communication mocker.patch('beeflow.common.connection.Connection.put', mock_put) - beeflow.task_manager.process_queues() + beeflow.task_manager.background.process_queues() msg = response.get_json()['msg'] status = response.status_code @@ -81,14 +81,14 @@ def test_submit_task(flask_client, mocker, temp_db): # noqa def test_completed_task(flask_client, mocker, temp_db): # noqa """Tests how the task manager processes a completed task.""" # 42 is the sample task ID - mocker.patch('beeflow.task_manager.worker', - new_callable=MockWorkerCompletion) + 
mocker.patch('beeflow.task_manager.utils.worker_interface', + MockWorkerCompletion) # Patch the connection object for WFM communication mocker.patch('beeflow.common.connection.Connection.put', mock_put) - mocker.patch('beeflow.task_manager.db_path', temp_db.db_file) + mocker.patch('beeflow.task_manager.utils.db_path', lambda: temp_db.db_file) # This should notice the job is complete and empty the job_queue - beeflow.task_manager.process_queues() + beeflow.task_manager.background.process_queues() job_queue = list(temp_db.job_queue) assert len(job_queue) == 0 @@ -102,9 +102,9 @@ def test_remove_task(flask_client, mocker, temp_db): # noqa temp_db.job_queue.push(task=task2, job_id=2, job_state='PENDING') temp_db.job_queue.push(task=task3, job_id=3, job_state='PENDING') - mocker.patch('beeflow.task_manager.worker', - new_callable=MockWorkerCompletion) - mocker.patch('beeflow.task_manager.db_path', temp_db.db_file) + mocker.patch('beeflow.task_manager.utils.worker_interface', + MockWorkerCompletion) + mocker.patch('beeflow.task_manager.utils.db_path', lambda: temp_db.db_file) response = flask_client.delete('/bee_tm/v1/task/') From c107f4baee716bb1fe990823e08f4309a58db577 Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Wed, 31 Jan 2024 11:10:42 -0700 Subject: [PATCH 04/21] Increase time steps to reduce chance of success --- .../cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml b/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml index 7f57a05ee..5609cfe69 100644 --- a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml +++ b/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml @@ -1,7 +1,7 @@ { "grid_resolution": 32, "max_levels": 3, - "time_steps": 80000, + "time_steps": 150000, "steps_between_outputs": 10, "steps_between_graphics": 25, "graphics_type": "png", From 
79e73a0809170d7ab7d48afe38a67d83024b9884 Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Wed, 31 Jan 2024 11:11:05 -0700 Subject: [PATCH 05/21] Fixed checkpoint restart num_tries not working --- beeflow/common/parser/parser.py | 8 +++++++- beeflow/common/wf_interface.py | 5 ++--- beeflow/wf_manager/resources/wf_update.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 9c58e8d51..f64b008ed 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -302,7 +302,13 @@ def parse_requirements(self, requirements, as_hints=False): return reqs if as_hints: for req in requirements: - items = {k: str(v) for k, v in req.items() if k != "class"} + items = {} + for k, v in req.items(): + if k != 'class': + if isinstance(v, (int, float)): + items[k] = v + else: + items[k] = str(v) # Load in the dockerfile at parse time if 'dockerFile' in items: self._read_requirement_file('dockerFile', items) diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index 38849342c..edc28ead7 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -5,7 +5,6 @@ import re - class WorkflowInterface: """Interface for manipulating workflows.""" @@ -94,6 +93,7 @@ def restart_task(self, task, checkpoint_file): :param task: the task to restart :type task: Task + :param checkpoint_file: the task checkpoint file :rtype: Task or None """ for hint in task.hints: @@ -102,8 +102,7 @@ def restart_task(self, task, checkpoint_file): hint.params["num_tries"] -= 1 hint.params["bee_checkpoint_file__"] = checkpoint_file break - state = self.get_task_state(task) - self.set_task_state(task, f"FAILED RESTART: {state}") + self.set_task_state(task, f"FAILED") return None else: raise ValueError("invalid task for checkpoint restart") diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 
a2ee2aa36..1dfd80262 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -96,11 +96,11 @@ def put(self): task_info = jsonpickle.decode(data['task_info']) checkpoint_file = task_info['checkpoint_file'] new_task = wfi.restart_task(task, checkpoint_file) - db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") if new_task is None: log.info('No more restarts') wf_state = wfi.get_task_state(task) return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) + db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") # Submit the restart task tasks = [new_task] wf_utils.schedule_submit_tasks(wf_id, tasks) From a8767fdaf98c3a037b63ea58f8b19ce30a8f30a6 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Mon, 5 Feb 2024 09:28:50 -0700 Subject: [PATCH 06/21] Remove Jinja2 dependency --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d96bd9a85..5a0aa8c51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,6 @@ python = ">=3.8.3,<=3.11" # Package dependencies Flask = { version = "^2.0" } -Jinja2 = { version = "<3.1" } neo4j = { version = "^1.7.4" } PyYAML = { version = "^6.0.1" } flask_restful = "0.3.9" From fb206d0ee9e45b22652db24e095eb067cc2c2318 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 6 Feb 2024 15:16:55 -0700 Subject: [PATCH 07/21] Fix checkpoint-too-long workflow and add better error checking to get_restart_file() --- beeflow/task_manager/background.py | 12 ++++++---- beeflow/task_manager/utils.py | 24 +++++++++++++------ .../checkpoint-too-long/workflow.cwl | 4 ++-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 10ca9de77..c07a4ca35 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -89,10 +89,14 @@ def update_jobs(): log.info(f'state: {new_job_state}') 
log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}') if task_checkpoint: - checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) - task_info = {'checkpoint_file': checkpoint_file, 'restart': True} - update_task_state(task.workflow_id, task.id, new_job_state, - task_info=task_info) + try: + checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) + task_info = {'checkpoint_file': checkpoint_file, 'restart': True} + update_task_state(task.workflow_id, task.id, new_job_state, + task_info=task_info) + except utils.CheckpointRestartError as err: + log.error(f'Checkpoint restart failed for {task.name} ({task.id}): {err}') + update_task_state(task.workflow_id, task.id, 'FAILED') else: update_task_state(task.workflow_id, task.id, new_job_state) else: diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py index d0459e12f..6ecdec5e0 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -67,22 +67,32 @@ def wfm_conn(): return Connection(paths.wfm_socket()) +class CheckpointRestartError(Exception): + """Exception to be thrown on checkpoint-restart failure.""" + pass + + def get_restart_file(task_checkpoint, task_workdir): """Find latest checkpoint file.""" if 'file_regex' not in task_checkpoint: - raise RuntimeError('file_regex is required for checkpointing') + raise CheckpointRestartError('file_regex is required for checkpointing') if 'file_path' not in task_checkpoint: - raise RuntimeError('file_path is required for checkpointing') + raise CheckpointRestartError('file_path is required for checkpointing') file_regex = task_checkpoint['file_regex'] file_path = Path(task_workdir, task_checkpoint['file_path']) regex = re.compile(file_regex) - checkpoint_files = [ - Path(file_path, fname) for fname in os.listdir(file_path) - if regex.match(fname) - ] + try: + checkpoint_files = [ + Path(file_path, fname) for fname in os.listdir(file_path) + if regex.match(fname) + ] + except 
FileNotFoundError: + raise CheckpointRestartError( + f'Checkpoint file_path ("{file_path}") not found' + ) from None checkpoint_files.sort(key=os.path.getmtime) try: checkpoint_file = checkpoint_files[-1] return str(checkpoint_file) except IndexError: - raise RuntimeError('Missing checkpoint file for task') from None + raise CheckpointRestartError('Missing checkpoint file for task') from None diff --git a/ci/test_workflows/checkpoint-too-long/workflow.cwl b/ci/test_workflows/checkpoint-too-long/workflow.cwl index 0fa9086e6..b8ea7f009 100644 --- a/ci/test_workflows/checkpoint-too-long/workflow.cwl +++ b/ci/test_workflows/checkpoint-too-long/workflow.cwl @@ -20,8 +20,8 @@ steps: hints: beeflow:CheckpointRequirement: enabled: true - file_path: checkpoint_output - container_path: checkpoint_output + file_path: . + container_path: . file_regex: backup[0-9]*.crx restart_parameters: -R num_tries: 1 From 9ce72391214c235818014fada8de338f55fc0936 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 7 Feb 2024 09:01:54 -0700 Subject: [PATCH 08/21] Ensure that checkpoint-restart workflows fail after exceeding num_tries --- beeflow/common/wf_interface.py | 4 +++- beeflow/task_manager/utils.py | 1 - beeflow/wf_manager/resources/wf_update.py | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index edc28ead7..f53f76725 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -5,6 +5,7 @@ import re + class WorkflowInterface: """Interface for manipulating workflows.""" @@ -102,7 +103,8 @@ def restart_task(self, task, checkpoint_file): hint.params["num_tries"] -= 1 hint.params["bee_checkpoint_file__"] = checkpoint_file break - self.set_task_state(task, f"FAILED") + self.set_task_state(task, "FAILED") + self.set_workflow_state("FAILED") return None else: raise ValueError("invalid task for checkpoint restart") diff --git a/beeflow/task_manager/utils.py 
b/beeflow/task_manager/utils.py index 6ecdec5e0..3378c442e 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -69,7 +69,6 @@ def wfm_conn(): class CheckpointRestartError(Exception): """Exception to be thrown on checkpoint-restart failure.""" - pass def get_restart_file(task_checkpoint, task_workdir): diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 1dfd80262..5fe70ed32 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -98,7 +98,9 @@ def put(self): new_task = wfi.restart_task(task, checkpoint_file) if new_task is None: log.info('No more restarts') - wf_state = wfi.get_task_state(task) + wf_state = wfi.get_workflow_state() + wf_utils.update_wf_status(wf_id, 'Failed') + db.workflows.update_workflow_state(wf_id, 'Failed') return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") # Submit the restart task From f43c3a2eeaf292aee495b4c592eae00a94dac3a4 Mon Sep 17 00:00:00 2001 From: Krishna Chilleri - 389395 Date: Thu, 8 Feb 2024 13:51:26 -0700 Subject: [PATCH 09/21] Store submit command args in designated workflow directory. 
--- beeflow/client/bee_client.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index ac22c5e57..5f65b6b5b 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -26,6 +26,7 @@ from beeflow.common.parser import CwlParser from beeflow.common.wf_data import generate_workflow_id from beeflow.client import core +from beeflow.wf_manager.resources import wf_utils # Length of a shortened workflow ID short_id_len = 6 #noqa: Not a constant @@ -281,6 +282,20 @@ def is_parent(parent, path): if tarball_path: os.remove(tarball_path) + # Store provided arguments in text file for future reference + wf_dir = wf_utils.get_workflow_dir(wf_id) + sub_wf_dir = wf_dir + "/submit_command_args.txt" + + f = open(sub_wf_dir, "w") + f.write("beeflow submit wf_name wf_path main_cwl yaml workdir\n") + f.write("wf_name: " + str(wf_name) + "\n") + f.write("wf_path: " + str(wf_path) + "\n") + f.write("main_cwl: " + str(main_cwl) + "\n") + f.write("yaml: " + str(yaml) + "\n") + f.write("workdir: " + str(workdir) + "\n") + f.write("wf_id: " + str(wf_id)) + f.close() + return wf_id From 84620be107096c828c4305f67005d5a614d4f794 Mon Sep 17 00:00:00 2001 From: Krishna Chilleri <149612138+kchilleri@users.noreply.github.com> Date: Thu, 8 Feb 2024 14:13:43 -0700 Subject: [PATCH 10/21] Fix pylint error --- beeflow/client/bee_client.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 5f65b6b5b..6f8ab925d 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -286,15 +286,15 @@ def is_parent(parent, path): wf_dir = wf_utils.get_workflow_dir(wf_id) sub_wf_dir = wf_dir + "/submit_command_args.txt" - f = open(sub_wf_dir, "w") - f.write("beeflow submit wf_name wf_path main_cwl yaml workdir\n") - f.write("wf_name: " + str(wf_name) + "\n") - f.write("wf_path: " + str(wf_path) + "\n") - 
f.write("main_cwl: " + str(main_cwl) + "\n") - f.write("yaml: " + str(yaml) + "\n") - f.write("workdir: " + str(workdir) + "\n") - f.write("wf_id: " + str(wf_id)) - f.close() + f_name = open(sub_wf_dir, "w") + f_name.write("beeflow submit wf_name wf_path main_cwl yaml workdir\n") + f_name.write("wf_name: " + str(wf_name) + "\n") + f_name.write("wf_path: " + str(wf_path) + "\n") + f_name.write("main_cwl: " + str(main_cwl) + "\n") + f_name.write("yaml: " + str(yaml) + "\n") + f_name.write("workdir: " + str(workdir) + "\n") + f_name.write("wf_id: " + str(wf_id)) + f_name.close() return wf_id From b2ba781b77e7b9866f5dff86e6412cb1ae009ede Mon Sep 17 00:00:00 2001 From: Krishna Chilleri <149612138+kchilleri@users.noreply.github.com> Date: Thu, 8 Feb 2024 14:17:46 -0700 Subject: [PATCH 11/21] Fix pylama error about encoding --- beeflow/client/bee_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 6f8ab925d..6130080fb 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -286,7 +286,7 @@ def is_parent(parent, path): wf_dir = wf_utils.get_workflow_dir(wf_id) sub_wf_dir = wf_dir + "/submit_command_args.txt" - f_name = open(sub_wf_dir, "w") + f_name = open(sub_wf_dir, "w", encoding="utf-8") f_name.write("beeflow submit wf_name wf_path main_cwl yaml workdir\n") f_name.write("wf_name: " + str(wf_name) + "\n") f_name.write("wf_path: " + str(wf_path) + "\n") From dadd8083d49280b1521030a8040e8f71dda436ff Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 13 Feb 2024 12:18:54 -0700 Subject: [PATCH 12/21] Add TSC paper to README --- README.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index cc05d9e20..dd1250d0e 100644 --- a/README.rst +++ b/README.rst @@ -85,9 +85,8 @@ License can be found `here `_ Publications ========================== +- An HPC-Container Based Continuous Integration Tool for Detecting Scaling and 
Performance Issues in HPC Applications, IEEE Transactions on Services Computing, 2024, `DOI: 10.1109/TSC.2023.3337662 `_ - BEE Orchestrator: Running Complex Scientific Workflows on Multiple Systems, HiPC, 2021, `DOI: 10.1109/HiPC53243.2021.00052 `_ - "BeeSwarm: Enabling Parallel Scaling Performance Measurement in Continuous Integration for HPC Applications", ASE, 2021, `DOI: 10.1109/ASE51524.2021.9678805 `_ - "BeeFlow: A Workflow Management System for In Situ Processing across HPC and Cloud Systems", ICDCS, 2018, `DOI: 10.1109/ICDCS.2018.00103 `_ - "Build and execution environment (BEE): an encapsulated environment enabling HPC applications running everywhere", IEEE BigData, 2018, `DOI: 10.1109/BigData.2018.8622572 `_ - - From 82a75de17360742cbdb915efde06bb15dac52b40 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 14 Feb 2024 11:54:16 -0700 Subject: [PATCH 13/21] Update to Python <= 3.12 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5a0aa8c51..e926c1f67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,8 +47,8 @@ beeflow = 'beeflow.client.bee_client:main' beecloud = 'beeflow.cloud_launcher:main' [tool.poetry.dependencies] -# Python version (>=3.8.3, <3.11) -python = ">=3.8.3,<=3.11" +# Python version (>=3.8.3, <=3.12.2) +python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" } From bd67ad808cf43f7dce69d65f0782240838904086 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Thu, 15 Feb 2024 13:52:12 -0700 Subject: [PATCH 14/21] Refactor CI scripts to make them more general Also adds support for using a different bee.conf path with the `BEE_CONFIG` environment variable. 
--- .github/workflows/integration.yml | 19 ++++++++++--------- .github/workflows/unit-tests.yml | 19 ++++++++++--------- beeflow/common/config_driver.py | 4 ++++ ci/bee_config.sh | 14 ++++++-------- ci/bee_install.sh | 2 -- ci/env.sh | 2 ++ ci/integration_test.sh | 1 - ci/unit_tests.sh | 1 - 8 files changed, 32 insertions(+), 30 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index d01df67f1..f8af883ca 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -26,13 +26,14 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - name: Dependency Install - run: ./ci/deps_install.sh - - name: Batch Scheduler Install and Start - run: ./ci/batch_scheduler.sh - - name: BEE Install - run: ./ci/bee_install.sh - - name: BEE Config - run: ./ci/bee_config.sh + - name: Install and Configure + run: | + . ./ci/env.sh + ./ci/deps_install.sh + ./ci/batch_scheduler.sh + ./ci/bee_install.sh + ./ci/bee_config.sh - name: Integration Test - run: ./ci/integration_test.sh + run: | + . ./ci/env.sh + ./ci/integration_test.sh diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index a89ef0c6f..f3607b124 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -24,13 +24,14 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - - name: Dependency Install - run: ./ci/deps_install.sh - - name: Slurm Setup and Install - run: ./ci/slurm_start.sh - - name: BEE Install - run: ./ci/bee_install.sh - - name: BEE Config - run: ./ci/bee_config.sh + - name: Install and Configure + run: | + . ./ci/env.sh + ./ci/deps_install.sh + ./ci/batch_scheduler.sh + ./ci/bee_install.sh + ./ci/bee_config.sh - name: Unit tests - run: ./ci/unit_tests.sh + run: | + . 
./ci/env.sh + ./ci/unit_tests.sh diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index d4b4a7293..f73c7a6b6 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -35,6 +35,10 @@ else: raise RuntimeError(f'System "{_SYSTEM}" is not supported') +BEE_CONFIG = os.getenv('BEE_CONFIG') +if BEE_CONFIG is not None: + USERCONFIG_FILE = BEE_CONFIG + class BeeConfig: r"""Class to manage and store all BEE configuration. diff --git a/ci/bee_config.sh b/ci/bee_config.sh index 89b9072d7..55659053b 100755 --- a/ci/bee_config.sh +++ b/ci/bee_config.sh @@ -1,10 +1,8 @@ #!/bin/sh # BEE Configuration -. ./ci/env.sh - -mkdir -p ~/.config/beeflow -cat >> ~/.config/beeflow/bee.conf <> $BEE_CONFIG <> ~/.config/beeflow/bee.conf <> $BEE_CONFIG < Date: Thu, 15 Feb 2024 14:26:00 -0700 Subject: [PATCH 15/21] update write format style --- beeflow/client/bee_client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 6130080fb..f61aa9989 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -287,13 +287,13 @@ def is_parent(parent, path): sub_wf_dir = wf_dir + "/submit_command_args.txt" f_name = open(sub_wf_dir, "w", encoding="utf-8") - f_name.write("beeflow submit wf_name wf_path main_cwl yaml workdir\n") - f_name.write("wf_name: " + str(wf_name) + "\n") - f_name.write("wf_path: " + str(wf_path) + "\n") - f_name.write("main_cwl: " + str(main_cwl) + "\n") - f_name.write("yaml: " + str(yaml) + "\n") - f_name.write("workdir: " + str(workdir) + "\n") - f_name.write("wf_id: " + str(wf_id)) + f_name.write(f"beeflow submit wf_name wf_path main_cwl yaml workdir\n") + f_name.write(f"wf_name: {str(wf_name)}\n") + f_name.write(f"wf_path: {str(wf_path)}\n") + f_name.write(f"main_cwl: {str(main_cwl)}\n") + f_name.write(f"yaml: {str(yaml)}\n") + f_name.write(f"workdir: {str(workdir)}\n") + f_name.write(f"wf_id: {str(wf_id)}") 
f_name.close() return wf_id From 83d440290b1cac027a48d52ddc31fdb2d7f3d5ba Mon Sep 17 00:00:00 2001 From: Krishna Chilleri <149612138+kchilleri@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:29:54 -0700 Subject: [PATCH 16/21] fix f-string error --- beeflow/client/bee_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index f61aa9989..b7e6b5b33 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -287,7 +287,6 @@ def is_parent(parent, path): sub_wf_dir = wf_dir + "/submit_command_args.txt" f_name = open(sub_wf_dir, "w", encoding="utf-8") - f_name.write(f"beeflow submit wf_name wf_path main_cwl yaml workdir\n") f_name.write(f"wf_name: {str(wf_name)}\n") f_name.write(f"wf_path: {str(wf_path)}\n") f_name.write(f"main_cwl: {str(main_cwl)}\n") From 2edb17af5a6db703650d1e296c1724490e250730 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Thu, 15 Feb 2024 14:43:00 -0700 Subject: [PATCH 17/21] Show $BEE_CONFIG in ci/bee_config.sh --- ci/bee_config.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/bee_config.sh b/ci/bee_config.sh index 55659053b..2241d34f1 100755 --- a/ci/bee_config.sh +++ b/ci/bee_config.sh @@ -59,7 +59,7 @@ EOF fi printf "\n\n" -printf "#### bee.conf ####\n" +printf "#### %s ####\n" $BEE_CONFIG cat $BEE_CONFIG -printf "#### bee.conf ####\n" +printf "#### %s ####\n" $BEE_CONFIG printf "\n\n" From 59dcf9ffe1f2e97f43633206bbf3383009c43a62 Mon Sep 17 00:00:00 2001 From: Krishna Chilleri <149612138+kchilleri@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:51:19 -0700 Subject: [PATCH 18/21] correcting f-string format style --- beeflow/client/bee_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index b7e6b5b33..eed47db0a 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -287,12 +287,12 @@ def 
is_parent(parent, path): sub_wf_dir = wf_dir + "/submit_command_args.txt" f_name = open(sub_wf_dir, "w", encoding="utf-8") - f_name.write(f"wf_name: {str(wf_name)}\n") - f_name.write(f"wf_path: {str(wf_path)}\n") - f_name.write(f"main_cwl: {str(main_cwl)}\n") - f_name.write(f"yaml: {str(yaml)}\n") - f_name.write(f"workdir: {str(workdir)}\n") - f_name.write(f"wf_id: {str(wf_id)}") + f_name.write(f"wf_name: {wf_name}\n") + f_name.write(f"wf_path: {wf_path}\n") + f_name.write(f"main_cwl: {main_cwl}\n") + f_name.write(f"yaml: {yaml}\n") + f_name.write(f"workdir: {workdir}\n") + f_name.write(f"wf_id: {wf_id}") f_name.close() return wf_id From 1b8f3fd81f5a1323fb9e550c49ccd7af345efa12 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Thu, 15 Feb 2024 15:27:00 -0700 Subject: [PATCH 19/21] Add Krishna to contributors --- README.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/README.rst b/README.rst index dd1250d0e..d83f2103d 100644 --- a/README.rst +++ b/README.rst @@ -42,6 +42,7 @@ Contributors: * Paul Bryant - `paulbry `_ * Rusty Davis - `rstyd `_ * Jieyang Chen - `JieyangChen7 `_ +* Krishna Chilleri - `Krishna Chilleri `_ * Patricia Grubel - `pagrubel `_ * Qiang Guan - `guanxyz `_ * Ragini Gupta - `raginigupta6 `_ From 77adbc339266b4a6a785fa2b0ed21158c3f5ff27 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 15 Feb 2024 22:50:58 +0000 Subject: [PATCH 20/21] Bump follow-redirects from 1.15.2 to 1.15.5 in /beeflow/enhanced_client Bumps [follow-redirects](https://github.com/follow-redirects/follow-redirects) from 1.15.2 to 1.15.5. - [Release notes](https://github.com/follow-redirects/follow-redirects/releases) - [Commits](https://github.com/follow-redirects/follow-redirects/compare/v1.15.2...v1.15.5) --- updated-dependencies: - dependency-name: follow-redirects dependency-type: indirect ...
Signed-off-by: dependabot[bot] --- beeflow/enhanced_client/package-lock.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beeflow/enhanced_client/package-lock.json b/beeflow/enhanced_client/package-lock.json index bfd8236c3..2420b0e70 100644 --- a/beeflow/enhanced_client/package-lock.json +++ b/beeflow/enhanced_client/package-lock.json @@ -3187,9 +3187,9 @@ } }, "node_modules/follow-redirects": { - "version": "1.15.2", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.2.tgz", - "integrity": "sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==", + "version": "1.15.5", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.5.tgz", + "integrity": "sha512-vSFWUON1B+yAw1VN4xMfxgn5fTUiaOzAJCKBwIIgT/+7CuGy9+r+5gITvP62j3RmaD5Ph65UaERdOSRGUzZtgw==", "funding": [ { "type": "individual", From dae3dcd67338a99dea087a0a5f6eaf71153f5e0e Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Thu, 15 Feb 2024 15:57:22 -0700 Subject: [PATCH 21/21] Export all environment variables in CI --- ci/env.sh | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/ci/env.sh b/ci/env.sh index bb45a56ef..e7e6bfaab 100644 --- a/ci/env.sh +++ b/ci/env.sh @@ -1,31 +1,31 @@ # Environment set up # Set up environment -PYTHON=python3 -HOSTNAME=`$PYTHON -c 'import socket; print(socket.gethostname())'` +export PYTHON=python3 +export HOSTNAME=`$PYTHON -c 'import socket; print(socket.gethostname())'` # Everything is in /tmp for right now -SLURMCTLD_PID=/tmp/slurmctld.pid -SLURMD_PID=/tmp/slurmd.pid -SLURMD_SPOOL_DIR=/tmp/slurm_spool -LOG_DIR=/tmp/slurm_log -SLURM_STATE_SAVE_LOCATION=/tmp/slurm_state +export SLURMCTLD_PID=/tmp/slurmctld.pid +export SLURMD_PID=/tmp/slurmd.pid +export SLURMD_SPOOL_DIR=/tmp/slurm_spool +export LOG_DIR=/tmp/slurm_log +export SLURM_STATE_SAVE_LOCATION=/tmp/slurm_state mkdir -p $SLURMD_SPOOL_DIR 
$SLURM_STATE_SAVE_LOCATION $LOG_DIR -SLURMCTLD_LOG=$LOG_DIR/slurmctld.log -SLURMD_LOG=$LOG_DIR/slurmd.log -SLURM_USER=`whoami` -MUNGE_SOCKET=/tmp/munge.sock -MUNGE_LOG=/tmp/munge.log -MUNGE_PID=/tmp/munge.pid +export SLURMCTLD_LOG=$LOG_DIR/slurmctld.log +export SLURMD_LOG=$LOG_DIR/slurmd.log +export SLURM_USER=`whoami` +export MUNGE_SOCKET=/tmp/munge.sock +export MUNGE_LOG=/tmp/munge.log +export MUNGE_PID=/tmp/munge.pid mkdir -p /tmp/munge -MUNGE_KEY=/tmp/munge/munge.key +export MUNGE_KEY=/tmp/munge/munge.key # Determine config of CI host -NODE_CONFIG=`slurmd -C | head -n 1` -BEE_WORKDIR=$HOME/.beeflow -NEO4J_CONTAINER=$HOME/img/neo4j.tar.gz -REDIS_CONTAINER=$HOME/img/redis.tar.gz +export NODE_CONFIG=`slurmd -C | head -n 1` +export BEE_WORKDIR=$HOME/.beeflow +export NEO4J_CONTAINER=$HOME/img/neo4j.tar.gz +export REDIS_CONTAINER=$HOME/img/redis.tar.gz mkdir -p $BEE_WORKDIR export SLURM_CONF=~/slurm.conf # Flux variables -FLUX_CORE_VERSION=0.51.0 -FLUX_SECURITY_VERSION=0.9.0 -BEE_CONFIG=$HOME/.config/beeflow/bee.conf -OPENAPI_VERSION=v0.0.37 +export FLUX_CORE_VERSION=0.51.0 +export FLUX_SECURITY_VERSION=0.9.0 +export BEE_CONFIG=$HOME/.config/beeflow/bee.conf +export OPENAPI_VERSION=v0.0.37