From 557a0f89cf18a66c59bad23b15d9a079c0d6bf85 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 14 Nov 2023 14:48:41 +0000 Subject: [PATCH] completion_server: support "cylc set" arguments * Support the `--pre` and `--out` arguments to `cylc set`. * This requires the task ID(s) to be provided *before* the `--pre` / `--out` option because otherwise we don't have the required information to complete the arguments. * This lists prereqs/outputs from `cylc show` which is currently restricted to n=1 tasks. * This does not support completing comma separared prereqs/outputs, use the `--pre` / `--out` options multiple times to do this. --- cylc/flow/scripts/completion_server.py | 143 +++++++++++- .../scripts/test_completion_server.py | 207 ++++++++++++++++++ tests/unit/scripts/test_completion_server.py | 32 ++- 3 files changed, 363 insertions(+), 19 deletions(-) create mode 100644 tests/integration/scripts/test_completion_server.py diff --git a/cylc/flow/scripts/completion_server.py b/cylc/flow/scripts/completion_server.py index ead311955da..556e5834476 100644 --- a/cylc/flow/scripts/completion_server.py +++ b/cylc/flow/scripts/completion_server.py @@ -40,6 +40,8 @@ # Which provide possible values to the completion functions. import asyncio +from contextlib import suppress +import inspect import os from pathlib import Path import select @@ -50,6 +52,7 @@ from packaging.specifiers import SpecifierSet from cylc.flow.cfgspec.glbl_cfg import glbl_cfg +from cylc.flow.exceptions import CylcError from cylc.flow.id import tokenise, IDTokens, Tokens from cylc.flow.network.scan import scan from cylc.flow.option_parsers import CylcOptionParser as COP @@ -193,7 +196,12 @@ async def complete_cylc(_root: str, *items: str) -> t.List[str]: if ret is not None: return ret if previous and previous.startswith('-'): - ret = await complete_option_value(command, previous, partial) + ret = await complete_option_value( + command, + previous, + partial, + items=items, + ) if ret is not None: return ret @@ -256,10 +264,11 @@ async def complete_option( async def complete_option_value( command: str, option: str, - partial: t.Optional[str] = None + partial: t.Optional[str] = None, + items: t.Optional[t.Iterable[str]] = None, ) -> t.Optional[t.List[str]]: """Complete values for --options.""" - vals = await list_option_values(command, option, partial) + vals = await list_option_values(command, option, partial, items=items) if vals is not None: return complete(partial, vals) return None @@ -331,9 +340,21 @@ async def list_option_values( command: str, option: str, partial: t.Optional[str] = '', + items: t.Optional[t.Iterable[str]] = None, ) -> t.Optional[t.List[str]]: """List values for an option in a Cylc command. + Args: + command: + The Cylc sub-command. + option: + The --option to list possible values for. + partial: + The part of the command the user is completing. + items: + The CLI context, i.e. everything that has been typed on the CLI + before the partial. + E.G. --flow ['all', 'new', 'none'] """ if option in OPTION_MAP: @@ -341,7 +362,22 @@ async def list_option_values( if not list_option: # do not perform completion for this option return [] - return await list_option(None, partial) + kwargs = {} + if 'tokens_list' in inspect.getfullargspec(list_option).args: + # the function requires information about tokens already specified + # on the CLI + # (e.g. the workflow//cycle/task the command is operating on) + tokens_list = [] + for item in items or []: + # pull out things from the command which look like IDs + if '//' in item: + with suppress(ValueError): + tokens_list.append(Tokens(item)) + continue + with suppress(ValueError): + tokens_list.append(Tokens(item, relative=True)) + kwargs['tokens_list'] = tokens_list + return await list_option(partial, **kwargs) return None @@ -413,7 +449,6 @@ async def list_resources(_partial: str) -> t.List[str]: async def list_dir( - _workflow: t.Optional[str], partial: t.Optional[str] ) -> t.List[str]: """List an arbitrary dir on the filesystem. @@ -460,7 +495,6 @@ def list_rel_dir(path: Path, base: Path) -> t.List[str]: async def list_flows( - _workflow: t.Optional[str], _partial: t.Optional[str] ) -> t.List[str]: """List values for the --flow option.""" @@ -468,13 +502,96 @@ async def list_flows( async def list_colours( - _workflow: t.Optional[str], _partial: t.Optional[str] ) -> t.List[str]: """List values for the --color option.""" return ['never', 'auto', 'always'] +async def list_outputs( + _partial: t.Optional[str], + tokens_list: t.Optional[t.List[Tokens]], +): + """List task outputs.""" + return (await _list_prereqs_and_outputs(tokens_list))[1] + + +async def list_prereqs( + _partial: t.Optional[str], + tokens_list: t.Optional[t.List[Tokens]], +): + """List task prerequisites.""" + return (await _list_prereqs_and_outputs(tokens_list))[0] + ['all'] + + +async def _list_prereqs_and_outputs( + tokens_list: t.Optional[t.List[Tokens]], +) -> t.Tuple[t.List[str], t.List[str]]: + """List task prerequisites and outputs. + + Returns: + tuple - (prereqs, outputs) + + """ + if not tokens_list: + # no context information available on the CLI + # we can't list prereqs/outputs + return ([], []) + + # dynamic import for this relatively unlikely case to avoid slowing down + # server startup unnecessarily + from cylc.flow.network.client_factory import get_client + from cylc.flow.scripts.show import prereqs_and_outputs_query + from types import SimpleNamespace + + workflows: t.Dict[str, t.List[Tokens]] = {} + current_workflow = None + for tokens in tokens_list: + workflow = tokens['workflow'] + task = tokens['task'] + if workflow: + workflows.setdefault(workflow, []) + current_workflow = workflow + if current_workflow and task: + workflows[current_workflow].append(tokens.task) + + clients = {} + for workflow in workflows: + with suppress(CylcError): + clients[workflow] = get_client(workflow) + + if not workflows: + return ([], []) + + json: dict = {} + await asyncio.gather(*( + prereqs_and_outputs_query( + workflow, + workflows[workflow], + pclient, + SimpleNamespace(json=True), + json, + ) + for workflow, pclient in clients.items() + )) + + if not json: + return ([], []) + return ( + [ + f"{cond['taskId']}:{cond['reqState']}" + for value in json.values() + for prerequisite in value['prerequisites'] + for cond in prerequisite['conditions'] + ], + [ + output['label'] + for value in json.values() + for output in value['outputs'] + ], + ) + + # non-exhaustive list of Cylc commands which take non-workflow arguments COMMAND_MAP: t.Dict[str, t.Optional[t.Callable]] = { # register commands which have special positional arguments @@ -513,6 +630,8 @@ async def list_colours( '--flow': list_flows, '--colour': list_colours, '--color': list_colours, + '--out': list_outputs, + '--pre': list_prereqs, # options for which we should not attempt to complete values for '--rm': None, '--run-name': None, @@ -528,7 +647,7 @@ async def list_colours( } -def cli_detokenise(tokens: Tokens) -> str: +def cli_detokenise(tokens: Tokens, relative=False) -> str: """Format tokens for use on the command line. I.E. add the trailing slash[es] onto the end. @@ -536,9 +655,13 @@ def cli_detokenise(tokens: Tokens) -> str: if tokens.is_null: # shouldn't happen but prevents possible error return '' + if relative: + id_ = tokens.relative_id + else: + id_ = tokens.id if tokens.lowest_token == IDTokens.Workflow.value: - return f'{tokens.id}//' - return f'{tokens.id}/' + return f'{id_}//' + return f'{id_}/' def next_token(tokens: Tokens) -> t.Optional[str]: diff --git a/tests/integration/scripts/test_completion_server.py b/tests/integration/scripts/test_completion_server.py new file mode 100644 index 00000000000..24375c31f27 --- /dev/null +++ b/tests/integration/scripts/test_completion_server.py @@ -0,0 +1,207 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Integration tests for the "cylc completion-server command. + +See also the more extensive unit tests for this module. +""" + +from cylc.flow.scripts.completion_server import ( + _list_prereqs_and_outputs, + complete_cylc, +) + + +def setify(coro): + """Cast returned lists to sets for coroutines. + + Convenience function to use when you want to test output not order. + """ + async def _coro(*args, **kwargs): + nonlocal coro + ret = await coro(*args, **kwargs) + if isinstance(ret, list): + return set(ret) + return ret + return _coro + + +async def test_list_prereqs_and_outputs(flow, scheduler, start): + """Test the success cases for listing task prereqs/outputs. + + The error cases are tested in a unit test (doesn't require a running + scheduler). + """ + _complete_cylc = setify(complete_cylc) # Note: results are un-ordered + + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'graph': { + 'P1': ''' + a => b + c => d + b[-P1] => b + ''' + }, + }, + 'runtime': { + 'a': {}, + 'b': { + 'outputs': { + 'foo': 'abc def ghi', + } + } + } + }) + schd = scheduler(id_) + async with start(schd): + await schd.update_data_structure() + b1 = schd.tokens.duplicate(cycle='1', task='b') + d1 = schd.tokens.duplicate(cycle='1', task='d') + e1 = schd.tokens.duplicate(cycle='1', task='e') # does not exist + + # list prereqs (b1) + assert await _complete_cylc('cylc', 'set', b1.id, '--pre', '') == { + # keywords + 'all', + # intra-cycle dependency + '1/a:succeeded', + # inter-cycle dependency + '0/b:succeeded', + } + + # list outputs (b1) + assert await _complete_cylc('cylc', 'set', b1.id, '--out', '') == { + # regular task outputs + 'expired', + 'failed', + 'started', + 'submit-failed', + 'submitted', + 'succeeded', + # custom task outputs + 'foo', + } + + # list prereqs (d1) + assert await _complete_cylc('cylc', 'set', d1.id, '--pre', '') == { + # keywords + 'all', + # d1 prereqs + '1/c:succeeded', + } + + # list prereqs for multiple (b1, d1) + assert await _complete_cylc( + 'cylc', + 'set', + b1.id, + d1.id, + '--pre', + '', + ) == { + # keywords + 'all', + # b1 prereqs + '1/a:succeeded', + '0/b:succeeded', + # d1 prereqs + '1/c:succeeded', + } + + # list prereqs for multiple (b1, d1) - alternative format + assert await _complete_cylc( + 'cylc', + 'set', + f'{schd.id}//', + f'//{b1.relative_id}', + f'//{d1.relative_id}', + '--pre', + '', + ) == { + # keywords + 'all', + # b1 prereqs + '1/a:succeeded', + '0/b:succeeded', + # d1 prereqs + '1/c:succeeded', + } + + # list outputs for a non-existant task + assert await _complete_cylc('cylc', 'set', e1.id, '--out', '') == set() + + # list outputs for a non-existant workflow + assert await _complete_cylc( + 'cylc', + 'set', + # this invalid workflow shouldn't prevent it from returning values + # for the valid one + 'no-such-workflow//', + f'{schd.id}//', + f'//{b1.relative_id}', + f'//{d1.relative_id}', + '--pre', + '', + ) == { + # keywords + 'all', + # b1 prereqs + '1/a:succeeded', + '0/b:succeeded', + # d1 prereqs + '1/c:succeeded', + } + + # start a second workflow to test multi-workflow functionality + id2 = flow({ + 'scheduling': { + 'graph': { + 'R1': ''' + x => z + ''' + } + }, + 'runtime': {'x': {}, 'z': {}}, + }) + schd2 = scheduler(id2) + async with start(schd2): + await schd2.update_data_structure() + z1 = schd2.tokens.duplicate(cycle='1', task='z') + + # list prereqs for multiple tasks in multiple workflows + # (it should combine the results from both workflows) + assert await _complete_cylc( + 'cylc', + 'set', + b1.id, + z1.id, + '--pre', + '', + ) == { + # keywords + 'all', + # workflow1//1/b prereqs + '0/b:succeeded', + '1/a:succeeded', + # workflow2//1/z prereqs + '1/x:succeeded' + } diff --git a/tests/unit/scripts/test_completion_server.py b/tests/unit/scripts/test_completion_server.py index 186e13b7272..5d1c5394e8f 100644 --- a/tests/unit/scripts/test_completion_server.py +++ b/tests/unit/scripts/test_completion_server.py @@ -20,6 +20,7 @@ from cylc.flow.id import Tokens from cylc.flow.network.scan import scan from cylc.flow.scripts.completion_server import ( + _list_prereqs_and_outputs, server, complete_cylc, complete_command, @@ -540,7 +541,7 @@ async def test_list_dir(tmp_path, monkeypatch): # => list $PWD assert { str(path) - for path in await _list_dir(None, None) + for path in await _list_dir(None) } == {'x/'} # no trailing `/` at the end of the path @@ -548,7 +549,7 @@ async def test_list_dir(tmp_path, monkeypatch): # => list the parent assert { str(path) - for path in await _list_dir(None, 'x') + for path in await _list_dir('x') } == {'x/'} # # trailing `/` at the end of the path @@ -556,14 +557,14 @@ async def test_list_dir(tmp_path, monkeypatch): # # => list dir path assert { str(path) - for path in await _list_dir(None, 'x/') + for path in await _list_dir('x/') } == {'x/y/', 'x/z'} # "y" is a dir, "z" is a file # listing a file # => noting to list, just return the file assert { str(path) - for path in await _list_dir(None, 'x/z/') + for path in await _list_dir('x/z/') } == {'x/z'} # --- absolute paths --- @@ -574,7 +575,7 @@ async def test_list_dir(tmp_path, monkeypatch): assert { # '/'.join(path.rsplit('/', 2)[-2:]) path.replace(str(tmp_path), '') - for path in await _list_dir(None, str(tmp_path / 'x')) + for path in await _list_dir(str(tmp_path / 'x')) } == {'/x/'} # trailing `/` at the end of the path @@ -582,14 +583,14 @@ async def test_list_dir(tmp_path, monkeypatch): # => list dir path assert { path.replace(str(tmp_path), '') - for path in await _list_dir(None, str(tmp_path / 'x') + '/') + for path in await _list_dir(str(tmp_path / 'x') + '/') } == {'/x/y/', '/x/z'} # "y" is a dir, "z" is a file # listing a file # => noting to list, just return the file assert { path.replace(str(tmp_path), '') - for path in await _list_dir(None, str(tmp_path / 'x' / 'z') + '/') + for path in await _list_dir(str(tmp_path / 'x' / 'z') + '/') } == {'/x/z'} @@ -599,12 +600,12 @@ async def test_list_flows(): Currently this only provides the textural options i.e. it doesn't list "flows" running in a workflow, yet... """ - assert 'all' in await list_flows(None, None) + assert 'all' in await list_flows(None) async def test_list_colours(): """Test listing values for the --color option.""" - assert 'always' in await list_colours(None, None) + assert 'always' in await list_colours(None) async def test_cli_detokenise(): @@ -715,3 +716,16 @@ def _get_current_completion_script_version(_script, lang): out, err = capsys.readouterr() assert not out # never write to stdout assert not err + + +async def test_prereqs_and_outputs(): + """Test the error cases for listing task prereqs/outputs. + + The succeess cases are tested in an integration test (requires a running + scheduler). + """ + # if no tokens are provided, no prereqs or outputs are returned + assert await _list_prereqs_and_outputs([]) == ([], []) + + # if an invalid workflow is provided, we can't list anything + assert await _list_prereqs_and_outputs([Tokens(workflow='no-such-workflow')]) == ([], [])