Skip to content

Commit

Permalink
Handle more states in jobcontroller commands
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed May 31, 2024
1 parent 3516dab commit 8589c55
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 118 deletions.
8 changes: 4 additions & 4 deletions src/jobflow_remote/cli/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def unlock(
jobs_info = jc.get_jobs_info(
job_ids=job_ids_indexes,
db_ids=db_id,
state=state,
states=state,
start_date=start_date,
locked=True,
end_date=end_date,
Expand All @@ -166,7 +166,7 @@ def unlock(
num_unlocked = jc.unlock_jobs(
job_ids=job_id,
db_ids=db_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
)
Expand Down Expand Up @@ -203,7 +203,7 @@ def unlock_flow(
job_ids=job_ids_indexes,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
locked=True,
end_date=end_date,
Expand All @@ -227,7 +227,7 @@ def unlock_flow(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
)
Expand Down
4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def flows_list(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -129,7 +129,7 @@ def delete(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down
42 changes: 29 additions & 13 deletions src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,21 @@ def jobs_list(
reverse_sort: reverse_sort_flag_opt = False,
locked: locked_opt = False,
custom_query: query_opt = None,
error: Annotated[
bool,
typer.Option(
"--error",
"-e",
help="Select the jobs in FAILED and REMOTE_ERROR state. Incompatible with the --state option",
),
] = False,
):
"""
Get the list of Jobs in the database
"""
check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours})
check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours})
check_incompatible_opt({"state": state, "error": error})
metadata_dict = str_to_dict(metadata)

job_ids_indexes = get_job_ids_indexes(job_id)
Expand All @@ -98,6 +107,10 @@ def jobs_list(
start_date = get_start_date(start_date, days, hours)

db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]
print(error)

if error:
state = [JobState.REMOTE_ERROR, JobState.FAILED]

with loading_spinner():
if custom_query:
Expand All @@ -111,7 +124,7 @@ def jobs_list(
job_ids=job_ids_indexes,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
locked=locked,
end_date=end_date,
Expand Down Expand Up @@ -285,7 +298,7 @@ def rerun(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -337,7 +350,7 @@ def retry(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -383,7 +396,7 @@ def pause(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -428,7 +441,7 @@ def play(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -478,7 +491,7 @@ def stop(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -606,7 +619,8 @@ def worker(
raise_on_error: raise_on_error_opt = False,
):
"""
Set the worker for the selected Jobs. Only READY or WAITING Jobs.
Set the worker for the selected Jobs.
Only Jobs not in an evolving state (e.g. CHECKED_OUT, UPLOADED, ...).
"""

jc = get_job_controller()
Expand All @@ -618,7 +632,7 @@ def worker(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -654,7 +668,8 @@ def exec_config(
raise_on_error: raise_on_error_opt = False,
):
"""
Set the exec_config for the selected Jobs. Only READY or WAITING Jobs.
Set the exec_config for the selected Jobs.
Only Jobs not in an evolving state (e.g. CHECKED_OUT, UPLOADED, ...).
"""

jc = get_job_controller()
Expand All @@ -666,7 +681,7 @@ def exec_config(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -723,7 +738,8 @@ def resources(
raise_on_error: raise_on_error_opt = False,
):
"""
Set the resources for the selected Jobs. Only READY or WAITING Jobs.
Set the resources for the selected Jobs.
Only Jobs not in an evolving state (e.g. CHECKED_OUT, UPLOADED, ...)
"""

resources = str_to_dict(resources_value)
Expand All @@ -740,7 +756,7 @@ def resources(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down Expand Up @@ -794,7 +810,7 @@ def job_dump(
job_ids=job_ids_indexes,
db_ids=db_id,
flow_ids=flow_id,
state=state,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down
8 changes: 4 additions & 4 deletions src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@


job_state_opt = Annotated[
Optional[JobState],
Optional[list[JobState]],
typer.Option(
"--state",
"-s",
help="One of the Job states",
help="One or more of the Job states",
),
]


flow_state_opt = Annotated[
Optional[FlowState],
Optional[list[FlowState]],
typer.Option(
"--state",
"-s",
help="One of the Flow states",
help="One or more of the Flow states",
),
]

Expand Down
8 changes: 4 additions & 4 deletions src/jobflow_remote/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def execute_multi_jobs_cmd(
job_ids: list[str] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | list[str] | None = None,
state: JobState | None = None,
states: JobState | list[JobState] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
name: str | None = None,
Expand All @@ -341,7 +341,7 @@ def execute_multi_jobs_cmd(
job_ids,
db_ids,
flow_ids,
state,
states,
start_date,
end_date,
name,
Expand Down Expand Up @@ -376,7 +376,7 @@ def execute_multi_jobs_cmd(
job_ids_indexes,
db_ids,
flow_ids,
state,
states,
start_date,
end_date,
name,
Expand All @@ -397,7 +397,7 @@ def execute_multi_jobs_cmd(
job_ids=job_ids_indexes,
db_ids=db_ids,
flow_ids=flow_ids,
state=state,
states=states,
start_date=start_date,
end_date=end_date,
name=name,
Expand Down
4 changes: 3 additions & 1 deletion src/jobflow_remote/jobs/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def get_graph(flow: FlowInfo, label: str = "name") -> DiGraph:
job_prop["job_name"] = job_prop.pop("name")
graph.add_node(db_id, **job_prop)

print(flow.parents)

# Add edges based on parents
for child_node, parents in zip(flow.db_ids, flow.parents):
for parent_uuid in parents:
Expand Down Expand Up @@ -207,7 +209,7 @@ def get_mermaid(flow: FlowInfo, show_subflows: bool = True):

# add replace edges
for parent_db_id, child_id in replace_edges:
line = f" {parent_db_id} -.-> {child_id}"
line = f" {parent_db_id}({nodes[parent_db_id]['name']}) -.-> {child_id}"
lines.append(line)

subgraph_styles = []
Expand Down
Loading

0 comments on commit 8589c55

Please sign in to comment.