Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete job files when rerunning; customizable execution command #201

Merged
merged 7 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/source/user/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ the ``READY`` state, this can be achieved by running::
the Jobs in the ``REMOTE_ERROR`` state. Check ``jf job retry -h`` for the full
list of options available.

.. warning::
Rerunning by default also marks the Job so that the folder on the worker where it
was executed will be deleted. The deletion is performed by the Runner before
reuploading the inputs, i.e. before the Job gets to the ``UPLOADED`` state.
To avoid this, use the ``--no-delete`` option.

.. warning::
It is impossible to provide an exhaustive list of potential issues that could lead to
a ``REMOTE_ERROR`` state. So except in some well defined cases, the error messages will be
Expand Down Expand Up @@ -248,6 +254,13 @@ job with::

will solve the issue.

.. warning::
Rerunning by default also marks the Job so that the folder on the worker where it
was executed will be deleted. The deletion is performed by the Runner before
reuploading the inputs, i.e. before the Job gets to the ``UPLOADED`` state. If, as an
effect of a rerun, children Jobs are also rerun, their folders will be marked for
deletion as well. To avoid this, use the ``--no-delete`` option.

.. note::

Only ``jf job rerun`` should be applied to ``FAILED`` Jobs. ``jf job retry`` is **not**
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ filterwarnings = [
"ignore:.*magmom.*:UserWarning",
"ignore::DeprecationWarning",
]
addopts = [
"--import-mode=importlib",
]

[tool.coverage.run]
include = ["src/*"]
Expand Down
14 changes: 12 additions & 2 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
from datetime import datetime
from typing import Annotated, Optional

Expand Down Expand Up @@ -194,8 +195,17 @@ def delete(
raise typer.Exit(0)

to_delete = [fi.flow_id for fi in flows_info]
with loading_spinner(processing=False) as progress:
progress.add_task(description="Deleting flows...", total=None)

# if potentially interactive do not start the spinner.
spinner_cm: contextlib.AbstractContextManager
if delete_files and jc.project.has_interactive_workers:
spinner_cm = contextlib.nullcontext()
out_console.print("Deleting flows...")
else:
spinner_cm = loading_spinner(processing=False)
with spinner_cm as progress:
if progress:
progress.add_task(description="Deleting flows...", total=None)

jc.delete_flows(
flow_ids=to_delete,
Expand Down
15 changes: 14 additions & 1 deletion src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ def rerun(
),
] = False,
raise_on_error: raise_on_error_opt = False,
no_delete: Annotated[
bool,
typer.Option(
"--no-delete",
"-nd",
help=("Skip the delete of the files on the worker."),
),
] = False,
) -> None:
"""
Rerun a Job. By default, this is limited to jobs that failed and children did
Expand All @@ -313,6 +321,8 @@ def rerun(
will be cancelled. Most of the limitations can be overridden by the 'force'
option. This could lead to inconsistencies in the overall state of the Jobs of
the Flow.
All the folders of the Jobs whose state are modified will also be deleted on
the worker.
"""
if force or break_lock:
check_stopped_runner(error=False)
Expand Down Expand Up @@ -341,6 +351,8 @@ def rerun(
break_lock=break_lock,
force=force,
raise_on_error=raise_on_error,
interactive=True,
delete_files=not no_delete,
)


Expand Down Expand Up @@ -600,6 +612,7 @@ def delete(
verbosity=verbosity,
wait=wait,
raise_on_error=raise_on_error,
interactive=True,
delete_output=delete_output,
delete_files=delete_files,
)
Expand Down Expand Up @@ -1055,7 +1068,7 @@ def output(
),
] = False,
) -> None:
"""Detailed information on a specific job."""
"""Fetch the output of a Job from the output Store."""
db_id, job_id = get_job_db_ids(job_db_id, job_index)

with loading_spinner():
Expand Down
66 changes: 64 additions & 2 deletions src/jobflow_remote/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import contextlib
import functools
import json
import logging
Expand Down Expand Up @@ -370,8 +371,61 @@ def execute_multi_jobs_cmd(
custom_query: dict | None = None,
verbosity: int = 0,
raise_on_error: bool = False,
interactive: bool = True,
**kwargs,
) -> None:
"""
Utility function to execute a command on a single job or on a set of jobs.
Checks the query options, determine whether the command is single or multi
and call the corresponding function.

Parameters
----------
single_cmd
A function that takes the job_id and job_index as arguments and performs an
action on a single job.
multi_cmd
A function that takes a set of standard arguments and performs an
action on multiple jobs.
job_db_id
The job id or db_id of a job to be considered. If specified, all the other
query options should be disabled.
job_index
The index of the job in the DB.
job_ids
A list of job ids to be considered.
db_ids
A list of db ids to be considered.
flow_ids
A list of flow ids to be considered.
states
The states of the jobs to be considered.
start_date
The start date of the jobs to be considered.
end_date
The end date of the jobs to be considered.
name
The name of the jobs to be considered.
metadata
The metadata of the jobs to be considered.
days
Set the start_date based on the number of past days.
hours
Set the start_date based on the number of past hours.
workers
Workers associated with the jobs to be considered.
custom_query
A custom query to be used to filter the jobs.
verbosity
The verbosity of the output.
raise_on_error
If True, an error will be raised if the operation fails.
interactive
If True, a spinner will be shown even if the operation is not interactive while
the operation is being performed.
**kwargs : dict
Additional arguments to be passed to the single_cmd and multi_cmd functions.
"""
query_values = [
job_ids,
db_ids,
Expand All @@ -386,13 +440,21 @@ def execute_multi_jobs_cmd(
workers,
custom_query,
]
# if potentially interactive do not start the spinner.
cm = get_config_manager()
spinner_cm: contextlib.AbstractContextManager
if interactive and cm.get_project().has_interactive_workers:
spinner_cm = contextlib.nullcontext()
out_console.print("Processing...")
else:
spinner_cm = loading_spinner()
try:
if job_db_id is not None:
if any(query_values):
msg = "If job_db_id is defined all the other query options should be disabled"
exit_with_error_msg(msg)
db_id, job_id = get_job_db_ids(job_db_id, job_index)
with loading_spinner():
with spinner_cm:
modified_ids = single_cmd(
job_id=job_id, job_index=job_index, db_id=db_id, **kwargs
)
Expand Down Expand Up @@ -448,7 +510,7 @@ def execute_multi_jobs_cmd(
if not confirmed:
raise typer.Exit(0) # noqa: TRY301

with loading_spinner():
with spinner_cm:
modified_ids = multi_cmd(
job_ids=job_ids_indexes,
db_ids=db_ids,
Expand Down
27 changes: 27 additions & 0 deletions src/jobflow_remote/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ class WorkerBase(BaseModel):
None,
description="String with commands that will be executed after the execution of the Job",
)
execution_cmd: Optional[str] = Field(
None,
description="String with commands to execute the Job on the worker. By default will be "
"set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to "
davidwaroquiers marked this conversation as resolved.
Show resolved Hide resolved
"the execution directory and it is mandatory. Change only for specific needs (e.g. the"
"jf command needs to be executed in a container).",
davidwaroquiers marked this conversation as resolved.
Show resolved Hide resolved
)
timeout_execute: int = Field(
60,
description="Timeout for the execution of the commands in the worker "
Expand Down Expand Up @@ -208,6 +215,16 @@ def check_work_dir(cls, v) -> Path:
raise ValueError("`work_dir` must be an absolute path")
return v

@field_validator("execution_cmd")
@classmethod
def check_execution_cmd(cls, v) -> Optional[str]:
    """Ensure a custom execution command keeps the ``{}`` placeholder.

    The placeholder is substituted with the Job's execution directory at
    runtime, so a command without it could never receive the folder path.
    ``None`` (the default command) is accepted as-is.
    """
    if v is None or "{}" in v:
        return v
    raise ValueError(
        "`execution_cmd` must contain a '{}' part to allow setting "
        "the execution folder at runtime"
    )

def get_scheduler_io(self) -> BaseSchedulerIO:
"""
Get the BaseSchedulerIO from QToolKit depending on scheduler_type.
Expand Down Expand Up @@ -638,6 +655,16 @@ def check_jobstore(cls, jobstore: dict) -> dict:
) from e
return jobstore

@property
def has_interactive_workers(self) -> bool:
    """
    Whether at least one configured worker is a RemoteWorker with
    ``interactive_login`` enabled.
    """
    # Only RemoteWorker instances can require an interactive login.
    return any(
        isinstance(worker, RemoteWorker) and worker.interactive_login
        for worker in self.workers.values()
    )

model_config = ConfigDict(extra="forbid")


Expand Down
1 change: 1 addition & 0 deletions src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class RemoteInfo(BaseModel):
process_id: Optional[str] = None
retry_time_limit: Optional[datetime] = None
error: Optional[str] = None
cleanup: bool = False
gpetretto marked this conversation as resolved.
Show resolved Hide resolved


class JobInfo(BaseModel):
Expand Down
Loading