diff --git a/doc/source/user/errors.rst b/doc/source/user/errors.rst
index 87c230e6..7c66a82c 100644
--- a/doc/source/user/errors.rst
+++ b/doc/source/user/errors.rst
@@ -157,6 +157,12 @@ the ``READY`` state, this can be achieved by running::
 the Jobs in the ``REMOTE_ERROR`` state. Check ``jf job retry -h`` for the full
 list of options available.
 
+.. warning::
+    Rerunning by default also marks the Job so that the folder on the worker where it
+    was executed will be deleted. The deletion is performed by the Runner before
+    reuploading the inputs, i.e. before the Job gets to the ``UPLOADED`` state.
+    To avoid this, use the ``--no-delete`` option.
+
 .. warning::
     It is impossible to provide an exhaustive list of potential issues that could lead to
     a ``REMOTE_ERROR`` state. So except in some well defined cases, the error messages will be
@@ -248,6 +254,13 @@ job with::
 
 will solve the issue.
 
+.. warning::
+    Rerunning by default also marks the Job so that the folder on the worker where it
+    was executed will be deleted. The deletion is performed by the Runner before
+    reuploading the inputs, i.e. before the Job gets to the ``UPLOADED`` state. If, as
+    an effect of a rerun, children Jobs are also rerun, their folders will be marked
+    for deletion as well. To avoid this, use the ``--no-delete`` option.
+
 .. note::
 
     Only ``jf job rerun`` should be applied to ``FAILED`` Jobs. ``jf job retry`` is **not**
diff --git a/pyproject.toml b/pyproject.toml
index bc5f31f8..99439265 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -155,6 +155,9 @@ filterwarnings = [
     "ignore:.*magmom.*:UserWarning",
     "ignore::DeprecationWarning",
 ]
+addopts = [
+    "--import-mode=importlib",
+]
 
 [tool.coverage.run]
 include = ["src/*"]
diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py
index 5fa34af8..57590c4c 100644
--- a/src/jobflow_remote/cli/flow.py
+++ b/src/jobflow_remote/cli/flow.py
@@ -1,3 +1,4 @@
+import contextlib
 from datetime import datetime
 from typing import Annotated, Optional
 
@@ -194,8 +195,17 @@ def delete(
             raise typer.Exit(0)
 
     to_delete = [fi.flow_id for fi in flows_info]
-    with loading_spinner(processing=False) as progress:
-        progress.add_task(description="Deleting flows...", total=None)
+
+    # if potentially interactive do not start the spinner.
+    spinner_cm: contextlib.AbstractContextManager
+    if delete_files and jc.project.has_interactive_workers:
+        spinner_cm = contextlib.nullcontext()
+        out_console.print("Deleting flows...")
+    else:
+        spinner_cm = loading_spinner(processing=False)
+    with spinner_cm as progress:
+        if progress:
+            progress.add_task(description="Deleting flows...", total=None)
 
         jc.delete_flows(
             flow_ids=to_delete,
diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py
index 437a7d44..6a9cec18 100644
--- a/src/jobflow_remote/cli/job.py
+++ b/src/jobflow_remote/cli/job.py
@@ -305,6 +305,14 @@ def rerun(
         ),
     ] = False,
     raise_on_error: raise_on_error_opt = False,
+    no_delete: Annotated[
+        bool,
+        typer.Option(
+            "--no-delete",
+            "-nd",
+            help="Skip the deletion of the files on the worker.",
+        ),
+    ] = False,
 ) -> None:
     """
     Rerun a Job. By default, this is limited to jobs that failed and children did
@@ -313,6 +321,8 @@
     will be cancelled. Most of the limitations can be overridden by the 'force'
     option. This could lead to inconsistencies in the overall state of
     the Jobs of the Flow.
+    The folders of all the Jobs whose state is modified will also be deleted on
+    the worker.
""" if force or break_lock: check_stopped_runner(error=False) @@ -341,6 +351,8 @@ def rerun( break_lock=break_lock, force=force, raise_on_error=raise_on_error, + interactive=True, + delete_files=not no_delete, ) @@ -600,6 +612,7 @@ def delete( verbosity=verbosity, wait=wait, raise_on_error=raise_on_error, + interactive=True, delete_output=delete_output, delete_files=delete_files, ) @@ -1055,7 +1068,7 @@ def output( ), ] = False, ) -> None: - """Detailed information on a specific job.""" + """Fetch the output of a Job from the output Store.""" db_id, job_id = get_job_db_ids(job_db_id, job_index) with loading_spinner(): diff --git a/src/jobflow_remote/cli/utils.py b/src/jobflow_remote/cli/utils.py index 87180561..9ec83fbf 100644 --- a/src/jobflow_remote/cli/utils.py +++ b/src/jobflow_remote/cli/utils.py @@ -2,6 +2,7 @@ from __future__ import annotations +import contextlib import functools import json import logging @@ -370,8 +371,61 @@ def execute_multi_jobs_cmd( custom_query: dict | None = None, verbosity: int = 0, raise_on_error: bool = False, + interactive: bool = True, **kwargs, ) -> None: + """ + Utility function to execute a command on a single job or on a set of jobs. + Checks the query options, determine whether the command is single or multi + and call the corresponding function. + + Parameters + ---------- + single_cmd + A function that takes the job_id and job_index as arguments and performs an + action on a single job. + multi_cmd + A function that takes a set of standard arguments and performs an + action on multiple jobs. + job_db_id + The job id or db_id of a job to be considered. If specified, all the other + query options should be disabled. + job_index + The index of the job in the DB. + job_ids + A list of job ids to be considered. + db_ids + A list of db ids to be considered. + flow_ids + A list of flow ids to be considered. + states + The states of the jobs to be considered. + start_date + The start date of the jobs to be considered. + end_date + The end date of the jobs to be considered. + name + The name of the jobs to be considered. + metadata + The metadata of the jobs to be considered. + days + Set the start_date based on the number of past days. + hours + Set the start_date based on the number of past hours. + workers + Workers associated with the jobs to be considered. + custom_query + A custom query to be used to filter the jobs. + verbosity + The verbosity of the output. + raise_on_error + If True, an error will be raised if the operation fails. + interactive + If True, a spinner will be shown even if the operation is not interactive while + the operation is being performed. + **kwargs : dict + Additional arguments to be passed to the single_cmd and multi_cmd functions. + """ query_values = [ job_ids, db_ids, @@ -386,13 +440,21 @@ def execute_multi_jobs_cmd( workers, custom_query, ] + # if potentially interactive do not start the spinner. 
+    cm = get_config_manager()
+    spinner_cm: contextlib.AbstractContextManager
+    if interactive and cm.get_project().has_interactive_workers:
+        spinner_cm = contextlib.nullcontext()
+        out_console.print("Processing...")
+    else:
+        spinner_cm = loading_spinner()
     try:
         if job_db_id is not None:
             if any(query_values):
                 msg = "If job_db_id is defined all the other query options should be disabled"
                 exit_with_error_msg(msg)
             db_id, job_id = get_job_db_ids(job_db_id, job_index)
-            with loading_spinner():
+            with spinner_cm:
                 modified_ids = single_cmd(
                     job_id=job_id, job_index=job_index, db_id=db_id, **kwargs
                 )
@@ -448,7 +510,7 @@ def execute_multi_jobs_cmd(
             if not confirmed:
                 raise typer.Exit(0)  # noqa: TRY301
 
-            with loading_spinner():
+            with spinner_cm:
                 modified_ids = multi_cmd(
                     job_ids=job_ids_indexes,
                     db_ids=db_ids,
diff --git a/src/jobflow_remote/config/base.py b/src/jobflow_remote/config/base.py
index 18bfc91f..6b92e49e 100644
--- a/src/jobflow_remote/config/base.py
+++ b/src/jobflow_remote/config/base.py
@@ -166,6 +166,13 @@ class WorkerBase(BaseModel):
         None,
         description="String with commands that will be executed after the execution of the Job",
     )
+    execution_cmd: Optional[str] = Field(
+        None,
+        description="String with commands to execute the Job on the worker. By default it will be "
+        "set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to "
+        "the execution directory and it is mandatory. Change only for specific needs (e.g. if the "
+        "jf command needs to be executed in a container).",
+    )
     timeout_execute: int = Field(
         60,
         description="Timeout for the execution of the commands in the worker "
@@ -208,6 +215,16 @@ def check_work_dir(cls, v) -> Path:
             raise ValueError("`work_dir` must be an absolute path")
         return v
 
+    @field_validator("execution_cmd")
+    @classmethod
+    def check_execution_cmd(cls, v) -> Optional[str]:
+        if v is not None and "{}" not in v:
+            raise ValueError(
+                "`execution_cmd` must contain a '{}' part to allow setting "
+                "the execution folder at runtime"
+            )
+        return v
+
     def get_scheduler_io(self) -> BaseSchedulerIO:
         """
         Get the BaseSchedulerIO from QToolKit depending on scheduler_type.
@@ -638,6 +655,16 @@ def check_jobstore(cls, jobstore: dict) -> dict:
                 ) from e
         return jobstore
 
+    @property
+    def has_interactive_workers(self) -> bool:
+        """
+        True if any of the workers have interactive_login set to True.
+ """ + for worker in self.workers.values(): + if isinstance(worker, RemoteWorker) and worker.interactive_login: + return True + return False + model_config = ConfigDict(extra="forbid") diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index 3ffa1499..a2ef47a0 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -119,6 +119,7 @@ class RemoteInfo(BaseModel): process_id: Optional[str] = None retry_time_limit: Optional[datetime] = None error: Optional[str] = None + prerun_cleanup: bool = False class JobInfo(BaseModel): diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 0688aa1f..6eecb101 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -68,14 +68,13 @@ pymongo_dump, pymongo_restore, ) +from jobflow_remote.utils.remote import SharedHosts, safe_remove_job_files if TYPE_CHECKING: from collections.abc import Generator, Sequence from maggma.stores import MongoStore - from jobflow_remote.remote.host import BaseHost - logger = logging.getLogger(__name__) @@ -786,6 +785,7 @@ def rerun_jobs( force: bool = False, wait: int | None = None, break_lock: bool = False, + delete_files: bool = True, ) -> list[str]: """ Rerun a list of selected Jobs, i.e. bring their state back to READY. @@ -831,6 +831,8 @@ def rerun_jobs( Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. + delete_files + Delete all the files in the worker folder of the Jobs that are rerun. Returns ------- @@ -854,6 +856,7 @@ def rerun_jobs( force=force, wait=wait, break_lock=break_lock, + delete_files=delete_files, ) def rerun_job( @@ -864,6 +867,7 @@ def rerun_job( force: bool = False, wait: int | None = None, break_lock: bool = False, + delete_files: bool = True, ) -> list[str]: """ Rerun a single Job, i.e. bring its state back to READY. @@ -904,6 +908,9 @@ def rerun_job( Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. + delete_files + Delete all the files in the worker folder of the rerun Job. + Note that the deletion will not be performed directly but only when the job effectively restarts. Returns ------- @@ -922,7 +929,7 @@ def rerun_job( filter=lock_filter, break_lock=break_lock, sort=sort, - projection=["uuid", "index", "db_id", "state"], + projection=["uuid", "index", "db_id", "state", "worker", "run_dir"], sleep=sleep, max_wait=wait, get_locked_doc=True, @@ -939,7 +946,7 @@ def rerun_job( if job_state in RESETTABLE_STATES: # if in one of the resettable states no need to lock the flow or # update children. 
-                doc_update = self._reset_remote(job_doc_dict)
+                doc_update = self._reset_remote(job_doc_dict, delete_files=delete_files)
                 modified_jobs = []
             elif (
                 job_state not in [JobState.FAILED, JobState.REMOTE_ERROR] and not force
@@ -956,6 +963,7 @@ def rerun_job(
                     wait=wait,
                     break_lock=break_lock,
                     force=force,
+                    delete_files=delete_files,
                 )
                 modified_jobs.append(job_doc_dict["db_id"])
 
@@ -972,6 +980,7 @@ def _full_rerun(
         wait: int | None = None,
         break_lock: bool = False,
         force: bool = False,
+        delete_files: bool = True,
    ) -> tuple[dict, list[str]]:
        """
        Perform the full rerun of Job, in case a Job is FAILED or in one of the
@@ -994,6 +1003,8 @@ def _full_rerun(
            Forcibly break the lock on locked documents.
        force
            Bypass the limitation that only Jobs in a certain state can be rerun.
+        delete_files
+            Delete all the files in the worker folders of the rerun Job and of the children Jobs that are modified.
 
        Returns
        -------
@@ -1069,7 +1080,14 @@ def _full_rerun(
                self.lock_job(
                    filter={"uuid": dep_id, "index": dep_index},
                    break_lock=break_lock,
-                    projection=["uuid", "index", "db_id", "state"],
+                    projection=[
+                        "uuid",
+                        "index",
+                        "db_id",
+                        "state",
+                        "worker",
+                        "run_dir",
+                    ],
                    sleep=sleep,
                    max_wait=wait,
                    get_locked_doc=True,
@@ -1106,10 +1124,12 @@ def _full_rerun(
        # Set the new state for all of them.
        for child_lock in children_locks:
            child_doc = child_lock.locked_document
-            if child_doc["state"] != JobState.WAITING.value:
-                modified_jobs.append(child_doc["db_id"])
            child_doc_update = get_reset_job_base_dict()
            child_doc_update["state"] = JobState.WAITING.value
+            if child_doc["state"] != JobState.WAITING.value:
+                modified_jobs.append(child_doc["db_id"])
+            if delete_files:
+                child_doc_update["remote.prerun_cleanup"] = True
            child_lock.update_on_release = {"$set": child_doc_update}
            updated_states[child_doc["uuid"]][child_doc["index"]] = (
                JobState.WAITING
@@ -1126,10 +1146,12 @@ def _full_rerun(
 
        job_doc_update = get_reset_job_base_dict()
        job_doc_update["state"] = JobState.READY.value
+        if delete_files:
+            job_doc_update["remote.prerun_cleanup"] = True
 
        return job_doc_update, modified_jobs
 
-    def _reset_remote(self, doc: dict) -> dict:
+    def _reset_remote(self, doc: dict, delete_files: bool = True) -> dict:
        """
        Simple reset of a Job in a running state or REMOTE_ERROR.
        Does not require additional locking on the Flow or other Jobs.
@@ -1139,6 +1161,8 @@ def _reset_remote(self, doc: dict) -> dict:
        doc
            The dict of the JobDoc associated to the Job to rerun.
            Just the "uuid", "index", "state" values are required.
+        delete_files
+            Delete all the files in the worker folder of the rerun Job.
 
        Returns
        -------
@@ -1157,6 +1181,8 @@ def _reset_remote(self, doc: dict) -> dict:
 
        job_doc_update = get_reset_job_base_dict()
        job_doc_update["state"] = JobState.CHECKED_OUT.value
+        if delete_files:
+            job_doc_update["remote.prerun_cleanup"] = True
 
        return job_doc_update
 
@@ -2321,12 +2347,14 @@ def delete_flows(
                f"limit ({max_limit}). Increase the limit to delete the Flows."
            )
        deleted = 0
-        for fid in flow_ids:
-            # TODO should it catch errors?
-            if self.delete_flow(
-                fid, delete_output=delete_output, delete_files=delete_files
-            ):
-                deleted += 1
+        # Open the SharedHosts so that hosts will be shared for all the Flows
+        with SharedHosts(self.project):
+            for fid in flow_ids:
+                # TODO should it catch errors?
+ if self.delete_flow( + fid, delete_output=delete_output, delete_files=delete_files + ): + deleted += 1 return deleted @@ -2435,7 +2463,9 @@ def unlock_jobs( ) return result.modified_count - def _safe_delete_files(self, jobs_info: list[JobInfo]) -> list[JobInfo]: + def _safe_delete_files( + self, jobs_info: Sequence[JobInfo | dict] + ) -> list[JobInfo | dict]: """ Delete the files associated to the selected Jobs. @@ -2452,37 +2482,21 @@ def _safe_delete_files(self, jobs_info: list[JobInfo]) -> list[JobInfo]: list The list of JobInfo whose files have been actually deleted. """ - hosts: dict[str, BaseHost] = {} deleted = [] - for job_info in jobs_info: - if job_info.run_dir: - if job_info.worker in hosts: - host = hosts[job_info.worker] + with SharedHosts(self.project) as shared_hosts: + for job_info in jobs_info: + if isinstance(job_info, JobInfo): + run_dir = job_info.run_dir + worker = job_info.worker else: - host = self.project.workers[job_info.worker].get_host() - hosts[job_info.worker] = host - host.connect() - remote_files = host.listdir(job_info.run_dir) - # safety measure to avoid mistakenly deleting other folders - # maybe too much? - if any( - fn in remote_files - for fn in ("jfremote_in.json", "jfremote_in.json.gz") - ): - if host.rmtree(path=job_info.run_dir, raise_on_error=False): + run_dir = job_info["run_dir"] + worker = job_info["worker"] + if run_dir: + host = shared_hosts.get_host(worker) + if safe_remove_job_files( + host=host, run_dir=run_dir, raise_on_error=False + ): deleted.append(job_info) - else: - logger.warning( - f"Did not delete folder {job_info.run_dir} " - f"since it may not contain a jobflow-remote execution", - ) - - for host in hosts.values(): - try: - host.close() - except Exception: - pass - return deleted def unlock_flows( @@ -3868,6 +3882,15 @@ def lock_job_for_update( no_retry = True except RemoteError as e: error = f"Remote error: {e.msg}" + cause = e.__cause__ + if cause: + # this is required for support of python 3.9. In 3.10 the API + # changed and format_exception(e) could be used instead. + # Do that if/when support for 3.9 is dropped. + trace = traceback.format_exception( + type(cause), cause, cause.__traceback__ + ) + error += "\ncaused by:\n" + "".join(trace) no_retry = e.no_retry except Exception: error = traceback.format_exc() @@ -4048,22 +4071,16 @@ def _cancel_queue_process(self, job_doc: dict) -> None: queue_process_id = job_doc["remote"]["process_id"] if not queue_process_id: raise ValueError("The process id is not defined in the job document") - worker = self.project.workers[job_doc["worker"]] - host = worker.get_host() - try: - host.connect() + with SharedHosts(self.project) as shared_hosts: + worker = self.project.workers[job_doc["worker"]] + host = shared_hosts.get_host(job_doc["worker"]) + queue_manager = QueueManager(worker.get_scheduler_io(), host) cancel_result = queue_manager.cancel(queue_process_id) if cancel_result.status != CancelStatus.SUCCESSFUL: raise RuntimeError( - f"Cancelling queue process {queue_process_id} failed. stdout: {cancel_result.stdout}. stderr: {cancel_result.stderr}" - ) - finally: - try: - host.close() - except Exception: - logger.warning( - f"The connection to host {host} could not be closed.", exc_info=True + f"Cancelling queue process {queue_process_id} failed. " + f"stdout: {cancel_result.stdout}. stderr: {cancel_result.stderr}" ) def get_batch_processes( @@ -4347,25 +4364,27 @@ def delete_jobs( list List of db_ids of the deleted Jobs. 
""" - return self._many_jobs_action( - method=self.delete_job, - action_description="deleting", - job_ids=job_ids, - db_ids=db_ids, - flow_ids=flow_ids, - states=states, - start_date=start_date, - end_date=end_date, - name=name, - metadata=metadata, - workers=workers, - custom_query=custom_query, - raise_on_error=raise_on_error, - wait=wait, - delete_output=delete_output, - delete_files=delete_files, - max_limit=max_limit, - ) + # Open the SharedHosts so that hosts will be shared for all the Jobs + with SharedHosts(self.project): + return self._many_jobs_action( + method=self.delete_job, + action_description="deleting", + job_ids=job_ids, + db_ids=db_ids, + flow_ids=flow_ids, + states=states, + start_date=start_date, + end_date=end_date, + name=name, + metadata=metadata, + workers=workers, + custom_query=custom_query, + raise_on_error=raise_on_error, + wait=wait, + delete_output=delete_output, + delete_files=delete_files, + max_limit=max_limit, + ) def backup_dump( self, diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py index 13f9759b..a19e45c7 100644 --- a/src/jobflow_remote/jobs/runner.py +++ b/src/jobflow_remote/jobs/runner.py @@ -43,6 +43,7 @@ from jobflow_remote.remote.queue import ERR_FNAME, OUT_FNAME, QueueManager, set_name_out from jobflow_remote.utils.data import suuid from jobflow_remote.utils.log import initialize_runner_logger +from jobflow_remote.utils.remote import UnsafeDeletionError, safe_remove_job_files from jobflow_remote.utils.schedule import SafeScheduler if TYPE_CHECKING: @@ -574,6 +575,31 @@ def upload(self, lock: MongoLock) -> None: worker = self.get_worker(doc["worker"]) host = self.get_host(doc["worker"]) + + # if prerun_cleanup is specified (job was likely rerun) delete the folder before + # reuploading the data + run_dir = doc["run_dir"] + if doc.get("remote", {}).get("prerun_cleanup", False): + try: + deleted = safe_remove_job_files( + host=host, run_dir=run_dir, raise_on_error=True + ) + # Log as debug. If deletion failed because of an error it will raise. + not_string = "" if deleted else "not " + logger.debug( + f"Folder for job {db_id} ({run_dir}) was {not_string}deleted" + ) + except UnsafeDeletionError as e: + raise RemoteError( + f"Error while performing cleanup of the run_dir folder for job {db_id}: {run_dir}", + no_retry=True, + ) from e + except Exception as e: + raise RemoteError( + f"Error while performing cleanup of the run_dir folder for job {db_id}: {run_dir}", + no_retry=False, + ) from e + store = self.jobstore # TODO would it be better/feasible to keep a pool of the required # Stores already connected, to avoid opening and closing them? 
@@ -610,7 +636,11 @@ def upload(self, lock: MongoLock) -> None:
         host.put(serialized_input, str(path_file))
 
         set_output = {
-            "$set": {"run_dir": remote_path, "state": JobState.UPLOADED.value}
+            "$set": {
+                "run_dir": remote_path,
+                "state": JobState.UPLOADED.value,
+                "remote.prerun_cleanup": False,
+            }
         }
         lock.update_on_release = set_output
 
@@ -637,7 +667,8 @@ def submit(self, lock: MongoLock) -> None:
 
         remote_path = Path(doc["run_dir"])
 
-        script_commands = [f"jf -fe execution run {remote_path}"]
+        execution_cmd = worker.execution_cmd or "jf -fe execution run {}"
+        script_commands = [execution_cmd.format(remote_path)]
 
         queue_manager = self.get_queue_manager(worker_name)
         qout_fpath = remote_path / OUT_FNAME
diff --git a/src/jobflow_remote/utils/remote.py b/src/jobflow_remote/utils/remote.py
new file mode 100644
index 00000000..913f4ed8
--- /dev/null
+++ b/src/jobflow_remote/utils/remote.py
@@ -0,0 +1,141 @@
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, ClassVar
+
+from jobflow_remote import ConfigManager
+
+if TYPE_CHECKING:
+    from pathlib import Path
+
+    from jobflow_remote.config.base import Project
+    from jobflow_remote.remote.host.base import BaseHost
+
+logger = logging.getLogger(__name__)
+
+
+class SharedHosts:
+    """
+    A singleton context manager to allow sharing the same host objects.
+
+    Hosts are stored internally, associated with the worker name.
+    Being a singleton, opening the context manager multiple times allows
+    sharing the hosts across different sections of the code, if needed.
+    Host connections are all closed only when leaving the outermost
+    context manager.
+
+    Examples
+    --------
+
+    >>> with SharedHosts(project) as shared_hosts:
+    ...     host = shared_hosts.get_host("worker_name")
+    ...     # Use host as required
+    """
+
+    _instance: SharedHosts | None = None
+    _ref_count: int = 0
+    _hosts: ClassVar[dict[str, BaseHost]] = {}
+    _project: Project | None = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self, project: Project | None = None):
+        """
+        Parameters
+        ----------
+        project
+            The project configuration.
+        """
+        if self._project is None:
+            if project is None:
+                config_manager: ConfigManager = ConfigManager()
+                project = config_manager.get_project(None)
+            self._project = project
+
+    def get_host(self, worker: str) -> BaseHost:
+        """
+        Return the shared host, if already defined, otherwise retrieve
+        the host from the project and connect it.
+
+        Parameters
+        ----------
+        worker
+            The name of a worker defined in the project.
+        Returns
+        -------
+        BaseHost
+            The shared host.
+ """ + if worker not in self._project.workers: + raise ValueError(f"Worker {worker} not defined in {self._project.name}") + if worker in self._hosts: + return self._hosts[worker] + + host = self._project.workers[worker].get_host() + host.connect() + self._hosts[worker] = host + return host + + def close_hosts(self) -> None: + """Close the connection to all the connected hosts""" + for worker in list(self._hosts): + try: + self._hosts[worker].close() + except Exception: + logger.exception( + f"Error while closing the connection to the {worker} worker" + ) + finally: + self._hosts.pop(worker) + + def __enter__(self): + # Increment reference count + self.__class__._ref_count += 1 + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # Decrement reference count + self.__class__._ref_count -= 1 + + # Cleanup only when the last context exits + if self.__class__._ref_count == 0: + self.close_hosts() + + +class UnsafeDeletionError(Exception): + """ + Error to signal that Job files could not be deleted as the safety check + did not pass. + """ + + +def safe_remove_job_files( + host: BaseHost, run_dir: str | Path | None, raise_on_error: bool = False +) -> bool: + if not run_dir: + return False + + remote_files = host.listdir(run_dir) + # safety measure to avoid mistakenly deleting other folders + if not remote_files: + return False + if any(fn in remote_files for fn in ("jfremote_in.json", "jfremote_in.json.gz")): + return host.rmtree(path=run_dir, raise_on_error=raise_on_error) + + if raise_on_error: + raise UnsafeDeletionError( + f"Could not delete folder {run_dir} " + "since it may not contain a jobflow-remote execution. Some files are present, " + "but jfremote_in.json is missing", + ) + + logger.warning( + f"Did not delete folder {run_dir} " + "since it may not contain a jobflow-remote executionSome files are present, " + "but jfremote_in.json is missing", + ) + return False diff --git a/tests/db/cli/test_job.py b/tests/db/cli/test_job.py index 6fa84ab7..c891a902 100644 --- a/tests/db/cli/test_job.py +++ b/tests/db/cli/test_job.py @@ -1,3 +1,6 @@ +import os + + def test_jobs_list(job_controller, two_flows_four_jobs) -> None: from jobflow_remote.jobs.state import JobState from jobflow_remote.testing.cli import run_check_cli @@ -96,19 +99,56 @@ def test_set_state(job_controller, two_flows_four_jobs) -> None: def test_rerun(job_controller, two_flows_four_jobs) -> None: + from jobflow_remote.jobs.runner import Runner from jobflow_remote.jobs.state import JobState from jobflow_remote.testing.cli import run_check_cli - assert job_controller.set_job_state(JobState.COMPLETED, db_id="1") + runner = Runner() + runner.run_one_job(db_id="1") + + job_info = job_controller.get_job_info(db_id="1") + assert job_info.state == JobState.COMPLETED + assert len(os.listdir(job_info.run_dir)) > 0 + # rerun without deleting files run_check_cli( - ["job", "rerun", "-did", "1", "-f"], - required_out="Operation completed: 1 jobs modified", + ["job", "rerun", "-did", "1", "-f", "-nd"], + required_out="Operation completed: 2 jobs modified", ) + + assert len(os.listdir(job_info.run_dir)) > 0 assert job_controller.get_job_info(db_id="1").state == JobState.READY + # fails because already READY run_check_cli(["job", "rerun", "-did", "1"], required_out="Error while rerunning") + # set the job back to completed to try deleting the files as well + assert job_controller.set_job_state(JobState.COMPLETED, db_id="1") + + # note: here only 1 job is modified, since the child was not set to READY + run_check_cli( + 
["job", "rerun", "-did", "1", "-f"], + required_out="Operation completed: 1 jobs modified", + ) + + assert os.path.isdir(job_info.run_dir) + ji = job_controller.get_job_info(db_id="1") + assert ji.state == JobState.READY + assert ji.remote.prerun_cleanup + + # set the job back to completed and set remote.cleanup=False. rerun with --no-delete + assert job_controller.set_job_state(JobState.COMPLETED, db_id="1") + assert job_controller.set_job_doc_properties( + {"remote.prerun_cleanup": False}, db_id="1" + ) + run_check_cli( + ["job", "rerun", "-did", "1", "-f", "-nd"], + ) + + ji = job_controller.get_job_info(db_id="1") + assert ji.state == JobState.READY + assert not ji.remote.prerun_cleanup + def test_retry(job_controller, two_flows_four_jobs) -> None: from jobflow_remote.jobs.state import JobState diff --git a/tests/db/jobs/test_jobcontroller.py b/tests/db/jobs/test_jobcontroller.py index 6397f127..d707bea8 100644 --- a/tests/db/jobs/test_jobcontroller.py +++ b/tests/db/jobs/test_jobcontroller.py @@ -120,6 +120,8 @@ def test_queries(job_controller, runner) -> None: def test_rerun_completed(job_controller, runner) -> None: + from pathlib import Path + from jobflow import Flow from jobflow_remote import submit_flow @@ -138,6 +140,7 @@ def test_rerun_completed(job_controller, runner) -> None: j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index) j2_info = job_controller.get_job_info(job_id=j2.uuid, job_index=j2.index) j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index) + j1_path = Path(j1_info.run_dir) assert j1_info.state == JobState.COMPLETED assert j2_info.state == JobState.READY assert j3_info.state == JobState.WAITING @@ -146,6 +149,8 @@ def test_rerun_completed(job_controller, runner) -> None: with pytest.raises(ValueError, match="The Job is in the READY state"): job_controller.rerun_job(job_id=j2.uuid, job_index=j2.index) + assert len(list(j1_path.iterdir())) > 0 + # since the first job is completed, the force option is required with pytest.raises(ValueError, match="Job in state COMPLETED cannot be rerun"): job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index) @@ -156,6 +161,15 @@ def test_rerun_completed(job_controller, runner) -> None: j1_info.db_id, j2_info.db_id, } + + # the folder should still exist + # create a file there so that it will be checked after the execution. It should + # be deleted by the runner. + assert j1_path.exists() + check_file_j1 = j1_path / "test_test.json" + check_file_j1.touch(exist_ok=True) + assert check_file_j1.exists() + assert ( job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index).state == JobState.READY @@ -172,11 +186,23 @@ def test_rerun_completed(job_controller, runner) -> None: with pytest.raises(ValueError, match="Job in state COMPLETED cannot be rerun"): job_controller.rerun_job(job_id=j3.uuid, job_index=j3.index) - # The last job can be rerun, but still needs the "force" option + assert not check_file_j1.exists() + + j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index) + j3_path = Path(j3_info.run_dir) + + assert len(list(j3_path.iterdir())) > 0 + + # The last job can be rerun, but still needs the "force" option. 
+    # Don't delete files
     assert set(
-        job_controller.rerun_job(job_id=j3.uuid, job_index=j3.index, force=True)
+        job_controller.rerun_job(
+            job_id=j3.uuid, job_index=j3.index, force=True, delete_files=False
+        )
     ) == {j3_info.db_id}
 
+    assert len(list(j3_path.iterdir())) > 0
+
     # The remaining tests are to verify that everything is correct with locked jobs
     # as well
     with job_controller.lock_job(filter={"uuid": j2.uuid}):
@@ -231,6 +257,8 @@ def test_rerun_completed(job_controller, runner) -> None:
 
 
 def test_rerun_failed(job_controller, runner) -> None:
+    from pathlib import Path
+
     from jobflow import Flow, OnMissing
 
     from jobflow_remote import submit_flow
@@ -282,6 +310,14 @@ def test_rerun_failed(job_controller, runner) -> None:
     ):
         job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index)
 
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index)
+    j1_path = Path(j1_info.run_dir)
+    j3_path = Path(j3_info.run_dir)
+
+    assert len(list(j1_path.iterdir())) > 0
+    assert len(list(j3_path.iterdir())) > 0
+
     # can be rerun with the "force" option
     assert set(
         job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index, force=True)
@@ -289,11 +325,39 @@ def test_rerun_failed(job_controller, runner) -> None:
 
     assert job_controller.count_jobs(states=JobState.READY) == 1
 
+    # check that the cleanup of the child was also set
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index)
+    assert j1_path.exists()
+    assert j3_path.exists()
+    assert j1_info.remote.prerun_cleanup
+    assert j3_info.remote.prerun_cleanup
+
+    # create a file in the folders, to check later that the folders have been removed
+    # during the upload phase
+    check_file_j1 = j1_path / "test_test.json"
+    check_file_j1.touch(exist_ok=True)
+    assert check_file_j1.exists()
+    check_file_j3 = j3_path / "test_test.json"
+    check_file_j3.touch(exist_ok=True)
+    assert check_file_j3.exists()
+
+    # run the jobs again with j4. This generates a replace
+    # check that the fake additional files are not present, meaning the
+    # folder was deleted by the cleanup
     assert runner.run_one_job(max_seconds=10, job_id=[j1.uuid, j1.index])
+    assert not check_file_j1.exists()
     assert runner.run_one_job(max_seconds=10, job_id=[j3.uuid, j3.index])
+    assert not check_file_j3.exists()
     assert runner.run_one_job(max_seconds=10, job_id=[j4.uuid, j4.index])
 
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index)
+
+    # also check that the cleanup has been set to False
+    assert not j1_info.remote.prerun_cleanup
+    assert not j3_info.remote.prerun_cleanup
+
     assert job_controller.count_jobs(job_ids=(j4.uuid, 2)) == 1
 
     # At this point it is impossible to rerun, even with the "force" option
@@ -319,6 +383,8 @@ def test_rerun_failed(job_controller, runner) -> None:
 
 
 def test_rerun_remote_error(job_controller, monkeypatch, runner) -> None:
+    from pathlib import Path
+
     from jobflow import Flow
 
     from jobflow_remote import submit_flow
@@ -333,11 +399,11 @@ def test_rerun_remote_error(job_controller, monkeypatch, runner) -> None:
     submit_flow(flow, worker="test_local_worker")
 
-    # patch the upload method of the runner to trigger a remote error
-    def upload_error(self, lock) -> NoReturn:
+    # patch the submit method of the runner to trigger a remote error
+    def submit_error(self, lock) -> NoReturn:
         raise RuntimeError("FAKE ERROR")
 
     with monkeypatch.context() as m:
-        m.setattr(Runner, "upload", upload_error)
+        m.setattr(Runner, "submit", submit_error)
         # patch this to 1 to avoid retrying multiple times
         m.setattr(runner.runner_options, "max_step_attempts", 1)
         with pytest.warns(match="FAKE ERROR"):
@@ -349,6 +415,8 @@ def test_rerun_remote_error(job_controller, monkeypatch, runner) -> None:
         job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.RUNNING
     )
 
+    assert len(list(Path(j1_info.run_dir).iterdir())) > 0
+
     # can rerun without "force"
     assert job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index, force=True) == [
         j1_info.db_id
@@ -359,6 +427,12 @@ def test_rerun_remote_error(job_controller, monkeypatch, runner) -> None:
     )
     assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.READY
 
+    # check that files are marked for deletion also in case of remote_error.
+    # do not run the job explicitly, as it is already checked in other tests
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    assert Path(j1_info.run_dir).exists()
+    assert j1_info.remote.prerun_cleanup
+
 
 def test_retry(job_controller, monkeypatch, runner) -> None:
     from jobflow import Flow
diff --git a/tests/db/jobs/test_runner.py b/tests/db/jobs/test_runner.py
new file mode 100644
index 00000000..aa02d5c9
--- /dev/null
+++ b/tests/db/jobs/test_runner.py
@@ -0,0 +1,68 @@
+def test_upload_cleanup_error(job_controller, runner, monkeypatch):
+    from pathlib import Path
+
+    from jobflow import Flow
+
+    from jobflow_remote import submit_flow
+    from jobflow_remote.jobs.state import JobState
+    from jobflow_remote.remote.host.local import LocalHost
+    from jobflow_remote.testing import add
+
+    j1 = add(1, 2)
+    flow = Flow([j1])
+
+    submit_flow(flow, worker="test_local_worker")
+
+    assert runner.run_one_job(max_seconds=10, job_id=[j1.uuid, j1.index])
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    j1_path = Path(j1_info.run_dir)
+    assert j1_info.state == JobState.COMPLETED
+
+    assert set(job_controller.rerun_jobs(job_ids=(j1.uuid, j1.index), force=True)) == {
+        j1_info.db_id
+    }
+    assert j1_path.exists()
+
+    check_file_j1 = j1_path / "test_test.json"
+    check_file_j1.touch(exist_ok=True)
+    assert check_file_j1.exists()
+
+    # remove the jfremote_in.json and check that the removal fails
+    jfremote_in = j1_path / "jfremote_in.json"
+    jfremote_in.unlink(missing_ok=True)
+
+    assert runner.run_one_job(max_seconds=10, job_id=[j1.uuid, j1.index])
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    assert j1_info.state == JobState.REMOTE_ERROR
+
+    # since it fails immediately, step_attempts is left at zero
+    assert j1_info.remote.step_attempts == 0
+    assert (
+        "Error while performing cleanup of the run_dir folder for job 1"
+        in j1_info.remote.error
+    )
+    assert (
+        f"Could not delete folder {j1_info.run_dir} since it may not contain a jobflow-remote execution."
+        in j1_info.remote.error
+    )
+
+    # rerun the job, create the file again and mock rmtree to make it fail in a different way
+    job_controller.rerun_job(db_id="1")
+    jfremote_in.touch(exist_ok=True)
+
+    def raise_rmtree(*args, **kwargs):
+        raise RuntimeError("FAKE ERROR")
+
+    with monkeypatch.context() as m:
+        m.setattr(LocalHost, "rmtree", raise_rmtree)
+        m.setattr(runner.runner_options, "max_step_attempts", 2)
+        assert runner.run_one_job(max_seconds=20, job_id=[j1.uuid, j1.index])
+
+    j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
+    assert j1_info.state == JobState.REMOTE_ERROR
+    assert j1_info.remote.step_attempts == 2
+    assert (
+        "Error while performing cleanup of the run_dir folder for job 1"
+        in j1_info.remote.error
+    )
+    assert "FAKE ERROR" in j1_info.remote.error
diff --git a/tests/db/utils/test_data.py b/tests/db/utils/test_data.py
index 19443a85..f174be63 100644
--- a/tests/db/utils/test_data.py
+++ b/tests/db/utils/test_data.py
@@ -55,8 +55,8 @@ def test_get_past_time_rounded():
 def test_get_utc_offset():
     from jobflow_remote.utils.data import get_utc_offset
 
-    assert get_utc_offset("America/Los_Angeles") == "-07:00"
-    assert get_utc_offset("Europe/Paris") == "+02:00"
+    assert get_utc_offset("America/Buenos_Aires") == "-03:00"
+    assert get_utc_offset("Europe/Moscow") == "+03:00"
     assert get_utc_offset("UTC") == "+00:00"
     assert get_utc_offset("Asia/Shanghai") == "+08:00"
     with pytest.raises(ValueError, match="Could not determine the timezone for XXX"):
diff --git a/tests/db/utils/test_remote.py b/tests/db/utils/test_remote.py
new file mode 100644
index 00000000..e4f45b06
--- /dev/null
+++ b/tests/db/utils/test_remote.py
@@ -0,0 +1,21 @@
+def test_share_hosts(mocker):
+    import jobflow_remote
+    from jobflow_remote.utils.remote import SharedHosts
+
+    mocker.patch("jobflow_remote.remote.host.local.LocalHost.close")
+
+    with SharedHosts() as shared_hosts1:
+        host1 = shared_hosts1.get_host("test_local_worker")
+        assert len(shared_hosts1._hosts) == 1
+        with SharedHosts() as shared_hosts2:
+            assert len(shared_hosts2._hosts) == 1
+            host2 = shared_hosts2.get_host("test_local_worker")
+            # hosts should be the same instance
+            assert shared_hosts1 is shared_hosts2
+            assert host1 is host2
+
+        # after leaving the inner context manager, close should not be called
+        jobflow_remote.remote.host.local.LocalHost.close.assert_not_called()
+
+    # close() called only once after leaving the outermost context manager
+    jobflow_remote.remote.host.local.LocalHost.close.assert_called_once()
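
The substitution performed by ``Runner.submit`` on the new ``execution_cmd`` worker field reduces to a plain ``str.format`` call. A minimal sketch of that logic follows; the helper name ``build_script_commands`` and the ``singularity`` template are illustrative assumptions, not part of the patch.

# Sketch of how Runner.submit resolves the new execution_cmd worker field.
# The default template mirrors the fallback in the patch; the container
# template below is a hypothetical example.
from __future__ import annotations

DEFAULT_EXECUTION_CMD = "jf -fe execution run {}"

def build_script_commands(execution_cmd: str | None, remote_path: str) -> list[str]:
    # Fall back to the default when the worker does not define execution_cmd
    template = execution_cmd or DEFAULT_EXECUTION_CMD
    # The mandatory "{}" placeholder receives the run directory at submit time
    return [template.format(remote_path)]

assert build_script_commands(None, "/scratch/job_1") == [
    "jf -fe execution run /scratch/job_1"
]
assert build_script_commands(
    "singularity exec jfr.sif jf -fe execution run {}", "/scratch/job_1"
) == ["singularity exec jfr.sif jf -fe execution run /scratch/job_1"]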
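
The reference counting in ``SharedHosts.__enter__``/``__exit__`` means that nested contexts reuse one connection per worker and only the outermost exit closes the connections, as exercised by ``test_share_hosts`` above. A short usage sketch, where ``example_worker`` is a hypothetical worker name assumed to be defined in the active project:

# Usage sketch of the SharedHosts singleton added in
# src/jobflow_remote/utils/remote.py
from jobflow_remote.utils.remote import SharedHosts

with SharedHosts() as outer:
    host = outer.get_host("example_worker")  # connects on first use
    with SharedHosts() as inner:
        # the singleton returns the same instance and reuses the connection
        assert inner is outer
        assert inner.get_host("example_worker") is host
    # leaving the inner context only decrements the reference count
# leaving the outermost context closes all the shared connections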
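
The safety check in ``safe_remove_job_files`` is what turns a rerun cleanup into a ``REMOTE_ERROR`` when ``jfremote_in.json`` is missing (see ``test_upload_cleanup_error``). A hedged sketch of calling it directly, with a hypothetical worker name and run directory:

# Sketch of the safety check in safe_remove_job_files: deletion proceeds only
# when the folder contains jfremote_in.json(.gz)
from jobflow_remote.utils.remote import (
    SharedHosts,
    UnsafeDeletionError,
    safe_remove_job_files,
)

with SharedHosts() as shared:
    host = shared.get_host("example_worker")  # hypothetical worker name
    try:
        deleted = safe_remove_job_files(
            host=host, run_dir="/remote/scratch/job_1", raise_on_error=True
        )
    except UnsafeDeletionError:
        # files are present but no jfremote_in.json: the folder may not belong
        # to a jobflow-remote execution, so it is left untouched
        deleted = False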