Skip to content

Commit

Permalink
Merge branch 'develop' into issue742/archive-should-contain-submit-command
Browse files Browse the repository at this point in the history
  • Loading branch information
pagrubel committed Feb 15, 2024
2 parents 59dcf9f + 113561b commit d1249ef
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 23 deletions.
3 changes: 1 addition & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ License can be found `here <https://github.com/lanl/BEE/blob/master/LICENSE>`_
Publications
==========================

- An HPC-Container Based Continuous Integration Tool for Detecting Scaling and Performance Issues in HPC Applications, IEEE Transactions on Services Computing, 2024, `DOI: 10.1109/TSC.2023.3337662 <https://doi.ieeecomputersociety.org/10.1109/TSC.2023.3337662>`_
- BEE Orchestrator: Running Complex Scientific Workflows on Multiple Systems, HiPC, 2021, `DOI: 10.1109/HiPC53243.2021.00052 <https://doi.org/10.1109/HiPC53243.2021.00052>`_
- "BeeSwarm: Enabling Parallel Scaling Performance Measurement in Continuous Integration for HPC Applications", ASE, 2021, `DOI: 10.1109/ASE51524.2021.9678805 <https://www.computer.org/csdl/proceedings-article/ase/2021/033700b136/1AjTjgnW2pa#:~:text=10.1109/ASE51524.2021.9678805>`_
- "BeeFlow: A Workflow Management System for In Situ Processing across HPC and Cloud Systems", ICDCS, 2018, `DOI: 10.1109/ICDCS.2018.00103 <https://ieeexplore.ieee.org/abstract/document/8416366>`_
- "Build and execution environment (BEE): an encapsulated environment enabling HPC applications running everywhere", IEEE BigData, 2018, `DOI: 10.1109/BigData.2018.8622572 <https://ieeexplore.ieee.org/document/8622572>`_


8 changes: 7 additions & 1 deletion beeflow/common/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,13 @@ def parse_requirements(self, requirements, as_hints=False):
return reqs
if as_hints:
for req in requirements:
items = {k: str(v) for k, v in req.items() if k != "class"}
items = {}
for k, v in req.items():
if k != 'class':
if isinstance(v, (int, float)):
items[k] = v
else:
items[k] = str(v)
# Load in the dockerfile at parse time
if 'dockerFile' in items:
self._read_requirement_file('dockerFile', items)
Expand Down
5 changes: 3 additions & 2 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def restart_task(self, task, checkpoint_file):
:param task: the task to restart
:type task: Task
:param checkpoint_file: the task checkpoint file
:rtype: Task or None
"""
for hint in task.hints:
Expand All @@ -102,8 +103,8 @@ def restart_task(self, task, checkpoint_file):
hint.params["num_tries"] -= 1
hint.params["bee_checkpoint_file__"] = checkpoint_file
break
state = self.get_task_state(task)
self.set_task_state(task, f"FAILED RESTART: {state}")
self.set_task_state(task, "FAILED")
self.set_workflow_state("FAILED")
return None
else:
raise ValueError("invalid task for checkpoint restart")
Expand Down
12 changes: 8 additions & 4 deletions beeflow/task_manager/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,14 @@ def update_jobs():
log.info(f'state: {new_job_state}')
log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}')
if task_checkpoint:
checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir)
task_info = {'checkpoint_file': checkpoint_file, 'restart': True}
update_task_state(task.workflow_id, task.id, new_job_state,
task_info=task_info)
try:
checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir)
task_info = {'checkpoint_file': checkpoint_file, 'restart': True}
update_task_state(task.workflow_id, task.id, new_job_state,
task_info=task_info)
except utils.CheckpointRestartError as err:
log.error(f'Checkpoint restart failed for {task.name} ({task.id}): {err}')
update_task_state(task.workflow_id, task.id, 'FAILED')
else:
update_task_state(task.workflow_id, task.id, new_job_state)
else:
Expand Down
23 changes: 16 additions & 7 deletions beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,31 @@ def wfm_conn():
return Connection(paths.wfm_socket())


class CheckpointRestartError(Exception):
"""Exception to be thrown on checkpoint-restart failure."""


def get_restart_file(task_checkpoint, task_workdir):
"""Find latest checkpoint file."""
if 'file_regex' not in task_checkpoint:
raise RuntimeError('file_regex is required for checkpointing')
raise CheckpointRestartError('file_regex is required for checkpointing')
if 'file_path' not in task_checkpoint:
raise RuntimeError('file_path is required for checkpointing')
raise CheckpointRestartError('file_path is required for checkpointing')
file_regex = task_checkpoint['file_regex']
file_path = Path(task_workdir, task_checkpoint['file_path'])
regex = re.compile(file_regex)
checkpoint_files = [
Path(file_path, fname) for fname in os.listdir(file_path)
if regex.match(fname)
]
try:
checkpoint_files = [
Path(file_path, fname) for fname in os.listdir(file_path)
if regex.match(fname)
]
except FileNotFoundError:
raise CheckpointRestartError(
f'Checkpoint file_path ("{file_path}") not found'
) from None
checkpoint_files.sort(key=os.path.getmtime)
try:
checkpoint_file = checkpoint_files[-1]
return str(checkpoint_file)
except IndexError:
raise RuntimeError('Missing checkpoint file for task') from None
raise CheckpointRestartError('Missing checkpoint file for task') from None
6 changes: 4 additions & 2 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ def put(self):
task_info = jsonpickle.decode(data['task_info'])
checkpoint_file = task_info['checkpoint_file']
new_task = wfi.restart_task(task, checkpoint_file)
db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING")
if new_task is None:
log.info('No more restarts')
wf_state = wfi.get_task_state(task)
wf_state = wfi.get_workflow_state()
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
return make_response(jsonify(status=f'Task {task_id} set to {job_state}'))
db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING")
# Submit the restart task
tasks = [new_task]
wf_utils.schedule_submit_tasks(wf_id, tasks)
Expand Down
4 changes: 2 additions & 2 deletions ci/test_workflows/checkpoint-too-long/workflow.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ steps:
hints:
beeflow:CheckpointRequirement:
enabled: true
file_path: checkpoint_output
container_path: checkpoint_output
file_path: .
container_path: .
file_regex: backup[0-9]*.crx
restart_parameters: -R
num_tries: 1
Expand Down
2 changes: 1 addition & 1 deletion ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"grid_resolution": 32,
"max_levels": 3,
"time_steps": 80000,
"time_steps": 150000,
"steps_between_outputs": 10,
"steps_between_graphics": 25,
"graphics_type": "png",
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ beeflow = 'beeflow.client.bee_client:main'
beecloud = 'beeflow.cloud_launcher:main'

[tool.poetry.dependencies]
# Python version (>=3.8.3, <3.11)
python = ">=3.8.3,<=3.11"
# Python version (>=3.8.3, <=3.12.2)
python = ">=3.8.3,<=3.12.2"

# Package dependencies
Flask = { version = "^2.0" }
Expand Down

0 comments on commit d1249ef

Please sign in to comment.