Skip to content

Commit

Permalink
Fix workflow_state xtrigger back-compat & add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jun 5, 2024
1 parent 8e4150f commit cb809ad
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 56 deletions.
93 changes: 58 additions & 35 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import os
import sqlite3
import sys
from typing import Optional, List
from textwrap import dedent
from typing import Dict, Iterable, Optional, List, Union

from cylc.flow import LOG
from cylc.flow.exceptions import InputError
from cylc.flow.cycling.util import add_offset
from cylc.flow.cycling.integer import (
Expand All @@ -33,13 +33,20 @@
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
)
from cylc.flow.util import deserialise_set
from metomi.isodatetime.parsers import TimePointParser
from metomi.isodatetime.exceptions import ISO8601SyntaxError


output_fallback_msg = (
"Unable to filter by task output label for tasks run in Cylc versions "
"between 8.0.0-8.3.0. Falling back to filtering by task message instead."
)


class CylcWorkflowDBChecker:
"""Object for querying task status or outputs from a workflow database.
Expand Down Expand Up @@ -70,12 +77,12 @@ def __init__(self, rund, workflow, db_path=None):
# Get workflow point format.
try:
self.db_point_fmt = self._get_db_point_format()
self.back_compat_mode = False
self.c7_back_compat_mode = False
except sqlite3.OperationalError as exc:
# BACK COMPAT: Cylc 7 DB (see method below).
try:
self.db_point_fmt = self._get_db_point_format_compat()
self.back_compat_mode = True
self.c7_back_compat_mode = True
except sqlite3.OperationalError:
raise exc # original error

Expand Down Expand Up @@ -194,7 +201,7 @@ def workflow_state_query(
]
For an output query:
[
[name, cycle, "[out1: msg1, out2: msg2, ...]"],
[name, cycle, "{out1: msg1, out2: msg2, ...}"],
...
]
"""
Expand All @@ -208,16 +215,16 @@ def workflow_state_query(
target_table = CylcWorkflowDAO.TABLE_TASK_STATES
mask = "name, cycle, status"

if not self.back_compat_mode:
if not self.c7_back_compat_mode:
# Cylc 8 DBs only
mask += ", flow_nums"

stmt = dedent(rf'''
stmt = rf'''
SELECT
{mask}
FROM
{target_table}
''') # nosec
''' # nosec
# * mask is hardcoded
# * target_table is a code constant

Expand All @@ -241,20 +248,20 @@ def workflow_state_query(
stmt_wheres.append("cycle==?")
stmt_args.append(cycle)

if selector is not None and not (is_output or is_message):
if (
selector is not None
and target_table == CylcWorkflowDAO.TABLE_TASK_STATES
):
# Can select by status in the DB but not outputs.
stmt_wheres.append("status==?")
stmt_args.append(selector)

if stmt_wheres:
stmt += "WHERE\n " + (" AND ").join(stmt_wheres)

if not (is_output or is_message):
if target_table == CylcWorkflowDAO.TABLE_TASK_STATES:
# (outputs table doesn't record submit number)
stmt += dedent("""
ORDER BY
submit_num
""")
stmt += r"ORDER BY submit_num"

# Query the DB and drop incompatible rows.
db_res = []
Expand All @@ -264,7 +271,7 @@ def workflow_state_query(
if row[2] is None:
# status can be None in Cylc 7 DBs
continue
if not self.back_compat_mode:
if not self.c7_back_compat_mode:
flow_nums = deserialise_set(row[3])
if flow_num is not None and flow_num not in flow_nums:
# skip result, wrong flow
Expand All @@ -274,34 +281,50 @@ def workflow_state_query(
res.append(fstr)
db_res.append(res)

if not (is_output or is_message):
if target_table == CylcWorkflowDAO.TABLE_TASK_STATES:
return db_res

warn_output_fallback = is_output
results = []
for row in db_res:
outputs_map = json.loads(row[2])
if is_message:
# task message
try:
outputs = list(outputs_map.values())
except AttributeError:
# Cylc 8 pre 8.3.0 back-compat: list of output messages
outputs = list(outputs_map)
outputs: Union[Dict[str, str], List[str]] = json.loads(row[2])
if isinstance(outputs, dict):
messages: Iterable[str] = outputs.values()
else:
# task output
outputs = list(outputs_map)
# Cylc 8 pre 8.3.0 back-compat: list of output messages
messages = outputs
if warn_output_fallback:
LOG.warning(output_fallback_msg)
warn_output_fallback = False

if (
selector is None or
selector in outputs or
(
selector in ("finished", "finish")
and (
TASK_OUTPUT_SUCCEEDED in outputs
or TASK_OUTPUT_FAILED in outputs
)
)
(is_message and selector in messages) or
(is_output and self._selector_in_outputs(selector, outputs))
):
results.append(row[:2] + [str(outputs)] + row[3:])

return results

@staticmethod
def _selector_in_outputs(selector: str, outputs: Iterable[str]) -> bool:
"""Check if a selector, including "finished", is in the outputs.
Examples:
>>> this = CylcWorkflowDBChecker._selector_in_outputs
>>> this('moop', ['started', 'moop'])
True
>>> this('moop', ['started'])
False
>>> this('finished', ['succeeded'])
True
>>> this('finish', ['failed'])
True
"""
return selector in outputs or (
selector in (TASK_OUTPUT_FINISHED, "finish")
and (
TASK_OUTPUT_SUCCEEDED in outputs
or TASK_OUTPUT_FAILED in outputs
)
)
23 changes: 23 additions & 0 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,27 @@
INTERVAL = 5


def unquote(s: str) -> str:
"""Remove leading & trailing quotes from a string.
Examples:
>>> unquote('"foo"')
'foo'
>>> unquote("'foo'")
'foo'
>>> unquote('foo')
'foo'
>>> unquote("'tis a fine morning")
"'tis a fine morning"
"""
if (
s.startswith('"') and s.endswith('"')
or s.startswith("'") and s.endswith("'")
):
return s[1:-1]
return s


class WorkflowPoller(Poller):
"""An object that polls for task states or outputs in a workflow DB."""

Expand All @@ -137,6 +158,8 @@ def __init__(
tokens = Tokens(self.id_)
self.workflow_id_raw = tokens.workflow_id
self.task_sel = tokens["task_sel"] or default_status
if self.task_sel:
self.task_sel = unquote(self.task_sel)
self.cycle_raw = tokens["cycle"]
self.task = tokens["task"]

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ def put_insert_task_outputs(self, itask):
itask,
{
"flow_nums": serialise_set(itask.flow_nums),
"outputs": json.dumps([])
"outputs": json.dumps({})
}
)

Expand Down
6 changes: 3 additions & 3 deletions tests/functional/workflow-state/08-integer.t
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ TEST_NAME="${TEST_NAME_BASE}_check_1_outputs"
run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --output "${WORKFLOW_NAME}"

contains_ok "${TEST_NAME}.stdout" <<__END__
1/foo:['submitted', 'started', 'succeeded', 'x']
2/foo:[]
1/bar:['submitted', 'started', 'succeeded']
1/foo:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded', 'x': 'hello'}
2/foo:{}
1/bar:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded'}
__END__

TEST_NAME="${TEST_NAME_BASE}_poll_fail"
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/workflow-state/09-datetime.t
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ TEST_NAME="${TEST_NAME_BASE}_check_1_outputs"
run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --output "${WORKFLOW_NAME}"

contains_ok "${TEST_NAME}.stdout" <<__END__
2051/foo:['submitted', 'started', 'succeeded', 'x']
2052/foo:[]
2051/bar:['submitted', 'started', 'succeeded']
2051/foo:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded', 'x': 'hello'}
2052/foo:{}
2051/bar:{'submitted': 'submitted', 'started': 'started', 'succeeded': 'succeeded'}
__END__

TEST_NAME="${TEST_NAME_BASE}_poll_fail"
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/workflow-state/10-backcompat.t
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ contains_ok "${TEST_NAME}.stdout" <<__END__
__END__

# recreate Cylc 7 DB with one NULL status
rm "${WORKFLOW_RUN_DIR}/log/db"
rm "${WORKFLOW_RUN_DIR}/log/db"
run_ok "create-db" sqlite3 "${WORKFLOW_RUN_DIR}/log/db" < schema-2.sql

TEST_NAME="${TEST_NAME_BASE}_compat_2"
Expand All @@ -43,12 +43,12 @@ contains_ok "${TEST_NAME}.stdout" <<__END__
2051/foo:succeeded
__END__

# Cylc 7 DB only contains custom outputs, and only the task message.
# Cylc 7 DB only contains custom outputs
TEST_NAME="${TEST_NAME_BASE}_outputs"
run_ok "${TEST_NAME}" cylc workflow-state --max-polls=1 --message "${WORKFLOW_NAME}"

contains_ok "${TEST_NAME}.stdout" <<__END__
2051/foo:['the quick brown fox']
2051/foo:{'x': 'the quick brown fox'}
__END__

purge
Loading

0 comments on commit cb809ad

Please sign in to comment.