Skip to content

Commit

Permalink
more fix for job and flow queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Mar 6, 2024
1 parent 7709359 commit 5299e1e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 11 deletions.
4 changes: 3 additions & 1 deletion src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
typer.Option(
"--name",
"-n",
help="The name. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*)",
help="The name. Default is an exact match, but all conventions from "
"python fnmatch can be used (e.g. *test*). Using * wildcard may require"
"enclosing the search string in quotation marks.",
),
]

Expand Down
88 changes: 78 additions & 10 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def _build_query_job(
query["$or"] = or_list

if flow_ids:
query["hosts"] = {"$in": flow_ids}
query["job.hosts"] = {"$in": flow_ids}

if state:
query["state"] = state.value
Expand All @@ -253,7 +253,7 @@ def _build_query_job(
# Add the beginning of the line, so that it will match the string
# exactly if no wildcard is given. Otherwise will match substrings.
mongo_regex = "^" + fnmatch.translate(name).replace("\\\\", "\\")
query["name"] = {"$regex": mongo_regex}
query["job.name"] = {"$regex": mongo_regex}

if metadata:
metadata_dict = {f"job.metadata.{k}": v for k, v in metadata.items()}
Expand All @@ -265,7 +265,7 @@ def _build_query_flow(
self,
job_ids: str | list[str] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | None = None,
flow_ids: str | list[str] | None = None,
state: FlowState | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
Expand Down Expand Up @@ -308,6 +308,8 @@ def _build_query_flow(
job_ids = [job_ids]
if db_ids is not None and not isinstance(db_ids, (list, tuple)):
db_ids = [db_ids]
if flow_ids is not None and not isinstance(flow_ids, (list, tuple)):
flow_ids = [flow_ids]

query: dict = {}

Expand Down Expand Up @@ -1941,7 +1943,7 @@ def get_flows_info(
self,
job_ids: str | list[str] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | None = None,
flow_ids: str | list[str] | None = None,
state: FlowState | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
Expand All @@ -1951,7 +1953,7 @@ def get_flows_info(
full: bool = False,
) -> list[FlowInfo]:
"""
Query for Flows based on standard parameters and return a list of JobFlows.
Query for Flows based on standard parameters and return a list of FlowInfo.
Parameters
----------
Expand Down Expand Up @@ -1985,7 +1987,7 @@ def get_flows_info(
Returns
-------
list
A list of JobFlows.
A list of FlowInfo.
"""
query = self._build_query_flow(
job_ids=job_ids,
Expand Down Expand Up @@ -2150,7 +2152,7 @@ def remove_lock_flow(
self,
job_ids: str | list[str] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | None = None,
flow_ids: str | list[str] | None = None,
state: FlowState | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
Expand Down Expand Up @@ -2360,7 +2362,43 @@ def count_jobs(
end_date: datetime | None = None,
name: str | None = None,
metadata: dict | None = None,
):
) -> int:
"""
Count Jobs based on filters.
Parameters
----------
query
A generic query. Will override all the other parameters.
job_ids
One or more tuples, each containing the (uuid, index) pair of the
Jobs to retrieve.
db_ids
One or more db_ids of the Jobs to retrieve.
flow_ids
One or more Flow uuids to which the Jobs to retrieve belong.
state
The state of the Jobs.
locked
If True only locked Jobs will be selected.
start_date
Filter Jobs that were updated_on after this date.
Should be in the machine local time zone. It will be converted to UTC.
end_date
Filter Jobs that were updated_on before this date.
Should be in the machine local time zone. It will be converted to UTC.
name
Pattern matching the name of Job. Default is an exact match, but all
conventions from python fnmatch can be used (e.g. *test*)
metadata
A dictionary of the values of the metadata to match. Should be an
exact match for all the values provided.
Returns
-------
int
Number of Jobs matching the criteria.
"""
if query is None:
query = self._build_query_job(
job_ids=job_ids,
Expand All @@ -2380,12 +2418,42 @@ def count_flows(
query: dict | None = None,
job_ids: str | list[str] | None = None,
db_ids: str | list[str] | None = None,
flow_ids: str | None = None,
flow_ids: str | list[str] | None = None,
state: FlowState | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
name: str | None = None,
):
) -> int:
"""
Count flows based on filter parameters.
Parameters
----------
query
A generic query. Will override all the other parameters.
job_ids
One or more strings with uuids of Jobs belonging to the Flow.
db_ids
One or more db_ids of Jobs belonging to the Flow.
flow_ids
One or more Flow uuids.
state
The state of the Flows.
start_date
Filter Flows that were updated_on after this date.
Should be in the machine local time zone. It will be converted to UTC.
end_date
Filter Flows that were updated_on before this date.
Should be in the machine local time zone. It will be converted to UTC.
name
Pattern matching the name of Flow. Default is an exact match, but all
conventions from python fnmatch can be used (e.g. *test*)
Returns
-------
int
Number of Flows matching the criteria.
"""
if not query:
query = self._build_query_flow(
job_ids=job_ids,
Expand Down

0 comments on commit 5299e1e

Please sign in to comment.