From c107f4baee716bb1fe990823e08f4309a58db577 Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Wed, 31 Jan 2024 11:10:42 -0700 Subject: [PATCH 1/6] Increase time steps to reduce chance of success --- .../cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml b/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml index 7f57a05ee..5609cfe69 100644 --- a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml +++ b/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml @@ -1,7 +1,7 @@ { "grid_resolution": 32, "max_levels": 3, - "time_steps": 80000, + "time_steps": 150000, "steps_between_outputs": 10, "steps_between_graphics": 25, "graphics_type": "png", From 79e73a0809170d7ab7d48afe38a67d83024b9884 Mon Sep 17 00:00:00 2001 From: Steven Ray Anaya Date: Wed, 31 Jan 2024 11:11:05 -0700 Subject: [PATCH 2/6] Fixed checkpoint restart num_tries not working --- beeflow/common/parser/parser.py | 8 +++++++- beeflow/common/wf_interface.py | 5 ++--- beeflow/wf_manager/resources/wf_update.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 9c58e8d51..f64b008ed 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -302,7 +302,13 @@ def parse_requirements(self, requirements, as_hints=False): return reqs if as_hints: for req in requirements: - items = {k: str(v) for k, v in req.items() if k != "class"} + items = {} + for k, v in req.items(): + if k != 'class': + if isinstance(v, (int, float)): + items[k] = v + else: + items[k] = str(v) # Load in the dockerfile at parse time if 'dockerFile' in items: self._read_requirement_file('dockerFile', items) diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index 38849342c..edc28ead7 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -5,7 +5,6 @@ import re - class WorkflowInterface: """Interface for manipulating workflows.""" @@ -94,6 +93,7 @@ def restart_task(self, task, checkpoint_file): :param task: the task to restart :type task: Task + :param checkpoint_file: the task checkpoint file :rtype: Task or None """ for hint in task.hints: @@ -102,8 +102,7 @@ def restart_task(self, task, checkpoint_file): hint.params["num_tries"] -= 1 hint.params["bee_checkpoint_file__"] = checkpoint_file break - state = self.get_task_state(task) - self.set_task_state(task, f"FAILED RESTART: {state}") + self.set_task_state(task, f"FAILED") return None else: raise ValueError("invalid task for checkpoint restart") diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index a2ee2aa36..1dfd80262 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -96,11 +96,11 @@ def put(self): task_info = jsonpickle.decode(data['task_info']) checkpoint_file = task_info['checkpoint_file'] new_task = wfi.restart_task(task, checkpoint_file) - db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") if new_task is None: log.info('No more restarts') wf_state = wfi.get_task_state(task) return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) + db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") # Submit the restart task tasks = [new_task] wf_utils.schedule_submit_tasks(wf_id, tasks) From fb206d0ee9e45b22652db24e095eb067cc2c2318 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 6 Feb 2024 15:16:55 -0700 Subject: [PATCH 3/6] Fix checkpoint-too-long workflow and add better error checking to get_restart_file() --- beeflow/task_manager/background.py | 12 ++++++---- beeflow/task_manager/utils.py | 24 +++++++++++++------ .../checkpoint-too-long/workflow.cwl | 4 ++-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 10ca9de77..c07a4ca35 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -89,10 +89,14 @@ def update_jobs(): log.info(f'state: {new_job_state}') log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}') if task_checkpoint: - checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) - task_info = {'checkpoint_file': checkpoint_file, 'restart': True} - update_task_state(task.workflow_id, task.id, new_job_state, - task_info=task_info) + try: + checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) + task_info = {'checkpoint_file': checkpoint_file, 'restart': True} + update_task_state(task.workflow_id, task.id, new_job_state, + task_info=task_info) + except utils.CheckpointRestartError as err: + log.error(f'Checkpoint restart failed for {task.name} ({task.id}): {err}') + update_task_state(task.workflow_id, task.id, 'FAILED') else: update_task_state(task.workflow_id, task.id, new_job_state) else: diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py index d0459e12f..6ecdec5e0 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -67,22 +67,32 @@ def wfm_conn(): return Connection(paths.wfm_socket()) +class CheckpointRestartError(Exception): + """Exception to be thrown on checkpoint-restart failure.""" + pass + + def get_restart_file(task_checkpoint, task_workdir): """Find latest checkpoint file.""" if 'file_regex' not in task_checkpoint: - raise RuntimeError('file_regex is required for checkpointing') + raise CheckpointRestartError('file_regex is required for checkpointing') if 'file_path' not in task_checkpoint: - raise RuntimeError('file_path is required for checkpointing') + raise CheckpointRestartError('file_path is required for checkpointing') file_regex = task_checkpoint['file_regex'] file_path = Path(task_workdir, task_checkpoint['file_path']) regex = re.compile(file_regex) - checkpoint_files = [ - Path(file_path, fname) for fname in os.listdir(file_path) - if regex.match(fname) - ] + try: + checkpoint_files = [ + Path(file_path, fname) for fname in os.listdir(file_path) + if regex.match(fname) + ] + except FileNotFoundError: + raise CheckpointRestartError( + f'Checkpoint file_path ("{file_path}") not found' + ) from None checkpoint_files.sort(key=os.path.getmtime) try: checkpoint_file = checkpoint_files[-1] return str(checkpoint_file) except IndexError: - raise RuntimeError('Missing checkpoint file for task') from None + raise CheckpointRestartError('Missing checkpoint file for task') from None diff --git a/ci/test_workflows/checkpoint-too-long/workflow.cwl b/ci/test_workflows/checkpoint-too-long/workflow.cwl index 0fa9086e6..b8ea7f009 100644 --- a/ci/test_workflows/checkpoint-too-long/workflow.cwl +++ b/ci/test_workflows/checkpoint-too-long/workflow.cwl @@ -20,8 +20,8 @@ steps: hints: beeflow:CheckpointRequirement: enabled: true - file_path: checkpoint_output - container_path: checkpoint_output + file_path: . + container_path: . file_regex: backup[0-9]*.crx restart_parameters: -R num_tries: 1 From 9ce72391214c235818014fada8de338f55fc0936 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 7 Feb 2024 09:01:54 -0700 Subject: [PATCH 4/6] Ensure that checkpoint-restart workflows fail after exceeding num_tries --- beeflow/common/wf_interface.py | 4 +++- beeflow/task_manager/utils.py | 1 - beeflow/wf_manager/resources/wf_update.py | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index edc28ead7..f53f76725 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -5,6 +5,7 @@ import re + class WorkflowInterface: """Interface for manipulating workflows.""" @@ -102,7 +103,8 @@ def restart_task(self, task, checkpoint_file): hint.params["num_tries"] -= 1 hint.params["bee_checkpoint_file__"] = checkpoint_file break - self.set_task_state(task, f"FAILED") + self.set_task_state(task, "FAILED") + self.set_workflow_state("FAILED") return None else: raise ValueError("invalid task for checkpoint restart") diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py index 6ecdec5e0..3378c442e 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -69,7 +69,6 @@ def wfm_conn(): class CheckpointRestartError(Exception): """Exception to be thrown on checkpoint-restart failure.""" - pass def get_restart_file(task_checkpoint, task_workdir): diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 1dfd80262..5fe70ed32 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -98,7 +98,9 @@ def put(self): new_task = wfi.restart_task(task, checkpoint_file) if new_task is None: log.info('No more restarts') - wf_state = wfi.get_task_state(task) + wf_state = wfi.get_workflow_state() + wf_utils.update_wf_status(wf_id, 'Failed') + db.workflows.update_workflow_state(wf_id, 'Failed') return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") # Submit the restart task From dadd8083d49280b1521030a8040e8f71dda436ff Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 13 Feb 2024 12:18:54 -0700 Subject: [PATCH 5/6] Add TSC paper to README --- README.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index cc05d9e20..dd1250d0e 100644 --- a/README.rst +++ b/README.rst @@ -85,9 +85,8 @@ License can be found `here `_ Publications ========================== +- An HPC-Container Based Continuous Integration Tool for Detecting Scaling and Performance Issues in HPC Applications, IEEE Transactions on Services Computing, 2024, `DOI: 10.1109/TSC.2023.3337662 `_ - BEE Orchestrator: Running Complex Scientific Workflows on Multiple Systems, HiPC, 2021, `DOI: 10.1109/HiPC53243.2021.00052 `_ - "BeeSwarm: Enabling Parallel Scaling Performance Measurement in Continuous Integration for HPC Applications", ASE, 2021, `DOI: 10.1109/ASE51524.2021.9678805 `_ - "BeeFlow: A Workflow Management System for In Situ Processing across HPC and Cloud Systems", ICDCS, 2018, `DOI: 10.1109/ICDCS.2018.00103 `_ - "Build and execution environment (BEE): an encapsulated environment enabling HPC applications running everywhere", IEEE BigData, 2018, `DOI: 10.1109/BigData.2018.8622572 `_ - - From 82a75de17360742cbdb915efde06bb15dac52b40 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 14 Feb 2024 11:54:16 -0700 Subject: [PATCH 6/6] Update to Python <= 3.12 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5a0aa8c51..e926c1f67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,8 +47,8 @@ beeflow = 'beeflow.client.bee_client:main' beecloud = 'beeflow.cloud_launcher:main' [tool.poetry.dependencies] -# Python version (>=3.8.3, <3.11) -python = ">=3.8.3,<=3.11" +# Python version (>=3.8.3, <=3.12.2) +python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" }