diff --git a/pyproject.toml b/pyproject.toml index 244c15c6..baf3abd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ ] requires-python = ">=3.9" dependencies =[ - "jobflow[strict]", + "jobflow", "pydantic>=2.0.1", "fabric", "tomlkit", diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 4501038e..48e3535d 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -143,7 +143,11 @@ verbosity_opt = Annotated[ int, typer.Option( - "--verbosity", "-v", help="Set the verbosity of the output", count=True + "--verbosity", + "-v", + help="Set the verbosity of the output. Multiple -v options " + "increase the verbosity. (e.g. -vvv)", + count=True, ), ] diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index afcec745..ffcde5bb 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -80,6 +80,8 @@ class JobInfo(BaseModel): worker: str name: str state: JobState + created_on: datetime + updated_on: datetime remote: RemoteInfo = RemoteInfo() parents: Optional[list[str]] = None previous_state: Optional[JobState] = None @@ -89,8 +91,6 @@ class JobInfo(BaseModel): run_dir: Optional[str] = None start_time: Optional[datetime] = None end_time: Optional[datetime] = None - created_on: datetime = datetime.utcnow() - updated_on: datetime = datetime.utcnow() priority: int = 0 metadata: Optional[dict] = None @@ -157,8 +157,8 @@ class JobDoc(BaseModel): run_dir: Optional[str] = None start_time: Optional[datetime] = None end_time: Optional[datetime] = None - created_on: datetime = datetime.utcnow() - updated_on: datetime = datetime.utcnow() + created_on: datetime = Field(default_factory=datetime.utcnow) + updated_on: datetime = Field(default_factory=datetime.utcnow) priority: int = 0 # store: Optional[JobStore] = None exec_config: Optional[Union[ExecutionConfig, str]] = None @@ -189,8 +189,8 @@ class FlowDoc(BaseModel): name: str lock_id: 
Optional[str] = None lock_time: Optional[datetime] = None - created_on: datetime = datetime.utcnow() - updated_on: datetime = datetime.utcnow() + created_on: datetime = Field(default_factory=datetime.utcnow) + updated_on: datetime = Field(default_factory=datetime.utcnow) metadata: dict = Field(default_factory=dict) # parents need to include both the uuid and the index. # When dynamically replacing a Job with a Flow some new Jobs will @@ -266,6 +266,7 @@ class FlowInfo(BaseModel): flow_id: str state: FlowState name: str + created_on: datetime updated_on: datetime workers: list[str] job_states: list[JobState] @@ -275,6 +276,7 @@ class FlowInfo(BaseModel): @classmethod def from_query_dict(cls, d): + created_on = d["created_on"] updated_on = d["updated_on"] flow_id = d["uuid"] jobs_data = d.get("jobs_list") or [] @@ -314,6 +316,7 @@ def from_query_dict(cls, d): flow_id=flow_id, state=state, name=d["name"], + created_on=created_on, updated_on=updated_on, workers=workers, job_states=job_states, diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py index 7aab0a39..95aad8c2 100644 --- a/src/jobflow_remote/jobs/runner.py +++ b/src/jobflow_remote/jobs/runner.py @@ -1,3 +1,5 @@ +"""The Runner orchestrating the Jobs execution""" + from __future__ import annotations import json @@ -7,7 +9,7 @@ import time import traceback import uuid -from collections import defaultdict, namedtuple +from collections import defaultdict from datetime import datetime from pathlib import Path @@ -43,19 +45,42 @@ logger = logging.getLogger(__name__) -JobFWData = namedtuple( - "JobFWData", - ["fw", "task", "job", "store", "worker_name", "worker", "host", "original_store"], -) +class Runner: + """ + Object orchestrating the execution of all the Jobs. + Advances the status of the Jobs, handles the communication with the workers + and updates the queue and output databases. + + The main entry point is the `run` method. 
It is mainly supposed to be executed + as a daemon, but can also be run directly for testing purposes. + It allows running all the steps required to advance the Job's states or even + a subset of them, to parallelize the different tasks. + + The runner instantiates a pool of workers and hosts given in the project + definition. A single connection will be opened if multiple workers share + the same host. + """ -class Runner: def __init__( self, project_name: str | None = None, log_level: LogLevel | None = None, runner_id: str | None = None, ): + """ + Parameters + ---------- + project_name + Name of the project. Used to retrieve all the configurations required + to execute the runner. + log_level + Logging level of the Runner. + runner_id + A unique identifier for the Runner process. Used to identify the + runner process in logging and in the DB locks. + If None a uuid will be generated. + """ self.stop_signal = False self.runner_id: str = runner_id or str(uuid.uuid4()) self.config_manager: ConfigManager = ConfigManager() @@ -100,26 +125,68 @@ def __init__( @property def runner_options(self) -> RunnerOptions: + """ + The Runner options defined in the project. + """ return self.project.runner def handle_signal(self, signum, frame): + """ + Handle the SIGTERM signal in the Runner. + Sets a variable that will stop the Runner loop. + """ logger.info(f"Received signal: {signum}") self.stop_signal = True def get_worker(self, worker_name: str) -> WorkerBase: + """ + Get the worker from the pool of workers instantiated by the Runner. + + Parameters + ---------- + worker_name + The name of the worker. + + Returns + ------- + An instance of the corresponding worker. 
+ """ if worker_name not in self.workers: raise ConfigError( f"No worker {worker_name} is defined in project {self.project_name}" ) return self.workers[worker_name] - def get_host(self, worker_name: str): + def get_host(self, worker_name: str) -> BaseHost: + """ + Get the host associated to a worker from the pool of hosts instantiated + by the Runner. + + Parameters + ---------- + worker_name + The name of the worker. + Returns + ------- + An instance of the Host associated to the worker. + """ host = self.hosts[worker_name] if not host.is_connected: host.connect() return host def get_queue_manager(self, worker_name: str) -> QueueManager: + """ + Get an instance of the queue manager associated to a worker, based on its host. + + Parameters + ---------- + worker_name + The name of the worker. + Returns + ------- + An instance of the QueueManager associated to the worker. + """ if worker_name not in self.queue_managers: worker = self.get_worker(worker_name) self.queue_managers[worker_name] = QueueManager( diff --git a/src/jobflow_remote/utils/log.py b/src/jobflow_remote/utils/log.py index 2c5703fa..d5fbf42a 100644 --- a/src/jobflow_remote/utils/log.py +++ b/src/jobflow_remote/utils/log.py @@ -27,7 +27,6 @@ def initialize_runner_logger( # runner is started. makedirs_p(log_folder) - print("!!!", runner_id) if runner_id: msg_format = f"%(asctime)s [%(levelname)s] ID {runner_id} %(name)s: %(message)s" else: