From 77f9a89b461e065bd39e5316ad310002fb719b37 Mon Sep 17 00:00:00 2001 From: Fabian Peschel Date: Tue, 10 Dec 2024 11:24:26 +0100 Subject: [PATCH 1/5] option for stored data --- src/jobflow_remote/cli/formatting.py | 24 ++++++++++++++++++++++-- src/jobflow_remote/cli/job.py | 23 ++++++++++++++++++++++- src/jobflow_remote/jobs/data.py | 1 + 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index b8121654..34219a20 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -26,14 +26,20 @@ from jobflow_remote.jobs.upgrade import UpgradeAction -def get_job_info_table(jobs_info: list[JobInfo], verbosity: int) -> Table: +def get_job_info_table( + jobs_info: list[JobInfo], + verbosity: int, + stored_data_keys: list[str] | None = None, + skip_job_id: bool = False, +) -> Table: time_zone_str = f" [{time.tzname[0]}]" table = Table(title="Jobs info") table.add_column("DB id") table.add_column("Name") table.add_column("State") - table.add_column("Job id (Index)") + if not skip_job_id: + table.add_column("Job id (Index)") table.add_column("Worker") table.add_column("Last updated" + time_zone_str) @@ -50,6 +56,10 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int) -> Table: table.add_column("Lock id") table.add_column("Lock time" + time_zone_str) + if stored_data_keys: + for key in stored_data_keys: + table.add_column(f"{key}") + for ji in jobs_info: state = ji.state.name @@ -66,6 +76,8 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int) -> Table: ji.worker, convert_utc_time(ji.updated_on).strftime(fmt_datetime), ] + if skip_job_id: + row.pop(3) if verbosity >= 1: row.append(ji.remote.process_id) @@ -98,6 +110,14 @@ def get_job_info_table(jobs_info: list[JobInfo], verbosity: int) -> Table: else None ) + if stored_data_keys: + default_value = Text.from_markup("[italic grey58]none[/]") + stored_data = ji.stored_data + row += [ + stored_data.get(key, default_value) if stored_data else default_value + for key in stored_data_keys + ] + table.add_row(*row) return table diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index 6a9cec18..9e13d486 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -105,6 +105,22 @@ def jobs_list( help="Select the jobs in FAILED and REMOTE_ERROR state. Incompatible with the --state option", ), ] = False, + stored_data_keys: Annotated[ + Optional[list[str]], + typer.Option( + "--stored-data-key", + "-sdk", + help="Key to be shown from the stored_data field.", + ), + ] = None, + skip_job_id: Annotated[ + bool, + typer.Option( + "--skip-job-id", + "-sji", + help="Skip the UUID field in the output table.", + ), + ] = False, ): """ Get the list of Jobs in the database @@ -163,7 +179,12 @@ def jobs_list( sort=db_sort, ) - table = get_job_info_table(jobs_info, verbosity=verbosity) + table = get_job_info_table( + jobs_info, + verbosity=verbosity, + stored_data_keys=stored_data_keys, + skip_job_id=skip_job_id, + ) out_console.print(table) if SETTINGS.cli_suggestions: diff --git a/src/jobflow_remote/jobs/data.py b/src/jobflow_remote/jobs/data.py index a2ef47a0..2fb54d4c 100644 --- a/src/jobflow_remote/jobs/data.py +++ b/src/jobflow_remote/jobs/data.py @@ -147,6 +147,7 @@ class JobInfo(BaseModel): end_time: Optional[datetime] = None priority: int = 0 metadata: Optional[dict] = None + stored_data: Optional[dict] = None @property def is_locked(self) -> bool: From 221b0c5c941f7e9cc574de1c877d5884cc1c578d Mon Sep 17 00:00:00 2001 From: Fabian Peschel Date: Thu, 19 Dec 2024 11:11:53 +0100 Subject: [PATCH 2/5] implement the option to specify all columns explicitly --- src/jobflow_remote/cli/flow.py | 2 +- src/jobflow_remote/cli/formatting.py | 165 +++++++++++++------------- src/jobflow_remote/cli/job.py | 22 ++-- src/jobflow_remote/config/settings.py | 4 + tests/db/cli/test_job.py | 8 ++ 5 files changed, 110 insertions(+), 91 deletions(-) diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py index 57590c4c..81490749 100644 --- a/src/jobflow_remote/cli/flow.py +++ b/src/jobflow_remote/cli/flow.py @@ -74,7 +74,7 @@ def flows_list( sort: sort_opt = SortOption.UPDATED_ON, reverse_sort: reverse_sort_flag_opt = False, ) -> None: - """Get the list of Jobs in the database.""" + """Get the list of Flows in the database.""" check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours}) check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours}) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index 34219a20..1becb569 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -26,99 +26,100 @@ from jobflow_remote.jobs.upgrade import UpgradeAction +def format_state(ji: JobInfo) -> Text: + state = ji.state.name + if ji.state in (JobState.REMOTE_ERROR, JobState.FAILED): + state = f"[bold red]{state}[/]" + elif ji.remote.retry_time_limit is not None: + state = f"[bold orange3]{state}[/]" + return Text.from_markup(state) + + +def format_run_time(ji: JobInfo) -> str: + prefix = "" + if ji.state == JobState.RUNNING: + run_time = ji.estimated_run_time + prefix = "~" + else: + run_time = ji.run_time + if not run_time: + return "" + m, s = divmod(run_time, 60) + h, m = divmod(m, 60) + return prefix + f"{h:g}:{m:02g}" + + +time_zone_str = f" [{time.tzname[0]}]" +header_name_data_getter_map = { + "db_id": ("DB id", lambda ji: str(ji.db_id)), + "name": ("Name", lambda ji: ji.name), + "state": ("State", format_state), + "job_id": ("Job id (Index)", lambda ji: f"{ji.uuid} ({ji.index})"), + "worker": ("Worker", lambda ji: ji.worker), + "last_updated": ( + "Last updated" + time_zone_str, + lambda ji: convert_utc_time(ji.updated_on).strftime(fmt_datetime), + ), + "queue_id": ("Queue id", lambda ji: ji.remote.process_id), + "run_time": ("Run time", format_run_time), + "retry_time": ( + "Retry time" + time_zone_str, + lambda ji: convert_utc_time(ji.remote.retry_time_limit).strftime(fmt_datetime) + if ji.remote.retry_time_limit + else None, + ), + "prev_state": ( + "Prev state", + lambda ji: ji.previous_state.name if ji.previous_state else None, + ), + "locked": ("Locked", lambda ji: "*" if ji.lock_id is not None else None), + "lock_id": ("Lock id", lambda ji: str(ji.lock_id)), + "lock_time": ( + "Lock time" + time_zone_str, + lambda ji: convert_utc_time(ji.lock_time).strftime(fmt_datetime) + if ji.lock_time + else None, + ), +} + + def get_job_info_table( jobs_info: list[JobInfo], verbosity: int, + header_keys: list[str] | None = None, stored_data_keys: list[str] | None = None, - skip_job_id: bool = False, ) -> Table: - time_zone_str = f" [{time.tzname[0]}]" + stored_data_keys = stored_data_keys or [] + all_header_keys = list(header_name_data_getter_map) + if not header_keys: + header_keys = all_header_keys[:6] + if verbosity >= 1: + header_keys += all_header_keys[6:10] + if verbosity == 1: + header_keys.append(all_header_keys[10]) + if verbosity >= 2: + header_keys += all_header_keys[11:13] table = Table(title="Jobs info") - table.add_column("DB id") - table.add_column("Name") - table.add_column("State") - if not skip_job_id: - table.add_column("Job id (Index)") + for key in header_keys: + table.add_column(header_name_data_getter_map[key][0]) + for key in stored_data_keys: + table.add_column(key) - table.add_column("Worker") - table.add_column("Last updated" + time_zone_str) - - if verbosity >= 1: - table.add_column("Queue id") - table.add_column("Run time") - table.add_column("Retry time" + time_zone_str) - table.add_column("Prev state") - if verbosity < 2: - table.add_column("Locked") - - if verbosity >= 2: - table.add_column("Lock id") - table.add_column("Lock time" + time_zone_str) - - if stored_data_keys: - for key in stored_data_keys: - table.add_column(f"{key}") + default_stored_data_value = Text.from_markup("[italic grey58]none[/]") for ji in jobs_info: - state = ji.state.name - - if ji.state in (JobState.REMOTE_ERROR, JobState.FAILED): - state = f"[bold red]{state}[/]" - elif ji.remote.retry_time_limit is not None: - state = f"[bold orange3]{state}[/]" - - row = [ - str(ji.db_id), - ji.name, - Text.from_markup(state), - f"{ji.uuid} ({ji.index})", - ji.worker, - convert_utc_time(ji.updated_on).strftime(fmt_datetime), - ] - if skip_job_id: - row.pop(3) - - if verbosity >= 1: - row.append(ji.remote.process_id) - prefix = "" - if ji.state == JobState.RUNNING: - run_time = ji.estimated_run_time - prefix = "~" - else: - run_time = ji.run_time - if run_time: - m, s = divmod(run_time, 60) - h, m = divmod(m, 60) - row.append(prefix + f"{h:g}:{m:02g}") - else: - row.append("") - row.append( - convert_utc_time(ji.remote.retry_time_limit).strftime(fmt_datetime) - if ji.remote.retry_time_limit - else None - ) - row.append(ji.previous_state.name if ji.previous_state else None) - if verbosity < 2: - row.append("*" if ji.lock_id is not None else None) - - if verbosity >= 2: - row.append(str(ji.lock_id)) - row.append( - convert_utc_time(ji.lock_time).strftime(fmt_datetime) - if ji.lock_time - else None + table.add_row( + *( + [header_name_data_getter_map[key][1](ji) for key in header_keys] + + [ + ji.stored_data.get(key, default_stored_data_value) + if ji.stored_data + else default_stored_data_value + for key in stored_data_keys + ] ) - - if stored_data_keys: - default_value = Text.from_markup("[italic grey58]none[/]") - stored_data = ji.stored_data - row += [ - stored_data.get(key, default_value) if stored_data else default_value - for key in stored_data_keys - ] - - table.add_row(*row) + ) return table diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index 9e13d486..c525f7dd 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -15,6 +15,7 @@ format_job_info, get_job_info_table, get_job_report_components, + header_name_data_getter_map, ) from jobflow_remote.cli.jf import app from jobflow_remote.cli.jfr_typer import JFRTyper @@ -113,17 +114,17 @@ def jobs_list( help="Key to be shown from the stored_data field.", ), ] = None, - skip_job_id: Annotated[ - bool, + header_keys: Annotated[ + Optional[list[str]], typer.Option( - "--skip-job-id", - "-sji", - help="Skip the UUID field in the output table.", + "--header-keys", + "-hk", + help="Table columns to be shown. Overrides verbosity option. Can also be set in the config file", ), - ] = False, + ] = None, ): """ - Get the list of Jobs in the database + Get the list of Jobs in the database. """ check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours}) check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours}) @@ -144,6 +145,11 @@ def jobs_list( worker_name, ], ) + header_keys = header_keys or SETTINGS.cli_job_list_columns + if not set(header_keys).issubset(header_name_data_getter_map): + exit_with_error_msg( + f"Header keys not supported: {set(header_keys).difference(header_name_data_getter_map)}" + ) job_ids_indexes = get_job_ids_indexes(job_id) @@ -182,8 +188,8 @@ def jobs_list( table = get_job_info_table( jobs_info, verbosity=verbosity, + header_keys=header_keys, stored_data_keys=stored_data_keys, - skip_job_id=skip_job_id, ) out_console.print(table) diff --git a/src/jobflow_remote/config/settings.py b/src/jobflow_remote/config/settings.py index 763e3660..4c9cfc41 100644 --- a/src/jobflow_remote/config/settings.py +++ b/src/jobflow_remote/config/settings.py @@ -30,6 +30,10 @@ class JobflowRemoteSettings(BaseSettings): cli_log_level: LogLevel = Field( LogLevel.WARN, description="The level set for logging in the CLI" ) + cli_job_list_columns: list[str] = Field( + default_factory=list, + description="The list of columns to show in the `jf job list` command.", + ) model_config = SettingsConfigDict(env_prefix="jfremote_") diff --git a/tests/db/cli/test_job.py b/tests/db/cli/test_job.py index c891a902..9636a320 100644 --- a/tests/db/cli/test_job.py +++ b/tests/db/cli/test_job.py @@ -32,6 +32,14 @@ def test_jobs_list(job_controller, two_flows_four_jobs) -> None: ["job", "list"], required_out=["Get more information about the errors"] ) + outputs = ["WAITING", "READY", "none", "State", "DB id", "whatever"] + excluded = ["add1", "add2", "Name", "Job id"] + run_check_cli( + ["job", "list", "-hk", "state", "-hk", "db_id", "-sdk", "whatever"], + required_out=outputs, + excluded_out=excluded, + ) + def test_job_info(job_controller, two_flows_four_jobs) -> None: from jobflow_remote.testing.cli import run_check_cli From 7beb476220c0f76633bf542698b4e44ce4532cf0 Mon Sep 17 00:00:00 2001 From: Fabian Peschel Date: Sun, 22 Dec 2024 03:30:29 +0100 Subject: [PATCH 3/5] implement Guido's feedback --- src/jobflow_remote/cli/formatting.py | 43 +++++++++++---------------- src/jobflow_remote/cli/job.py | 29 ++++++++++++------ src/jobflow_remote/config/settings.py | 5 ++-- tests/db/cli/test_job.py | 14 +++++++-- 4 files changed, 52 insertions(+), 39 deletions(-) diff --git a/src/jobflow_remote/cli/formatting.py b/src/jobflow_remote/cli/formatting.py index 1becb569..d7743f01 100644 --- a/src/jobflow_remote/cli/formatting.py +++ b/src/jobflow_remote/cli/formatting.py @@ -61,7 +61,7 @@ def format_run_time(ji: JobInfo) -> str: lambda ji: convert_utc_time(ji.updated_on).strftime(fmt_datetime), ), "queue_id": ("Queue id", lambda ji: ji.remote.process_id), - "run_time": ("Run time", format_run_time), + "run_time": (Text("Run time [h:mm]", no_wrap=True), format_run_time), "retry_time": ( "Retry time" + time_zone_str, lambda ji: convert_utc_time(ji.remote.retry_time_limit).strftime(fmt_datetime) @@ -86,40 +86,33 @@ def format_run_time(ji: JobInfo) -> str: def get_job_info_table( jobs_info: list[JobInfo], verbosity: int, - header_keys: list[str] | None = None, + output_keys: list[str] | None = None, stored_data_keys: list[str] | None = None, ) -> Table: stored_data_keys = stored_data_keys or [] - all_header_keys = list(header_name_data_getter_map) - if not header_keys: - header_keys = all_header_keys[:6] + if not output_keys or verbosity > 0: + all_output_keys = list(header_name_data_getter_map) + output_keys = all_output_keys[:6] if verbosity >= 1: - header_keys += all_header_keys[6:10] + output_keys += all_output_keys[6:10] if verbosity == 1: - header_keys.append(all_header_keys[10]) + output_keys.append(all_output_keys[10]) if verbosity >= 2: - header_keys += all_header_keys[11:13] + output_keys += all_output_keys[11:13] + all_display_keys = output_keys + stored_data_keys - table = Table(title="Jobs info") - for key in header_keys: - table.add_column(header_name_data_getter_map[key][0]) - for key in stored_data_keys: - table.add_column(key) + sdk_map = { + k: (k, lambda x, k=k: x.stored_data.get(k) if x.stored_data else None) + for k in stored_data_keys + } + full_map = header_name_data_getter_map | sdk_map - default_stored_data_value = Text.from_markup("[italic grey58]none[/]") + table = Table(title="Jobs info") + for key in all_display_keys: + table.add_column(full_map[key][0]) for ji in jobs_info: - table.add_row( - *( - [header_name_data_getter_map[key][1](ji) for key in header_keys] - + [ - ji.stored_data.get(key, default_stored_data_value) - if ji.stored_data - else default_stored_data_value - for key in stored_data_keys - ] - ) - ) + table.add_row(*(full_map[key][1](ji) for key in all_display_keys)) return table diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index c525f7dd..5c5b48d0 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -114,12 +114,14 @@ def jobs_list( help="Key to be shown from the stored_data field.", ), ] = None, - header_keys: Annotated[ - Optional[list[str]], + cli_output_keys: Annotated[ + Optional[str], typer.Option( - "--header-keys", - "-hk", - help="Table columns to be shown. Overrides verbosity option. Can also be set in the config file", + "--output", + "-o", + help=f"Table columns to be shown. Needs to be specified as string with comma separated keys, e.g." + f"'state,db_id,name'. Overrides the verbosity option. Can also be set in the config file. " + f"Available options are: {', '.join(header_name_data_getter_map)}", ), ] = None, ): @@ -129,6 +131,7 @@ def jobs_list( check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours}) check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours}) check_incompatible_opt({"state": state, "error": error}) + check_incompatible_opt({"output": cli_output_keys, "verbosity": verbosity}) check_query_incompatibility( custom_query, [ @@ -145,10 +148,18 @@ def jobs_list( worker_name, ], ) - header_keys = header_keys or SETTINGS.cli_job_list_columns - if not set(header_keys).issubset(header_name_data_getter_map): + output_keys = ( + cli_output_keys.split(",") + if cli_output_keys + else SETTINGS.cli_job_list_columns or [] + ) + if not set(output_keys).issubset(header_name_data_getter_map): + exit_with_error_msg( + f"Header keys not supported: {set(output_keys).difference(header_name_data_getter_map)}" + ) + if stored_data_keys and not set(output_keys).isdisjoint(stored_data_keys): exit_with_error_msg( - f"Header keys not supported: {set(header_keys).difference(header_name_data_getter_map)}" + "Specifying a stored data key which is a standard column is disallowed." ) job_ids_indexes = get_job_ids_indexes(job_id) @@ -188,7 +199,7 @@ def jobs_list( table = get_job_info_table( jobs_info, verbosity=verbosity, - header_keys=header_keys, + output_keys=output_keys, stored_data_keys=stored_data_keys, ) diff --git a/src/jobflow_remote/config/settings.py b/src/jobflow_remote/config/settings.py index 4c9cfc41..2bd9c8fd 100644 --- a/src/jobflow_remote/config/settings.py +++ b/src/jobflow_remote/config/settings.py @@ -30,9 +30,8 @@ class JobflowRemoteSettings(BaseSettings): cli_log_level: LogLevel = Field( LogLevel.WARN, description="The level set for logging in the CLI" ) - cli_job_list_columns: list[str] = Field( - default_factory=list, - description="The list of columns to show in the `jf job list` command.", + cli_job_list_columns: Optional[list[str]] = Field( + None, description="The list of columns to show in the `jf job list` command." ) model_config = SettingsConfigDict(env_prefix="jfremote_") diff --git a/tests/db/cli/test_job.py b/tests/db/cli/test_job.py index 9636a320..5fe6e630 100644 --- a/tests/db/cli/test_job.py +++ b/tests/db/cli/test_job.py @@ -32,14 +32,24 @@ def test_jobs_list(job_controller, two_flows_four_jobs) -> None: ["job", "list"], required_out=["Get more information about the errors"] ) - outputs = ["WAITING", "READY", "none", "State", "DB id", "whatever"] + outputs = ["WAITING", "READY", "State", "DB id", "whatever"] excluded = ["add1", "add2", "Name", "Job id"] run_check_cli( - ["job", "list", "-hk", "state", "-hk", "db_id", "-sdk", "whatever"], + ["job", "list", "-o", "state,db_id", "-sdk", "whatever"], required_out=outputs, excluded_out=excluded, ) + output = "Header keys not supported: {'not_existing_key'}" + run_check_cli( + ["job", "list", "-o", "state,not_existing_key"], required_out=output, error=True + ) + + output = "Options output, verbosity are incompatible" + run_check_cli( + ["job", "list", "-o", "state,name", "-vv"], required_out=output, error=True + ) + def test_job_info(job_controller, two_flows_four_jobs) -> None: from jobflow_remote.testing.cli import run_check_cli From af25cf9d85bcdd64b5d358475152b16048f74f02 Mon Sep 17 00:00:00 2001 From: Fabian Peschel Date: Wed, 25 Dec 2024 00:45:20 +0100 Subject: [PATCH 4/5] add referring to help --- src/jobflow_remote/config/settings.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/jobflow_remote/config/settings.py b/src/jobflow_remote/config/settings.py index 2bd9c8fd..80571f91 100644 --- a/src/jobflow_remote/config/settings.py +++ b/src/jobflow_remote/config/settings.py @@ -31,7 +31,9 @@ class JobflowRemoteSettings(BaseSettings): LogLevel.WARN, description="The level set for logging in the CLI" ) cli_job_list_columns: Optional[list[str]] = Field( - None, description="The list of columns to show in the `jf job list` command." + None, + description="The list of columns to show in the `jf job list` command. For available " + "options check the corresponding help: `jf job list -h`.", ) model_config = SettingsConfigDict(env_prefix="jfremote_") From c3904e814814371d86b23ff7561d48e089a92bc0 Mon Sep 17 00:00:00 2001 From: Fabian Peschel Date: Wed, 25 Dec 2024 00:58:20 +0100 Subject: [PATCH 5/5] add test for settings --- tests/db/cli/test_job.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/db/cli/test_job.py b/tests/db/cli/test_job.py index 5fe6e630..62d87f73 100644 --- a/tests/db/cli/test_job.py +++ b/tests/db/cli/test_job.py @@ -51,6 +51,30 @@ def test_jobs_list(job_controller, two_flows_four_jobs) -> None: ) +def test_jobs_list_settings(job_controller, two_flows_four_jobs, monkeypatch) -> None: + from jobflow_remote import SETTINGS + from jobflow_remote.testing.cli import run_check_cli + + with monkeypatch.context() as m: + m.setattr(SETTINGS, "cli_job_list_columns", ["state", "db_id"]) + + outputs = ["WAITING", "READY", "State", "DB id"] + excluded = ["add1", "add2", "Name", "Job id"] + run_check_cli( + ["job", "list"], + required_out=outputs, + excluded_out=excluded, + ) + + # using -v will lead to a very large table which isn't fully displayed + columns = ["DB", "id", "Name", "Sta", "Job", "id", "Wor", "Last", "Loc"] + outputs = columns + [f"add{i}" for i in range(1, 5)] + ["REA", "WAI"] + run_check_cli( + ["job", "list", "-v"], + required_out=outputs, + ) + + def test_job_info(job_controller, two_flows_four_jobs) -> None: from jobflow_remote.testing.cli import run_check_cli