diff --git a/README.rst b/README.rst index cc05d9e20..dd1250d0e 100644 --- a/README.rst +++ b/README.rst @@ -85,9 +85,8 @@ License can be found `here `_ Publications ========================== +- An HPC-Container Based Continuous Integration Tool for Detecting Scaling and Performance Issues in HPC Applications, IEEE Transactions on Services Computing, 2024, `DOI: 10.1109/TSC.2023.3337662 `_ - BEE Orchestrator: Running Complex Scientific Workflows on Multiple Systems, HiPC, 2021, `DOI: 10.1109/HiPC53243.2021.00052 `_ - "BeeSwarm: Enabling Parallel Scaling Performance Measurement in Continuous Integration for HPC Applications", ASE, 2021, `DOI: 10.1109/ASE51524.2021.9678805 `_ - "BeeFlow: A Workflow Management System for In Situ Processing across HPC and Cloud Systems", ICDCS, 2018, `DOI: 10.1109/ICDCS.2018.00103 `_ - "Build and execution environment (BEE): an encapsulated environment enabling HPC applications running everywhere", IEEE BigData, 2018, `DOI: 10.1109/BigData.2018.8622572 `_ - - diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 3a4689c17..c6e845f66 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -307,7 +307,13 @@ def parse_requirements(self, requirements, as_hints=False): return reqs if as_hints: for req in requirements: - items = {k: str(v) for k, v in req.items() if k != "class"} + items = {} + for k, v in req.items(): + if k != 'class': + if isinstance(v, (int, float)): + items[k] = v + else: + items[k] = str(v) # Load in the dockerfile at parse time if 'dockerFile' in items: self._read_requirement_file('dockerFile', items) diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index 38849342c..f53f76725 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -94,6 +94,7 @@ def restart_task(self, task, checkpoint_file): :param task: the task to restart :type task: Task + :param checkpoint_file: the task checkpoint file :rtype: Task or None """ for hint in task.hints: @@ -102,8 +103,8 @@ def restart_task(self, task, checkpoint_file): hint.params["num_tries"] -= 1 hint.params["bee_checkpoint_file__"] = checkpoint_file break - state = self.get_task_state(task) - self.set_task_state(task, f"FAILED RESTART: {state}") + self.set_task_state(task, "FAILED") + self.set_workflow_state("FAILED") return None else: raise ValueError("invalid task for checkpoint restart") diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 10ca9de77..c07a4ca35 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -89,10 +89,14 @@ def update_jobs(): log.info(f'state: {new_job_state}') log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}') if task_checkpoint: - checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) - task_info = {'checkpoint_file': checkpoint_file, 'restart': True} - update_task_state(task.workflow_id, task.id, new_job_state, - task_info=task_info) + try: + checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir) + task_info = {'checkpoint_file': checkpoint_file, 'restart': True} + update_task_state(task.workflow_id, task.id, new_job_state, + task_info=task_info) + except utils.CheckpointRestartError as err: + log.error(f'Checkpoint restart failed for {task.name} ({task.id}): {err}') + update_task_state(task.workflow_id, task.id, 'FAILED') else: update_task_state(task.workflow_id, task.id, new_job_state) else: diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py index d0459e12f..3378c442e 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -67,22 +67,31 @@ def wfm_conn(): return Connection(paths.wfm_socket()) +class CheckpointRestartError(Exception): + """Exception to be thrown on checkpoint-restart failure.""" + + def get_restart_file(task_checkpoint, task_workdir): """Find latest checkpoint file.""" if 'file_regex' not in task_checkpoint: - raise RuntimeError('file_regex is required for checkpointing') + raise CheckpointRestartError('file_regex is required for checkpointing') if 'file_path' not in task_checkpoint: - raise RuntimeError('file_path is required for checkpointing') + raise CheckpointRestartError('file_path is required for checkpointing') file_regex = task_checkpoint['file_regex'] file_path = Path(task_workdir, task_checkpoint['file_path']) regex = re.compile(file_regex) - checkpoint_files = [ - Path(file_path, fname) for fname in os.listdir(file_path) - if regex.match(fname) - ] + try: + checkpoint_files = [ + Path(file_path, fname) for fname in os.listdir(file_path) + if regex.match(fname) + ] + except FileNotFoundError: + raise CheckpointRestartError( + f'Checkpoint file_path ("{file_path}") not found' + ) from None checkpoint_files.sort(key=os.path.getmtime) try: checkpoint_file = checkpoint_files[-1] return str(checkpoint_file) except IndexError: - raise RuntimeError('Missing checkpoint file for task') from None + raise CheckpointRestartError('Missing checkpoint file for task') from None diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index a2ee2aa36..5fe70ed32 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -96,11 +96,13 @@ def put(self): task_info = jsonpickle.decode(data['task_info']) checkpoint_file = task_info['checkpoint_file'] new_task = wfi.restart_task(task, checkpoint_file) - db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") if new_task is None: log.info('No more restarts') - wf_state = wfi.get_task_state(task) + wf_state = wfi.get_workflow_state() + wf_utils.update_wf_status(wf_id, 'Failed') + db.workflows.update_workflow_state(wf_id, 'Failed') return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) + db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING") # Submit the restart task tasks = [new_task] wf_utils.schedule_submit_tasks(wf_id, tasks) diff --git a/ci/test_workflows/checkpoint-too-long/workflow.cwl b/ci/test_workflows/checkpoint-too-long/workflow.cwl index 0fa9086e6..b8ea7f009 100644 --- a/ci/test_workflows/checkpoint-too-long/workflow.cwl +++ b/ci/test_workflows/checkpoint-too-long/workflow.cwl @@ -20,8 +20,8 @@ steps: hints: beeflow:CheckpointRequirement: enabled: true - file_path: checkpoint_output - container_path: checkpoint_output + file_path: . + container_path: . file_regex: backup[0-9]*.crx restart_parameters: -R num_tries: 1 diff --git a/ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml b/ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml index 7f57a05ee..5609cfe69 100644 --- a/ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml +++ b/ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml @@ -1,7 +1,7 @@ { "grid_resolution": 32, "max_levels": 3, - "time_steps": 80000, + "time_steps": 150000, "steps_between_outputs": 10, "steps_between_graphics": 25, "graphics_type": "png", diff --git a/pyproject.toml b/pyproject.toml index 5a0aa8c51..e926c1f67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,8 +47,8 @@ beeflow = 'beeflow.client.bee_client:main' beecloud = 'beeflow.cloud_launcher:main' [tool.poetry.dependencies] -# Python version (>=3.8.3, <3.11) -python = ">=3.8.3,<=3.11" +# Python version (>=3.8.3, <=3.12.2) +python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" }