Skip to content

Commit

Permalink
Implement Skip Mode
Browse files Browse the repository at this point in the history
* Add `[runtime][<namespace>]run mode` and `[runtime][<namespace>][skip]`.
* Spin run mode functionality into separate modules.
* Run sim mode check with every main loop - we don't know if any tasks are
  in sim mode from the scheduler, but it doesn't cost much to check
  if none are.
* Implemented separate job "submission" pathway switching.
* Implemented skip mode, including output control logic.
* Add a linter and a validation check for tasks in nonlive modes,
  and for combinations of outputs
* Enabled setting outputs as if task ran in skip mode using
  `cylc set --out skip`.
* Testing for the above.
  • Loading branch information
wxtim committed May 2, 2024
1 parent 90a0ca2 commit 19faa47
Show file tree
Hide file tree
Showing 51 changed files with 1,436 additions and 233 deletions.
1 change: 1 addition & 0 deletions changes.d/6039.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow setting of run mode on a task by task basis. Add a new mode "skip".
53 changes: 52 additions & 1 deletion cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,22 @@ def get_script_common_text(this: str, example: Optional[str] = None):
"[platforms][<platform name>]submission retry delays"
)
)
Conf(
'run mode', VDR.V_STRING,
options=['workflow', 'simulation', 'dummy', 'live', 'skip'],
default='workflow',
desc='''
Override the workflow's run mode.
By default workflows run in "live mode" - tasks run
in the way defined by the runtime config.
This setting allows individual tasks to be run using
a different run mode.
.. TODO: Reference updated documention.
.. versionadded:: 8.4.0
''')
with Conf('meta', desc=r'''
Metadata for the task or task family.
Expand Down Expand Up @@ -1404,9 +1420,44 @@ def get_script_common_text(this: str, example: Optional[str] = None):
determine how an event handler responds to task failure
events.
''')
with Conf('skip', desc='''
Task configuration for task :ref:`SkipMode`.
For a full description of skip run mode see
:ref:`SkipMode`.
.. versionadded:: 8.4.0
'''):
Conf(
'outputs',
VDR.V_STRING_LIST,
desc='''
Outputs to be emitted by a task in skip mode.
By default started, submitted, succeeded and all
required outputs will be emitted.
If outputs are specified, but neither succeeded or
failed are specified, succeeded will automatically be
emitted.
.. versionadded:: 8.4.0
'''
)
Conf(
'disable task event handlers',
VDR.V_BOOLEAN,
default=True,
desc='''
Task event handlers are turned off by default for
skip mode tasks. Changing this setting to ``False``
will re-enable task event handlers.
.. versionadded:: 8.4.0
'''
)
with Conf('simulation', desc='''
Task configuration for workflow *simulation* and *dummy* run
Task configuration for *simulation* and *dummy* run
modes.
For a full description of simulation and dummy run modes see
Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
from cylc.flow.print_tree import print_tree
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.run_modes.nonlive import mode_validate_checks
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
Expand Down Expand Up @@ -114,7 +115,7 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.task_state import RunMode
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
Expand Down Expand Up @@ -508,10 +509,6 @@ def __init__(

self.process_runahead_limit()

run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

self._check_task_event_handlers()
Expand Down Expand Up @@ -562,6 +559,8 @@ def __init__(

self.mem_log("config.py: end init config")

mode_validate_checks(self.taskdefs)

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down
121 changes: 121 additions & 0 deletions cylc/flow/run_modes/dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities supporting dummy mode.
"""

from logging import INFO
from typing import TYPE_CHECKING, Any, Dict, Tuple

from cylc.flow.task_outputs import TASK_OUTPUT_SUBMITTED
from cylc.flow.run_modes.simulation import (
ModeSettings,
disable_platforms,
get_simulated_run_len,
parse_fail_cycle_points
)
from cylc.flow.task_state import RunMode
from cylc.flow.platforms import get_platform


if TYPE_CHECKING:
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_proxy import TaskProxy
from typing_extensions import Literal


def submit_task_job(
task_job_mgr: 'TaskJobManager',
itask: 'TaskProxy',
rtconfig: Dict[str, Any],
workflow: str,
now: Tuple[float, str]
) -> 'Literal[False]':
"""Submit a task in dummy mode.
Returns:
False - indicating that TaskJobManager needs to continue running the
live mode path.
"""
configure_dummy_mode(
rtconfig, itask.tdef.rtconfig['simulation']['fail cycle points'])

itask.summary['started_time'] = now[0]
task_job_mgr._set_retry_timers(itask, rtconfig)
itask.mode_settings = ModeSettings(
itask,
task_job_mgr.workflow_db_mgr,
rtconfig
)

itask.waiting_on_job_prep = False
itask.submit_num += 1

itask.platform = get_platform()
itask.platform['name'] = RunMode.DUMMY
itask.summary['job_runner_name'] = RunMode.DUMMY
itask.summary[task_job_mgr.KEY_EXECUTE_TIME_LIMIT] = (
itask.mode_settings.simulated_run_length)
itask.jobs.append(
task_job_mgr.get_simulation_job_conf(itask, workflow))
task_job_mgr.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED)
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
'try_num': itask.get_try_num(),
}
)
return False


def configure_dummy_mode(rtc, fallback):
"""Adjust task defs for simulation and dummy mode.
"""
rtc['submission retry delays'] = [1]
# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, get_simulated_run_len(rtc))
disable_platforms(rtc)
# Disable environment, in case it depends on env-script.
rtc['environment'] = {}
rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"], fallback
)


def build_dummy_script(rtc: Dict[str, Any], sleep_sec: int) -> str:
"""Create fake scripting for dummy mode.
This is for Dummy mode only.
"""
script = "sleep %d" % sleep_sec
# Dummy message outputs.
for msg in rtc['outputs'].values():
script += "\ncylc message '%s'" % msg
if rtc['simulation']['fail try 1 only']:
arg1 = "true"
else:
arg1 = "false"
arg2 = " ".join(rtc['simulation']['fail cycle points'])
script += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
return script
59 changes: 59 additions & 0 deletions cylc/flow/run_modes/nonlive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities supporting all nonlive modes
"""
from typing import TYPE_CHECKING, Dict, List

from cylc.flow import LOG
from cylc.flow.run_modes.skip import check_task_skip_config
from cylc.flow.task_state import RunMode

if TYPE_CHECKING:
from cylc.flow.taskdefs import TaskDefs


def mode_validate_checks(taskdefs: 'Dict[str, TaskDefs]'):
"""Warn user if any tasks has "run mode" set to a non-live value.
Additionally, run specific checks for each mode's config settings.
"""
warn_nonlive: Dict[str, List[str]] = {
RunMode.SKIP: [],
RunMode.SIMULATION: [],
RunMode.DUMMY: [],
}

# Run through taskdefs looking for those with nonlive modes
for taskname, taskdef in taskdefs.items():
# Add to list of tasks to be run in non-live modes:
if (
taskdef.rtconfig.get('run mode', 'workflow')
not in [RunMode.LIVE, 'workflow']
):
warn_nonlive[taskdef.rtconfig['run mode']].append(taskname)

# Run any mode specific validation checks:
check_task_skip_config(taskname, taskdef)

# Assemble warning message about any tasks in nonlive mode.
if any(warn_nonlive.values()):
message = 'The following tasks are set to run in non-live mode:'
for mode, tasknames in warn_nonlive.items():
if tasknames:
message += f'\n{mode} mode:'
for taskname in tasknames:
message += f'\n * {taskname}'
LOG.warning(message)
Loading

0 comments on commit 19faa47

Please sign in to comment.