Skip to content

Commit

Permalink
Prevent commands which take Tasks IDs taking Job IDs. (#6130)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim authored Jun 24, 2024
1 parent 62527e0 commit d715aff
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
1 change: 1 addition & 0 deletions changes.d/6130.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent commands accepting job IDs where it doesn't make sense.
35 changes: 34 additions & 1 deletion cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@


from typing import (
Iterable,
List,
Optional,
)

from cylc.flow.exceptions import InputError
from cylc.flow.id import Tokens
from cylc.flow.id import IDTokens, Tokens
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE

Expand Down Expand Up @@ -228,3 +229,35 @@ def consistency(
"""
if outputs and prereqs:
raise InputError("Use --prerequisite or --output, not both.")


def is_tasks(tasks: Iterable[str]):
"""All tasks in a list of tasks are task ID's without trailing job ID.
Examples:
# All legal
>>> is_tasks(['1/foo', '1/bar', '*/baz', '*/*'])
# Some legal
>>> is_tasks(['1/foo/NN', '1/bar', '*/baz', '*/*/42'])
Traceback (most recent call last):
...
cylc.flow.exceptions.InputError: This command does not take job ids:
* 1/foo/NN
* */*/42
# None legal
>>> is_tasks(['*/baz/12'])
Traceback (most recent call last):
...
cylc.flow.exceptions.InputError: This command does not take job ids:
* */baz/12
"""
bad_tasks: List[str] = []
for task in tasks:
tokens = Tokens('//' + task)
if tokens.lowest_token == IDTokens.Job.value:
bad_tasks.append(task)
if bad_tasks:
msg = 'This command does not take job ids:\n * '
raise InputError(msg + '\n * '.join(bad_tasks))
9 changes: 9 additions & 0 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async def set_prereqs_and_outputs(
outputs = validate.outputs(outputs)
prerequisites = validate.prereqs(prerequisites)
validate.flow_opts(flow, flow_wait)
validate.is_tasks(tasks)

yield

Expand Down Expand Up @@ -172,6 +173,8 @@ async def stop(
task: Optional[str] = None,
flow_num: Optional[int] = None,
):
if task:
validate.is_tasks([task])
yield
if flow_num:
schd.pool.stop_flow(flow_num)
Expand Down Expand Up @@ -214,6 +217,7 @@ async def stop(
@_command('release')
async def release(schd: 'Scheduler', tasks: Iterable[str]):
"""Release held tasks."""
validate.is_tasks(tasks)
yield
yield schd.pool.release_held_tasks(tasks)

Expand All @@ -237,6 +241,7 @@ async def resume(schd: 'Scheduler'):
@_command('poll_tasks')
async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Poll pollable tasks or a task or family if options are provided."""
validate.is_tasks(tasks)
yield
if schd.get_run_mode() == RunMode.SIMULATION:
yield 0
Expand All @@ -248,6 +253,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
@_command('kill_tasks')
async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Kill all tasks or a task/family if options are provided."""
validate.is_tasks(tasks)
yield
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
if schd.get_run_mode() == RunMode.SIMULATION:
Expand All @@ -264,6 +270,7 @@ async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
@_command('hold')
async def hold(schd: 'Scheduler', tasks: Iterable[str]):
"""Hold specified tasks."""
validate.is_tasks(tasks)
yield
yield schd.pool.hold_tasks(tasks)

Expand Down Expand Up @@ -304,6 +311,7 @@ async def set_verbosity(schd: 'Scheduler', level: Union[int, str]):
@_command('remove_tasks')
async def remove_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Remove tasks."""
validate.is_tasks(tasks)
yield
yield schd.pool.remove_tasks(tasks)

Expand Down Expand Up @@ -430,5 +438,6 @@ async def force_trigger_tasks(
flow_descr: Optional[str] = None,
):
"""Manual task trigger."""
validate.is_tasks(tasks)
yield
yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr)

0 comments on commit d715aff

Please sign in to comment.