Commit

tests and updates
gpetretto committed Mar 15, 2024
1 parent 3a6ee13 commit 42e6fae
Showing 24 changed files with 1,917 additions and 64 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/testing.yml
@@ -35,6 +35,12 @@ jobs:
run: pre-commit run --all-files --show-diff-on-failure

test:
services:
local_mongodb:
image: mongo:4.4
ports:
- 27017:27017

runs-on: ubuntu-latest
strategy:
fail-fast: false
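The new services block starts a throwaway MongoDB 4.4 container next to the test job and publishes port 27017 on the runner, so the test suite can reach a real database on localhost. As a minimal sketch (not part of this commit; it assumes pytest and pymongo are available in the test environment), a session-scoped fixture could connect like this:

    import pymongo
    import pytest


    @pytest.fixture(scope="session")
    def mongo_client():
        # The service container's port 27017 is mapped to the runner host,
        # so the database is reachable on localhost for the whole job.
        client = pymongo.MongoClient(
            "mongodb://localhost:27017", serverSelectionTimeoutMS=5000
        )
        client.admin.command("ping")  # fail fast if the service is down
        yield client
        client.close()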
67 changes: 64 additions & 3 deletions src/jobflow_remote/cli/admin.py
@@ -9,6 +9,8 @@
from jobflow_remote.cli.types import (
db_ids_opt,
end_date_opt,
flow_ids_opt,
flow_state_opt,
force_opt,
job_ids_indexes_opt,
job_state_opt,
@@ -158,17 +160,76 @@ def unlock(
if not confirmed:
raise typer.Exit(0)

with loading_spinner(False) as progress:
progress.add_task(description="Unlocking jobs...", total=None)

num_unlocked = jc.unlock_jobs(
job_ids=job_id,
db_ids=db_id,
state=state,
start_date=start_date,
end_date=end_date,
)

out_console.print(f"{num_unlocked} jobs were unlocked")


@app_admin.command()
def unlock_flow(
job_id: job_ids_indexes_opt = None,
db_id: db_ids_opt = None,
flow_id: flow_ids_opt = None,
state: flow_state_opt = None,
start_date: start_date_opt = None,
end_date: end_date_opt = None,
force: force_opt = False,
):
"""
Forcibly removes the lock from the documents of the selected Flows.
WARNING: can lead to inconsistencies if the process is actually running
"""

job_ids_indexes = get_job_ids_indexes(job_id)

jc = get_job_controller()

if not force:
with loading_spinner(False) as progress:
progress.add_task(
description="Checking the number of locked documents...", total=None
)

num_unlocked = jc.remove_lock_job(
job_ids=job_id,
flows_info = jc.get_flows_info(
job_ids=job_ids_indexes,
db_ids=db_id,
flow_ids=flow_id,
state=state,
start_date=start_date,
locked=True,
end_date=end_date,
)

out_console.print(f"{num_unlocked} jobs were unlocked")
if not flows_info:
exit_with_error_msg("No data matching the request")

text = Text.from_markup(
f"[red]This operation will [bold]remove the lock[/bold] for (roughly) [bold]{len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]"
)
confirmed = Confirm.ask(text, default=False)

if not confirmed:
raise typer.Exit(0)

with loading_spinner(False) as progress:
progress.add_task(description="Unlocking flows...", total=None)

num_unlocked = jc.unlock_flows(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
start_date=start_date,
end_date=end_date,
)

out_console.print(f"{num_unlocked} flows were unlocked")
2 changes: 1 addition & 1 deletion src/jobflow_remote/cli/formatting.py
@@ -167,7 +167,7 @@ def format_flow_info(flow_info: FlowInfo):
table.title_style = "bold"
table.add_column("DB id")
table.add_column("Name")
table.add_column("State [Remote]")
table.add_column("State")
table.add_column("Job id (Index)")
table.add_column("Worker")

4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/job.py
@@ -228,7 +228,7 @@ def set_state(
)

if not succeeded:
exit_with_error_msg("Could not reset the remote attempts")
exit_with_error_msg("Could not change the job state")

print_success_msg()

@@ -675,7 +675,7 @@ def exec_config(
hours=hours,
verbosity=verbosity,
raise_on_error=raise_on_error,
exec_config_value=exec_config_value,
exec_config=exec_config_value,
)


6 changes: 5 additions & 1 deletion src/jobflow_remote/cli/runner.py
@@ -91,7 +91,10 @@ def run(
if not (transfer or complete or queue or checkout):
transfer = complete = queue = checkout = True

runner.run(transfer=transfer, complete=complete, queue=queue, checkout=checkout)
try:
runner.run(transfer=transfer, complete=complete, queue=queue, checkout=checkout)
finally:
runner.cleanup()


@app_runner.command()
@@ -261,6 +264,7 @@ def status():
DaemonStatus.STOPPING: "gold1",
DaemonStatus.SHUT_DOWN: "red",
DaemonStatus.PARTIALLY_RUNNING: "gold1",
DaemonStatus.STARTING: "gold1",
DaemonStatus.RUNNING: "green",
}[current_status]
text = Text()
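The try/finally around runner.run(...) guarantees that runner.cleanup() executes even when the loop exits with an exception or a Ctrl-C, and the new STARTING entry gives the status command a color for that daemon state. The shutdown guarantee in isolation (toy names, not from the codebase):

    class ToyRunner:
        def run(self):
            raise KeyboardInterrupt  # simulate Ctrl-C in the main loop

        def cleanup(self):
            print("closing queue and database connections")


    runner = ToyRunner()
    try:
        try:
            runner.run()
        finally:
            runner.cleanup()  # executed even though run() raised
    except KeyboardInterrupt:
        print("interrupted, but cleanup already ran")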
3 changes: 3 additions & 0 deletions src/jobflow_remote/cli/utils.py
@@ -69,6 +69,9 @@ def cleanup_job_controller():
global _shared_job_controller
if _shared_job_controller is not None:
_shared_job_controller.close()
# set to None again, in case it needs to be used again in the same
# execution (e.g., in tests)
_shared_job_controller = None


def start_profiling():
4 changes: 2 additions & 2 deletions src/jobflow_remote/config/base.py
@@ -37,12 +37,12 @@ class RunnerOptions(BaseModel):
delay_refresh_limited: int = Field(
600,
description="Delay between subsequent refresh from the DB of the number of submitted "
"and running jobs (seconds). Only use if a worker with max_jobs is present",
"and running jobs (seconds). Only used if a worker with max_jobs is present",
)
delay_update_batch: int = Field(
60,
description="Delay between subsequent refresh from the DB of the number of submitted "
"and running jobs (seconds). Only use if a worker with max_jobs is present",
"and running jobs (seconds). Only used if a batch worker is present",
)
lock_timeout: Optional[int] = Field(
86400,
37 changes: 30 additions & 7 deletions src/jobflow_remote/jobs/daemon.py
@@ -118,6 +118,7 @@ class DaemonStatus(Enum):
STOPPED = "STOPPED"
STOPPING = "STOPPING"
PARTIALLY_RUNNING = "PARTIALLY_RUNNING"
STARTING = "STARTING"
RUNNING = "RUNNING"


@@ -224,7 +225,7 @@ def check_supervisord_process(self) -> bool:
logger.warning(
f"Process with pid {pid} is not running but daemon files are present. Cleaning them up."
)
self.clean_files()
self.clean_files()

return running

@@ -258,7 +259,10 @@ def check_status(self) -> DaemonStatus:
)

if all(pi.get("state") in RUNNING_STATES for pi in proc_info):
return DaemonStatus.RUNNING
if any(pi.get("state") == ProcessStates.STARTING for pi in proc_info):
return DaemonStatus.STARTING
else:
return DaemonStatus.RUNNING

if any(pi.get("state") in RUNNING_STATES for pi in proc_info):
return DaemonStatus.PARTIALLY_RUNNING
@@ -483,12 +487,31 @@ def _verify_call_result(
return None

def kill(self, raise_on_error: bool = False) -> bool:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is not running. No process is running")
return True
# If the daemon is shutting down, supervisord may not be able to identify
# the state. Try proceeding in that case, since we really want to kill
# the process
status = None
try:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is not running. No process is running")
return True
if status == DaemonStatus.STOPPED:
logger.info("Processes are already stopped.")
return True
except DaemonError as e:
msg = (
f"Error while determining the state of the runner: {getattr(e, 'message', str(e))}."
f"Proceeding with the kill command."
)
logger.warning(msg)

if status in (DaemonStatus.RUNNING, DaemonStatus.STOPPING):
if status in (
None,
DaemonStatus.RUNNING,
DaemonStatus.STOPPING,
DaemonStatus.PARTIALLY_RUNNING,
):
interface = self.get_interface()
result = interface.supervisor.signalAllProcesses(9)
error = self._verify_call_result(result, "kill", raise_on_error)
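kill now tolerates a failing check_status(): if supervisord cannot report a state while shutting down, status stays None and the SIGKILL path is attempted anyway, with PARTIALLY_RUNNING also accepted. A hedged usage sketch (the DaemonManager class name matches this module; default construction is an assumption):

    from jobflow_remote.jobs.daemon import DaemonManager

    dm = DaemonManager()  # assumed default construction from the project config
    # With raise_on_error=False a failure is reported via the return value
    # instead of an exception.
    if not dm.kill(raise_on_error=False):
        print("some runner processes could not be killed")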
44 changes: 34 additions & 10 deletions src/jobflow_remote/jobs/jobcontroller.py
@@ -959,7 +959,7 @@ def _full_rerun(
for dep_id, dep_index in descendants:
if max(flow_doc.ids_mapping[dep_id]) > dep_index:
raise ValueError(
f"Job {job_id} has a child job ({dep_id}) which is not the last index ({dep_index}. "
f"Job {job_id} has a child job ({dep_id}) which is not the last index ({dep_index}). "
"Rerunning the Job will lead to inconsistencies and is not allowed."
)

@@ -1078,6 +1078,7 @@ def _set_job_properties(
wait: int | None = None,
break_lock: bool = False,
acceptable_states: list[JobState] | None = None,
use_pipeline: bool = False,
) -> list[int]:
"""
Helper to set multiple values in a JobDoc while locking the Job.
@@ -1088,7 +1089,7 @@
----------
values
Dictionary with the values to be set. Will be passed to a pymongo
`find_one_and_update` method.
`update_one` method.
db_id
The db_id of the Job.
job_id
@@ -1105,6 +1106,8 @@
acceptable_states
List of JobState for which the Job values can be changed.
If None all states are acceptable.
use_pipeline
If True, a pipeline will be used in the update of the document.

Returns
-------
list
@@ -1135,7 +1138,9 @@
)
values = dict(values)
# values["updated_on"] = datetime.utcnow()
lock.update_on_release = {"$set": values}
lock.update_on_release = (
[{"$set": values}] if use_pipeline else {"$set": values}
)
return [doc["db_id"]]

return []
@@ -1256,7 +1261,7 @@ def retry_jobs(
"""
return self._many_jobs_action(
method=self.retry_job,
action_description="rerunning",
action_description="retrying",
job_ids=job_ids,
db_ids=db_ids,
flow_ids=flow_ids,
@@ -1876,17 +1881,31 @@ def set_job_run_properties(
exec_config = exec_config.model_dump()

if update and isinstance(exec_config, dict):
for k, v in exec_config.items():
set_dict[f"exec_config.{k}"] = v
# if the content is a string, replace it even if this is an update:
# merging would be meaningless
cond = {
"$cond": {
"if": {"$eq": [{"$type": "$exec_config"}, "string"]},
"then": exec_config,
"else": {"$mergeObjects": ["$exec_config", exec_config]},
}
}
set_dict["exec_config"] = cond

else:
set_dict["exec_config"] = exec_config

if resources:
if isinstance(resources, QResources):
resources = resources.as_dict()
# if passing a QResources it is pointless to update:
# all the keywords will be overwritten, and if the previous
# value was a generic dictionary the merged dictionary will
# almost surely lead to failures
update = False
if update:
for k, v in resources.items():
set_dict[f"resources.{k}"] = v
set_dict["resources"] = {"$mergeObjects": ["$resources", resources]}
else:
set_dict["resources"] = resources

Expand All @@ -1904,6 +1923,7 @@ def set_job_run_properties(
raise_on_error=raise_on_error,
values=set_dict,
acceptable_states=[JobState.READY, JobState.WAITING],
use_pipeline=update,
)

def get_flow_job_aggreg(
@@ -1968,6 +1988,7 @@ def get_flows_info(
start_date: datetime | None = None,
end_date: datetime | None = None,
name: str | None = None,
locked: bool = False,
sort: list[tuple] | None = None,
limit: int = 0,
full: bool = False,
Expand All @@ -1994,6 +2015,8 @@ def get_flows_info(
name
Pattern matching the name of Flow. Default is an exact match, but all
conventions from python fnmatch can be used (e.g. *test*)
locked
If True only locked Flows will be selected.
sort
A list of (key, direction) pairs specifying the sort order for this
query. Follows pymongo conventions.
@@ -2017,6 +2040,7 @@
start_date=start_date,
end_date=end_date,
name=name,
locked=locked,
)

# Only use the full aggregation if more job details are needed.
@@ -2105,7 +2129,7 @@ def delete_flow(self, flow_id: str, delete_output: bool = False):
self.flows.delete_one({"uuid": flow_id})
return True

def remove_lock_job(
def unlock_jobs(
self,
job_ids: tuple[str, int] | list[tuple[str, int]] | None = None,
db_ids: str | list[str] | None = None,
@@ -2168,7 +2192,7 @@ def remove_lock_job(
)
return result.modified_count

def remove_lock_flow(
def unlock_flows(
self,
job_ids: str | list[str] | None = None,
db_ids: str | list[str] | None = None,
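The set_job_run_properties changes above switch the update to a MongoDB aggregation-pipeline update (use_pipeline=True): wrapping the $set stage in a list lets operators such as $cond, $type and $mergeObjects reference the stored document, so exec_config can be merged server-side unless the stored value is a named string preset. The operator combination in isolation (plain pymongo, hypothetical collection and values):

    from pymongo import MongoClient

    coll = MongoClient("mongodb://localhost:27017").test_db.jobs  # hypothetical
    new_exec_config = {"modules": ["vasp/6.3"]}  # hypothetical values

    merge_or_replace = {
        "$cond": {
            # a stored string (a named preset) cannot be merged: replace it
            "if": {"$eq": [{"$type": "$exec_config"}, "string"]},
            "then": new_exec_config,
            "else": {"$mergeObjects": ["$exec_config", new_exec_config]},
        }
    }
    # The list around the $set stage makes this a pipeline update, which is
    # what allows the expression above to read the current document.
    coll.update_one({"db_id": "1"}, [{"$set": {"exec_config": merge_or_replace}}])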