Skip to content

Commit

Permalink
Merge pull request #6472 from MetRonnie/cylc-remove
Browse files Browse the repository at this point in the history
Implement `cylc remove` proposal
  • Loading branch information
oliver-sanders authored Nov 28, 2024
2 parents 354c7d5 + 7d91a99 commit 9ff50f8
Show file tree
Hide file tree
Showing 75 changed files with 2,163 additions and 786 deletions.
5 changes: 5 additions & 0 deletions changes.d/6472.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
`cylc remove` improvements:
- It can now remove tasks that are no longer active, making it look like they never ran.
- Removing a submitted/running task will kill it.
- Added the `--flow` option.
- Removed tasks are now demoted to `flow=none` but retained in the workflow database for provenance.
30 changes: 26 additions & 4 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,34 @@
)

from cylc.flow.exceptions import InputError
from cylc.flow.id import IDTokens, Tokens
from cylc.flow.flow_mgr import (
FLOW_ALL,
FLOW_NEW,
FLOW_NONE,
)
from cylc.flow.id import (
IDTokens,
Tokens,
)
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE


ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_VAL = (
f"Flow values must be integers, or '{FLOW_ALL}', '{FLOW_NEW}', "
f"or '{FLOW_NONE}'"
)
ERR_OPT_FLOW_VAL_2 = f"Flow values must be integers, or '{FLOW_ALL}'"
ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def flow_opts(flows: List[str], flow_wait: bool) -> None:
def flow_opts(
flows: List[str],
flow_wait: bool,
allow_new_or_none: bool = True
) -> None:
"""Check validity of flow-related CLI options.
Note the schema defaults flows to [].
Expand All @@ -63,16 +78,23 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None:
cylc.flow.exceptions.InputError: --wait is not compatible with
--flow=new or --flow=none
>>> flow_opts(["new"], False, allow_new_or_none=False)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... must be integers, or 'all'
"""
if not flows:
return

flows = [val.strip() for val in flows]

for val in flows:
val = val.strip()
if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}:
if len(flows) != 1:
raise InputError(ERR_OPT_FLOW_COMBINE.format(val))
if not allow_new_or_none and val in {FLOW_NEW, FLOW_NONE}:
raise InputError(ERR_OPT_FLOW_VAL_2)
else:
try:
int(val)
Expand Down
43 changes: 27 additions & 16 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,24 @@
"""

from contextlib import suppress
from time import sleep, time
from time import (
sleep,
time,
)
from typing import (
TYPE_CHECKING,
AsyncGenerator,
Callable,
Dict,
Iterable,
List,
Optional,
TYPE_CHECKING,
TypeVar,
Union,
)

from metomi.isodatetime.parsers import TimePointParser

from cylc.flow import LOG
import cylc.flow.command_validation as validate
from cylc.flow.exceptions import (
Expand All @@ -73,38 +79,40 @@
CylcConfigError,
)
import cylc.flow.flags
from cylc.flow.flow_mgr import get_flow_nums_set
from cylc.flow.log_level import log_level_to_verbosity
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.task_id import TaskID
from cylc.flow.workflow_status import RunMode, StopMode
from cylc.flow.workflow_status import (
RunMode,
StopMode,
)

from metomi.isodatetime.parsers import TimePointParser

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler

# define a type for command implementations
Command = Callable[
...,
AsyncGenerator,
]
Command = Callable[..., AsyncGenerator]
# define a generic type needed for the @_command decorator
_TCommand = TypeVar('_TCommand', bound=Command)

# a directory of registered commands (populated on module import)
COMMANDS: 'Dict[str, Command]' = {}


def _command(name: str):
"""Decorator to register a command."""
def _command(fcn: 'Command'):
def _command(fcn: '_TCommand') -> '_TCommand':
nonlocal name
COMMANDS[name] = fcn
fcn.command_name = name # type: ignore
fcn.command_name = name # type: ignore[attr-defined]
return fcn
return _command


async def run_cmd(fcn, *args, **kwargs):
async def run_cmd(bound_fcn: AsyncGenerator):
"""Run a command outside of the scheduler's main loop.
Normally commands are run via the Scheduler's command_queue (which is
Expand All @@ -119,10 +127,9 @@ async def run_cmd(fcn, *args, **kwargs):
For these purposes use "run_cmd", otherwise, queue commands via the
scheduler as normal.
"""
cmd = fcn(*args, **kwargs)
await cmd.__anext__() # validate
await bound_fcn.__anext__() # validate
with suppress(StopAsyncIteration):
return await cmd.__anext__() # run
return await bound_fcn.__anext__() # run


@_command('set')
Expand Down Expand Up @@ -310,11 +317,15 @@ async def set_verbosity(schd: 'Scheduler', level: Union[int, str]):


@_command('remove_tasks')
async def remove_tasks(schd: 'Scheduler', tasks: Iterable[str]):
async def remove_tasks(
schd: 'Scheduler', tasks: Iterable[str], flow: List[str]
):
"""Remove tasks."""
validate.is_tasks(tasks)
validate.flow_opts(flow, flow_wait=False, allow_new_or_none=False)
yield
yield schd.pool.remove_tasks(tasks)
flow_nums = get_flow_nums_set(flow)
schd.remove_tasks(tasks, flow_nums)


@_command('reload_workflow')
Expand Down
36 changes: 28 additions & 8 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2356,22 +2356,42 @@ def delta_task_held(
self.updates_pending = True

def delta_task_flow_nums(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy flow_nums.
"""Create delta for change in task proxy flow numbers.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
itask: TaskProxy with updated flow numbers.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
self._delta_task_flow_nums(tp_id, itask.flow_nums)

def delta_remove_task_flow_nums(
self, task: str, removed: 'FlowNums'
) -> None:
"""Create delta for removal of flow numbers from a task proxy.
Args:
task: Relative ID of task.
removed: Flow numbers to remove from the task proxy in the
data store.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(
Tokens(task, relative=True).duplicate(**self.id_)
)
if not tproxy:
return
new_flow_nums = deserialise_set(tproxy.flow_nums).difference(removed)
self._delta_task_flow_nums(tp_id, new_flow_nums)

def _delta_task_flow_nums(self, tp_id: str, flow_nums: 'FlowNums') -> None:
tp_delta: PbTaskProxy = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id)
)
tp_delta.stamp = f'{tp_id}@{time()}'
tp_delta.flow_nums = serialise_set(itask.flow_nums)
tp_delta.flow_nums = serialise_set(flow_nums)
self.updates_pending = True

def delta_task_output(
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
IntegerPoint,
IntegerInterval
)
from cylc.flow.flow_mgr import stringify_flow_nums
from cylc.flow.flow_mgr import repr_flow_nums
from cylc.flow.pathutil import expand_path
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.task_outputs import (
Expand Down Expand Up @@ -318,7 +318,7 @@ def workflow_state_query(
if flow_num is not None and flow_num not in flow_nums:
# skip result, wrong flow
continue
fstr = stringify_flow_nums(flow_nums)
fstr = repr_flow_nums(flow_nums)
if fstr:
res.append(fstr)
db_res.append(res)
Expand Down
65 changes: 49 additions & 16 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@

"""Manage flow counter and flow metadata."""

from typing import Dict, Set, Optional, TYPE_CHECKING
import datetime
from typing import (
TYPE_CHECKING,
Dict,
Iterable,
List,
Optional,
Set,
)

from cylc.flow import LOG

Expand Down Expand Up @@ -55,36 +62,62 @@ def add_flow_opts(parser):
)


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
"""Return a string representation of a set of flow numbers
def get_flow_nums_set(flow: List[str]) -> FlowNums:
"""Return set of integer flow numbers from list of strings.
Return:
- "none" for no flow
- "" for the original flow (flows only matter if there are several)
- otherwise e.g. "(flow=1,2,3)"
Returns an empty set if the input is empty or contains only "all".
>>> get_flow_nums_set(["1", "2", "3"])
{1, 2, 3}
>>> get_flow_nums_set([])
set()
>>> get_flow_nums_set(["all"])
set()
"""
if flow == [FLOW_ALL]:
return set()
return {int(val.strip()) for val in flow}


def stringify_flow_nums(flow_nums: Iterable[int]) -> str:
"""Return the canonical string for a set of flow numbers.
Examples:
>>> stringify_flow_nums({1})
'1'
>>> stringify_flow_nums({3, 1, 2})
'1,2,3'
>>> stringify_flow_nums({})
''
"""
return ','.join(str(i) for i in sorted(flow_nums))


def repr_flow_nums(flow_nums: FlowNums, full: bool = False) -> str:
"""Return a representation of a set of flow numbers
If `full` is False, return an empty string for flows=1.
Examples:
>>> repr_flow_nums({})
'(flows=none)'
>>> stringify_flow_nums({1})
>>> repr_flow_nums({1})
''
>>> stringify_flow_nums({1}, True)
>>> repr_flow_nums({1}, full=True)
'(flows=1)'
>>> stringify_flow_nums({1,2,3})
>>> repr_flow_nums({1,2,3})
'(flows=1,2,3)'
"""
if not full and flow_nums == {1}:
return ""
else:
return (
"(flows="
f"{','.join(str(i) for i in flow_nums) or 'none'}"
")"
)
return f"(flows={stringify_flow_nums(flow_nums) or 'none'})"


class FlowMgr:
Expand Down
9 changes: 7 additions & 2 deletions cylc/flow/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from enum import Enum
import re
from typing import (
TYPE_CHECKING,
Iterable,
List,
Optional,
Expand All @@ -33,6 +34,10 @@
from cylc.flow import LOG


if TYPE_CHECKING:
from cylc.flow.cycling import PointBase


class IDTokens(Enum):
"""Cylc object identifier tokens."""

Expand Down Expand Up @@ -524,14 +529,14 @@ def duplicate(
)


def quick_relative_detokenise(cycle, task):
def quick_relative_id(cycle: Union[str, int, 'PointBase'], task: str) -> str:
"""Generate a relative ID for a task.
This is a more efficient solution to `Tokens` for cases where
you only want the ID string and don't have any use for a Tokens object.
Example:
>>> q = quick_relative_detokenise
>>> q = quick_relative_id
>>> q('1', 'a') == Tokens(cycle='1', task='a').relative_id
True
Expand Down
Loading

0 comments on commit 9ff50f8

Please sign in to comment.