diff --git a/CHANGES.md b/CHANGES.md
index 929f845f55d..b8d7b3ac993 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,9 @@ ones in. -->
### Enhancements
+[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
+runs of the same workflow at install time.
+
[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
100 for the "default" queue.
diff --git a/cylc/flow/scripts/install.py b/cylc/flow/scripts/install.py
index 761bc7dec23..c157a49d4a8 100755
--- a/cylc/flow/scripts/install.py
+++ b/cylc/flow/scripts/install.py
@@ -80,21 +80,37 @@
multiple workflow run directories that link to the same workflow definition.
"""
+from ansimarkup import ansiprint as cprint
+import asyncio
+from optparse import Values
from pathlib import Path
-from typing import Optional, TYPE_CHECKING, Dict, Any
+from typing import Optional, Dict, Any
+from cylc.flow.scripts.scan import (
+ get_pipe,
+ _format_plain,
+ FLOW_STATE_SYMBOLS,
+ FLOW_STATE_CMAP
+)
from cylc.flow import iter_entry_points
from cylc.flow.exceptions import PluginError, InputError
-from cylc.flow.option_parsers import CylcOptionParser as COP
-from cylc.flow.pathutil import EXPLICIT_RELATIVE_PATH_REGEX, expand_path
+from cylc.flow.loggingutil import CylcLogFormatter
+from cylc.flow.option_parsers import (
+ CylcOptionParser as COP,
+ Options
+)
+from cylc.flow.pathutil import (
+ EXPLICIT_RELATIVE_PATH_REGEX,
+ expand_path,
+ get_workflow_run_dir
+)
from cylc.flow.workflow_files import (
- install_workflow, search_install_source_dirs, parse_cli_sym_dirs
+ install_workflow,
+ parse_cli_sym_dirs,
+ search_install_source_dirs
)
from cylc.flow.terminal import cli_function
-if TYPE_CHECKING:
- from optparse import Values
-
def get_option_parser() -> COP:
parser = COP(
@@ -150,6 +166,16 @@ def get_option_parser() -> COP:
default=False,
dest="no_run_name")
+ parser.add_option(
+ "--no-ping",
+ help=(
+ "When scanning for active instances of the workflow, "
+ "do not attempt to contact the schedulers to get status."
+ ),
+ action="store_true",
+ default=False,
+ dest="no_ping")
+
parser.add_cylc_rose_options()
return parser
@@ -162,7 +188,7 @@ def get_source_location(path: Optional[str]) -> Path:
"""
if path is None:
return Path.cwd()
- path = path.strip()
+ path = str(path).strip()
expanded_path = Path(expand_path(path))
if expanded_path.is_absolute():
return expanded_path
@@ -171,14 +197,79 @@ def get_source_location(path: Optional[str]) -> Path:
return search_install_source_dirs(expanded_path)
+async def scan(wf_name: str, ping: bool = True) -> None:
+ """Print any instances of wf_name that are already active."""
+ opts = Values({
+ 'name': [f'{wf_name}/*'],
+ 'states': {'running', 'paused', 'stopping'},
+ 'source': False,
+ 'ping': ping, # get status of scanned workflows
+ })
+ active = [
+ item async for item in get_pipe(
+ opts, None,
+ scan_dir=get_workflow_run_dir(wf_name) # restricted scan
+ )
+ ]
+ if active:
+ n = len(active)
+ grammar = (
+ ["s", "are", "them all"]
+ if n > 1 else
+ ["", "is", "it"]
+ )
+ print(
+ CylcLogFormatter.COLORS['WARNING'].format(
+ f'NOTE: {n} run%s of "{wf_name}"'
+ ' %s already active:' % tuple(grammar[:2])
+ )
+ )
+ for item in active:
+ if opts.ping:
+ status = item['status']
+ tag = FLOW_STATE_CMAP[status]
+ symbol = f" <{tag}>{FLOW_STATE_SYMBOLS[status]}{tag}>"
+ else:
+ symbol = " "
+ cprint(symbol, _format_plain(item, opts))
+ pattern = (
+ f"'{wf_name}/*'"
+ if n > 1 else
+ f"{item['name']}"
+ )
+ print(
+ f'You can stop %s with:\n cylc stop {pattern}'
+ '\nSee "cylc stop --help" for options.' % grammar[-1]
+ )
+
+
+InstallOptions = Options(get_option_parser())
+
+
@cli_function(get_option_parser)
-def main(parser, opts, reg=None):
- install(parser, opts, reg)
+def main(
+ _parser: COP,
+ opts: 'Values',
+ reg: Optional[str] = None
+) -> None:
+ """CLI wrapper."""
+ install_cli(opts, reg)
-def install(
- parser: COP, opts: 'Values', reg: Optional[str] = None
+def install_cli(
+ opts: 'Values',
+ reg: Optional[str] = None
) -> None:
+ """Install workflow and scan for already-running instances."""
+ wf_name = install(opts, reg)
+ asyncio.run(
+ scan(wf_name, not opts.no_ping)
+ )
+
+
+def install(
+ opts: 'Values', reg: Optional[str] = None
+) -> str:
if opts.no_run_name and opts.run_name:
raise InputError(
"options --no-run-name and --run-name are mutually exclusive."
@@ -204,7 +295,7 @@ def install(
elif opts.symlink_dirs:
cli_symdirs = parse_cli_sym_dirs(opts.symlink_dirs)
- source_dir, rundir, _workflow_name = install_workflow(
+ source_dir, rundir, workflow_name = install_workflow(
source=source,
workflow_name=opts.workflow_name,
run_name=opts.run_name,
@@ -229,3 +320,5 @@ def install(
entry_point.name,
exc
) from None
+
+ return workflow_name
diff --git a/tests/integration/test_install.py b/tests/integration/test_install.py
new file mode 100644
index 00000000000..22ff88a55ba
--- /dev/null
+++ b/tests/integration/test_install.py
@@ -0,0 +1,150 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Test cylc install."""
+
+import pytest
+from pathlib import Path
+
+from .test_scan import init_flows
+
+from cylc.flow.async_util import pipe
+from cylc.flow.scripts import scan
+from cylc.flow.workflow_files import WorkflowFiles
+from cylc.flow.scripts.install import (
+ InstallOptions,
+ install_cli
+)
+
+from typing import Callable, Tuple
+
+SRV_DIR = Path(WorkflowFiles.Service.DIRNAME)
+CONTACT = Path(WorkflowFiles.Service.CONTACT)
+RUN_N = Path(WorkflowFiles.RUN_N)
+INSTALL = Path(WorkflowFiles.Install.DIRNAME)
+
+INSTALLED_MSG = "INSTALLED {wfrun} from"
+WF_ACTIVE_MSG = '1 run of "{wf}" is already active:'
+BAD_CONTACT_MSG = "Bad contact file:"
+
+
+@pytest.fixture()
+def patch_graphql_query(
+ monkeypatch: pytest.MonkeyPatch
+):
+ # Define a mocked graphql_query pipe function.
+ @pipe
+ async def _graphql_query(flow, fields, filters=None):
+ flow.update({"status": "running"})
+ return flow
+
+ # Swap out the function that cylc.flow.scripts.scan.
+ monkeypatch.setattr(
+ 'cylc.flow.scripts.scan.graphql_query',
+ _graphql_query,
+ )
+
+
+@pytest.fixture()
+def src_run_dirs(
+ mock_glbl_cfg: Callable,
+ monkeypatch: pytest.MonkeyPatch,
+ tmp_path: Path
+) -> Tuple[Path, Path]:
+ """Create some workflow source and run dirs for testing.
+
+ Source dirs:
+ /w1
+ /w2
+
+ Run dir:
+ /w1/run1
+
+ """
+ tmp_src_path = tmp_path / 'cylc-src'
+ tmp_run_path = tmp_path / 'cylc-run'
+ tmp_src_path.mkdir()
+ tmp_run_path.mkdir()
+
+ init_flows(
+ tmp_run_path=tmp_run_path,
+ running=('w1/run1',),
+ tmp_src_path=tmp_src_path,
+ src=('w1', 'w2')
+ )
+ mock_glbl_cfg(
+ 'cylc.flow.workflow_files.glbl_cfg',
+ f'''
+ [install]
+ source dirs = {tmp_src_path}
+ '''
+ )
+ monkeypatch.setattr('cylc.flow.pathutil._CYLC_RUN_DIR', tmp_run_path)
+
+ return tmp_src_path, tmp_run_path
+
+
+def test_install_scan_no_ping(
+ src_run_dirs: Callable,
+ capsys: pytest.CaptureFixture,
+ caplog: pytest.LogCaptureFixture
+) -> None:
+ """At install, running intances should be reported.
+
+ Ping = False case: don't query schedulers.
+ """
+
+ opts = InstallOptions()
+ opts.no_ping = True
+
+ install_cli(opts, reg='w1')
+ out = capsys.readouterr().out
+ assert INSTALLED_MSG.format(wfrun='w1/run2') in out
+ assert WF_ACTIVE_MSG.format(wf='w1') in out
+ # Empty contact file faked with "touch":
+ assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text
+
+ install_cli(opts, reg='w2')
+ out = capsys.readouterr().out
+ assert WF_ACTIVE_MSG.format(wf='w2') not in out
+ assert INSTALLED_MSG.format(wfrun='w2/run1') in out
+
+
+def test_install_scan_ping(
+ src_run_dirs: Callable,
+ capsys: pytest.CaptureFixture,
+ caplog: pytest.LogCaptureFixture,
+ patch_graphql_query: Callable
+) -> None:
+ """At install, running intances should be reported.
+
+ Ping = True case: but mock scan's scheduler query method.
+ """
+ opts = InstallOptions()
+ opts.no_ping = False
+
+ install_cli(opts, reg='w1')
+ out = capsys.readouterr().out
+ assert INSTALLED_MSG.format(wfrun='w1/run2') in out
+ assert WF_ACTIVE_MSG.format(wf='w1') in out
+ assert scan.FLOW_STATE_SYMBOLS["running"] in out
+ # Empty contact file faked with "touch":
+ assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text
+
+ install_cli(opts, reg='w2')
+ out = capsys.readouterr().out
+ assert INSTALLED_MSG.format(wfrun='w2/run1') in out
+ assert WF_ACTIVE_MSG.format(wf='w2') not in out
diff --git a/tests/integration/test_scan.py b/tests/integration/test_scan.py
index 79d8ac01475..f4c6837067d 100644
--- a/tests/integration/test_scan.py
+++ b/tests/integration/test_scan.py
@@ -42,17 +42,20 @@
INSTALL = Path(WorkflowFiles.Install.DIRNAME)
-def init_flows(tmp_path, running=None, registered=None, un_registered=None):
+def init_flows(tmp_run_path=None, running=None, registered=None,
+ un_registered=None, tmp_src_path=None, src=None):
"""Create some dummy workflows for scan to discover.
Assume "run1, run2, ..., runN" structure if flow name constains "run".
+ Optionally create workflow source dirs in a give location too.
+
"""
def make_registered(name, running=False):
- run_d = Path(tmp_path, name)
+ run_d = Path(tmp_run_path, name)
run_d.mkdir(parents=True, exist_ok=True)
(run_d / "flow.cylc").touch()
if "run" in name:
- root = Path(tmp_path, name).parent
+ root = Path(tmp_run_path, name).parent
with suppress(FileExistsError):
(root / "runN").symlink_to(run_d, target_is_directory=True)
else:
@@ -63,12 +66,19 @@ def make_registered(name, running=False):
if running:
(srv_d / CONTACT).touch()
+ def make_src(name):
+ src_d = Path(tmp_src_path, name)
+ src_d.mkdir(parents=True, exist_ok=True)
+ (src_d / "flow.cylc").touch()
+
for name in (running or []):
make_registered(name, running=True)
for name in (registered or []):
make_registered(name)
for name in (un_registered or []):
- Path(tmp_path, name).mkdir(parents=True, exist_ok=True)
+ Path(tmp_run_path, name).mkdir(parents=True, exist_ok=True)
+ for name in (src or []):
+ make_src(name)
@pytest.fixture(scope='session')
@@ -157,14 +167,14 @@ def source_dirs(mock_glbl_cfg):
src1 = src / '1'
src1.mkdir()
init_flows(
- src1,
- registered=('a', 'b/c')
+ tmp_src_path=src1,
+ src=('a', 'b/c')
)
src2 = src / '2'
src2.mkdir()
init_flows(
- src2,
- registered=('d', 'e/f')
+ tmp_src_path=src2,
+ src=('d', 'e/f')
)
mock_glbl_cfg(
'cylc.flow.scripts.scan.glbl_cfg',