Skip to content

Commit

Permalink
Fix issue in flow state when rerunning jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed May 31, 2024
1 parent 8589c55 commit 98922f6
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 14 deletions.
1 change: 0 additions & 1 deletion src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def jobs_list(
start_date = get_start_date(start_date, days, hours)

db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]
print(error)

if error:
state = [JobState.REMOTE_ERROR, JobState.FAILED]
Expand Down
2 changes: 0 additions & 2 deletions src/jobflow_remote/jobs/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ def get_graph(flow: FlowInfo, label: str = "name") -> DiGraph:
job_prop["job_name"] = job_prop.pop("name")
graph.add_node(db_id, **job_prop)

print(flow.parents)

# Add edges based on parents
for child_node, parents in zip(flow.db_ids, flow.parents):
for parent_uuid in parents:
Expand Down
41 changes: 31 additions & 10 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import traceback
import warnings
from collections import defaultdict
from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from pathlib import Path
Expand Down Expand Up @@ -974,6 +975,7 @@ def _full_rerun(
JobState.PAUSED.value,
]
# Update the state of the descendants
updated_states: dict[str, dict[int, JobState]] = defaultdict(dict)
with ExitStack() as stack:
# first acquire the lock on all the descendants and
# check their state if needed. Break immediately if
Expand Down Expand Up @@ -1022,17 +1024,21 @@ def _full_rerun(
# Here all the descendants are locked and could be set to WAITING.
# Set the new state for all of them.
for child_lock in children_locks:
if child_lock.locked_document["state"] != JobState.WAITING.value:
modified_jobs.append(child_lock.locked_document["db_id"])
child_doc = child_lock.locked_document
if child_doc["state"] != JobState.WAITING.value:
modified_jobs.append(child_doc["db_id"])
child_doc_update = get_reset_job_base_dict()
child_doc_update["state"] = JobState.WAITING.value
child_lock.update_on_release = {"$set": child_doc_update}
updated_states[child_doc["uuid"]][
child_doc["index"]
] = JobState.WAITING

# if everything is fine here, update the state of the flow
# before releasing its lock and set the update for the original job
# pass explicitly the new state of the job, since it is not updated
# in the DB. The Job is the last lock to be released.
updated_states = {job_id: {job_index: JobState.READY}}
updated_states[job_id][job_index] = JobState.READY
self.update_flow_state(
flow_uuid=flow_doc.uuid, updated_states=updated_states
)
Expand Down Expand Up @@ -1813,7 +1819,7 @@ def set_job_run_properties(
job_ids: tuple[str, int] | list[tuple[str, int]] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | list[str] | None = None,
states: JobState | None = None,
states: JobState | list[JobState] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
name: str | None = None,
Expand Down Expand Up @@ -1894,7 +1900,6 @@ def set_job_run_properties(
"else": {"$mergeObjects": ["$exec_config", exec_config]},
}
}
print(cond)
set_dict["exec_config"] = cond

else:
Expand Down Expand Up @@ -2147,7 +2152,7 @@ def unlock_jobs(
job_ids: tuple[str, int] | list[tuple[str, int]] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | list[str] | None = None,
states: JobState | None = None,
states: JobState | list[JobState] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
name: str | None = None,
Expand Down Expand Up @@ -2413,7 +2418,7 @@ def count_jobs(
job_ids: tuple[str, int] | list[tuple[str, int]] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | list[str] | None = None,
states: JobState | None = None,
states: JobState | list[JobState] | None = None,
locked: bool = False,
start_date: datetime | None = None,
end_date: datetime | None = None,
Expand Down Expand Up @@ -3139,17 +3144,33 @@ def update_flow_state(
flow_uuid=flow_uuid, projection=projection
)

# update the full list of states and those of the leafs according
# to the updated_states passed
jobs_states = [
updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"]))
for j in flow_jobs
]
leafs = get_flow_leafs(flow_jobs)
leaf_states = [JobState(j["state"]) for j in leafs]
leaf_states = [
updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"]))
for j in leafs
]
flow_state = FlowState.from_jobs_states(
jobs_states=jobs_states, leaf_states=leaf_states
)
set_state = {"$set": {"state": flow_state.value}}
self.flows.find_one_and_update({"uuid": flow_uuid}, set_state)

# update flow state. If it is changed update the updated_on
updated_cond = {
"$cond": {
"if": {"$eq": ["$state", flow_state.value]},
"then": "$updated_on",
"else": datetime.utcnow(),
}
}
self.flows.find_one_and_update(
{"uuid": flow_uuid},
[{"$set": {"state": flow_state.value, "updated_on": updated_cond}}],
)

@contextlib.contextmanager
def lock_job(self, **lock_kwargs) -> Generator[MongoLock, None, None]:
Expand Down
17 changes: 17 additions & 0 deletions tests/db/jobs/test_jobcontroller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest

from jobflow_remote.jobs.state import FlowState


def test_submit_flow(job_controller, runner):
from jobflow import Flow
Expand Down Expand Up @@ -156,6 +158,7 @@ def test_rerun_completed(job_controller, runner):
job_controller.get_job_info(job_id=j2.uuid, job_index=j2.index).state
== JobState.WAITING
)
assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.READY

# run all the jobs
runner.run_all_jobs(max_seconds=20)
Expand Down Expand Up @@ -196,6 +199,10 @@ def test_rerun_completed(job_controller, runner):
== JobState.COMPLETED
)

assert (
job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.RUNNING
)

# can rerun if breaking the lock
# catch the warning coming from MongoLock
with pytest.warns(UserWarning, match="Could not release lock for document"):
Expand Down Expand Up @@ -240,13 +247,17 @@ def test_rerun_failed(job_controller, runner):
assert j3_info.state == JobState.READY
assert j4_info.state == JobState.WAITING

assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.FAILED

# rerun without "force". Since the job is FAILED and the children are
# WAITING or READY
assert set(job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index)) == {
j1_info.db_id,
j3_info.db_id,
}

assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.READY

assert job_controller.count_jobs(states=JobState.READY) == 1

# run the first job again and the job with OnMissing.None
Expand Down Expand Up @@ -293,6 +304,8 @@ def test_rerun_failed(job_controller, runner):
job_controller.get_job_info(job_id=j4.uuid, job_index=2).state == JobState.READY
)

assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.FAILED


def test_rerun_remote_error(job_controller, monkeypatch, runner):
from jobflow import Flow
Expand Down Expand Up @@ -321,6 +334,9 @@ def upload_error(self, lock):

j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
assert j1_info.state == JobState.REMOTE_ERROR
assert (
job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.RUNNING
)

# can rerun without "force"
assert job_controller.rerun_job(job_id=j1.uuid, job_index=j1.index, force=True) == [
Expand All @@ -330,6 +346,7 @@ def upload_error(self, lock):
job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index).state
== JobState.READY
)
assert job_controller.get_flows_info(job_ids=[j1.uuid])[0].state == FlowState.READY


def test_retry(job_controller, monkeypatch, runner):
Expand Down
5 changes: 4 additions & 1 deletion tests/integration/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,7 @@ def test_undefined_additional_stores(worker, job_controller):

# The job should fail, as the additional store is not defined
assert job_controller.count_jobs(states=JobState.COMPLETED) == 1
assert job_controller.count_jobs(states=JobState.REMOTE_ERROR) == 1
assert (
job_controller.count_jobs(states=[JobState.COMPLETED, JobState.REMOTE_ERROR])
== 2
)

0 comments on commit 98922f6

Please sign in to comment.