Skip to content

Commit

Permalink
fix date inizialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Dec 12, 2023
1 parent d6ec17c commit 1a664cc
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
]
requires-python = ">=3.9"
dependencies =[
"jobflow[strict]",
"jobflow",
"pydantic>=2.0.1",
"fabric",
"tomlkit",
Expand Down
6 changes: 5 additions & 1 deletion src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@
verbosity_opt = Annotated[
int,
typer.Option(
"--verbosity", "-v", help="Set the verbosity of the output", count=True
"--verbosity",
"-v",
help="Set the verbosity of the output. Multiple -v options "
"increase the verbosity. (e.g. -vvv)",
count=True,
),
]

Expand Down
15 changes: 9 additions & 6 deletions src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class JobInfo(BaseModel):
worker: str
name: str
state: JobState
created_on: datetime
updated_on: datetime
remote: RemoteInfo = RemoteInfo()
parents: Optional[list[str]] = None
previous_state: Optional[JobState] = None
Expand All @@ -89,8 +91,6 @@ class JobInfo(BaseModel):
run_dir: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
created_on: datetime = datetime.utcnow()
updated_on: datetime = datetime.utcnow()
priority: int = 0
metadata: Optional[dict] = None

Expand Down Expand Up @@ -157,8 +157,8 @@ class JobDoc(BaseModel):
run_dir: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
created_on: datetime = datetime.utcnow()
updated_on: datetime = datetime.utcnow()
created_on: datetime = Field(default_factory=datetime.utcnow)
updated_on: datetime = Field(default_factory=datetime.utcnow)
priority: int = 0
# store: Optional[JobStore] = None
exec_config: Optional[Union[ExecutionConfig, str]] = None
Expand Down Expand Up @@ -189,8 +189,8 @@ class FlowDoc(BaseModel):
name: str
lock_id: Optional[str] = None
lock_time: Optional[datetime] = None
created_on: datetime = datetime.utcnow()
updated_on: datetime = datetime.utcnow()
created_on: datetime = Field(default_factory=datetime.utcnow)
updated_on: datetime = Field(default_factory=datetime.utcnow)
metadata: dict = Field(default_factory=dict)
# parents need to include both the uuid and the index.
# When dynamically replacing a Job with a Flow some new Jobs will
Expand Down Expand Up @@ -266,6 +266,7 @@ class FlowInfo(BaseModel):
flow_id: str
state: FlowState
name: str
created_on: datetime
updated_on: datetime
workers: list[str]
job_states: list[JobState]
Expand All @@ -275,6 +276,7 @@ class FlowInfo(BaseModel):

@classmethod
def from_query_dict(cls, d):
created_on = d["created_on"]
updated_on = d["updated_on"]
flow_id = d["uuid"]
jobs_data = d.get("jobs_list") or []
Expand Down Expand Up @@ -314,6 +316,7 @@ def from_query_dict(cls, d):
flow_id=flow_id,
state=state,
name=d["name"],
created_on=created_on,
updated_on=updated_on,
workers=workers,
job_states=job_states,
Expand Down
81 changes: 74 additions & 7 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""The Runner orchestrating the Jobs execution"""

from __future__ import annotations

import json
Expand All @@ -7,7 +9,7 @@
import time
import traceback
import uuid
from collections import defaultdict, namedtuple
from collections import defaultdict
from datetime import datetime
from pathlib import Path

Expand Down Expand Up @@ -43,19 +45,42 @@
logger = logging.getLogger(__name__)


JobFWData = namedtuple(
"JobFWData",
["fw", "task", "job", "store", "worker_name", "worker", "host", "original_store"],
)
class Runner:
"""
Object orchestrating the execution of all the Jobs.
Advances the status of the Jobs, handles the communication with the workers
and updates the queue and output databases.
The main entry point is the `run` method. It is mainly supposed to be executed
if a daemon, but can also be run directly for testing purposes.
It allows to run all the steps required to advance the Job's states or even
a subset of them, to parallelize the different tasks.
The runner instantiates a pool of workers and hosts given in the project
definition. A single connection will be opened if multiple workers share
the same host.
"""

class Runner:
def __init__(
self,
project_name: str | None = None,
log_level: LogLevel | None = None,
runner_id: str | None = None,
):
"""
Parameters
----------
project_name
Name of the project. Used to retrieve all the configurations required
to execute the runner.
log_level
Logging level of the Runner.
runner_id
A unique identifier for the Runner process. Used to identify the
runner process in logging and in the DB locks.
If None a uuid will be generated.
"""
self.stop_signal = False
self.runner_id: str = runner_id or str(uuid.uuid4())
self.config_manager: ConfigManager = ConfigManager()
Expand Down Expand Up @@ -100,26 +125,68 @@ def __init__(

@property
def runner_options(self) -> RunnerOptions:
"""
The Runner options defined in the project.
"""
return self.project.runner

def handle_signal(self, signum, frame):
"""
Handle the SIGTERM signal in the Runner.
Sets a variable that will stop the Runner loop.
"""
logger.info(f"Received signal: {signum}")
self.stop_signal = True

def get_worker(self, worker_name: str) -> WorkerBase:
"""
Get the worker from the pool of workers instantiated by the Runner.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the corresponding worker.
"""
if worker_name not in self.workers:
raise ConfigError(
f"No worker {worker_name} is defined in project {self.project_name}"
)
return self.workers[worker_name]

def get_host(self, worker_name: str):
def get_host(self, worker_name: str) -> BaseHost:
"""
Get the host associated to a worker from the pool of hosts instantiated
by the Runner.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the Host associated to the worker.
"""
host = self.hosts[worker_name]
if not host.is_connected:
host.connect()
return host

def get_queue_manager(self, worker_name: str) -> QueueManager:
"""
Get an instance of the queue manager associated to a worker, based on its host.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the QueueManager associated to the worker.
"""
if worker_name not in self.queue_managers:
worker = self.get_worker(worker_name)
self.queue_managers[worker_name] = QueueManager(
Expand Down
1 change: 0 additions & 1 deletion src/jobflow_remote/utils/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def initialize_runner_logger(
# runner is started.
makedirs_p(log_folder)

print("!!!", runner_id)
if runner_id:
msg_format = f"%(asctime)s [%(levelname)s] ID {runner_id} %(name)s: %(message)s"
else:
Expand Down

0 comments on commit 1a664cc

Please sign in to comment.