From 502b63b542e1dfa2b80782c550ed3bd33fb3283b Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 9 Oct 2024 09:13:11 +0100
Subject: [PATCH 1/7] Log job failure even when it doesn't cause a change in
task state (i.e. when there is a retry set up).
---
changes.d/6169.fix | 1 +
cylc/flow/task_events_mgr.py | 20 ++++++++++++-----
tests/integration/test_task_events_mgr.py | 27 ++++++++++++++++++++---
3 files changed, 40 insertions(+), 8 deletions(-)
create mode 100644 changes.d/6169.fix
diff --git a/changes.d/6169.fix b/changes.d/6169.fix
new file mode 100644
index 00000000000..d116ea1ba75
--- /dev/null
+++ b/changes.d/6169.fix
@@ -0,0 +1 @@
+Ensure that job failure is logged, even when the presence of retries causes the task not to change state.
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 0a65baea1a9..7c3567448ef 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -744,7 +744,7 @@ def process_message(
# Already failed.
return True
if self._process_message_failed(
- itask, event_time, self.JOB_FAILED, forced
+ itask, event_time, self.JOB_FAILED, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)
@@ -795,7 +795,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
- itask, event_time, self.JOB_FAILED, forced
+ itask, event_time, self.JOB_FAILED, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)
@@ -812,7 +812,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
- itask, event_time, aborted_with, forced
+ itask, event_time, aborted_with, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)
@@ -1297,10 +1297,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)
- def _process_message_failed(self, itask, event_time, message, forced):
+ def _process_message_failed(
+ self, itask, event_time, message, forced, full_message
+ ):
"""Helper for process_message, handle a failed message.
Return True if no retries (hence go to the failed state).
+
+ Args:
+ full_message:
+ If we have retries lined up we still tell users what
+ happened to cause the this attempt to fail.
"""
no_retries = False
if event_time is None:
@@ -1332,7 +1339,10 @@ def _process_message_failed(self, itask, event_time, message, forced):
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
- LOG.warning(f"[{itask}] {delay_msg}")
+ LOG.warning(
+ f'[{itask}] {full_message or self.EVENT_FAILED} - '
+ f'{delay_msg}'
+ )
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 7ac12274d7b..6e996bced6d 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -14,7 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from itertools import product
import logging
from typing import Any as Fixture
@@ -22,9 +21,7 @@
from cylc.flow.scheduler import Scheduler
from cylc.flow.data_store_mgr import (
JOBS,
- TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING,
- TASK_STATUS_SUBMIT_FAILED,
)
@@ -170,3 +167,27 @@ async def test__always_insert_task_job(
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
}
+
+
+async def test__process_message_failed_with_retry(one, start):
+ """Log job failure, even if a retry is scheduled.
+
+ See: https://github.com/cylc/cylc-flow/pull/6169
+
+ """
+
+ async with start(one) as LOG:
+ fail_once = one.pool.get_tasks()[0]
+ # Add retry timers:
+ one.task_job_mgr._set_retry_timers(
+ fail_once, {
+ 'execution retry delays': [1],
+ 'submission retry delays': [1]
+ })
+
+ # Process failed message:
+ one.task_events_mgr._process_message_failed(
+ fail_once, None, 'failed', False, 'failed/OOK')
+
+ # Check that failure reported:
+ assert 'failed/OOK' in LOG.messages[-1]
From 7b0a7b1375c6efc4e76811b85319d22fedc13548 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 24 Oct 2024 13:01:16 +0100
Subject: [PATCH 2/7] move logging to _process_message_ for
submit/failure
---
changes.d/6169.fix | 2 +-
cylc/flow/task_events_mgr.py | 12 ++++++++++--
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/changes.d/6169.fix b/changes.d/6169.fix
index d116ea1ba75..c7ca6f74da8 100644
--- a/changes.d/6169.fix
+++ b/changes.d/6169.fix
@@ -1 +1 @@
-Ensure that job failure is logged, even when the presence of retries causes the task not to change state.
+Ensure that job submit/failure is logged, even when retries are planned.
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 7c3567448ef..d1c81085f42 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -928,11 +928,16 @@ def _process_message_check(
return False
severity_lvl: int = LOG_LEVELS.get(severity, INFO)
+ # Don't log submit/failure messages here:
+ if flag != self.FLAG_POLLED and message in {
+ self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
+ }:
+ return True
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
- self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
+ self.EVENT_FAILED
}:
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
@@ -1334,6 +1339,9 @@ def _process_message_failed(
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_FAILED)
self.data_store_mgr.delta_task_state(itask)
+ LOG.error(
+ f'[{itask}] {full_message or self.EVENT_FAILED} - '
+ )
else:
# There is an execution retry lined up.
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
@@ -1414,7 +1422,6 @@ def _process_message_submit_failed(
Return True if no retries (hence go to the submit-failed state).
"""
no_retries = False
- LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if event_time is None:
event_time = get_current_time_string()
self.workflow_db_mgr.put_update_task_jobs(itask, {
@@ -1440,6 +1447,7 @@ def _process_message_submit_failed(
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_SUBMIT_FAILED)
self.data_store_mgr.delta_task_state(itask)
+ LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
else:
# There is a submission retry lined up.
timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
From 00230a764de2c41aef23388dc082b9ad14225c9f Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 24 Oct 2024 13:18:45 +0100
Subject: [PATCH 3/7] update integration tests
---
tests/integration/test_task_events_mgr.py | 44 +++++++++++++++++------
1 file changed, 34 insertions(+), 10 deletions(-)
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 6e996bced6d..a8da192b863 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -76,17 +76,22 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
with correct submit number.
"""
conf = {
- 'scheduling': {'graph': {'R1': 'rhenas'}},
- 'runtime': {'rhenas': {'simulation': {
- 'fail cycle points': '1',
- 'fail try 1 only': False,
- }}}}
+ "scheduling": {"graph": {"R1": "rhenas"}},
+ "runtime": {
+ "rhenas": {
+ "simulation": {
+ "fail cycle points": "1",
+ "fail try 1 only": False,
+ }
+ }
+ },
+ }
id_ = flow(conf)
schd = scheduler(id_)
async with start(schd):
# Set task to running:
- itask = schd.pool.get_tasks()[0]
- itask.state.status = 'running'
+ itask = schd.pool.get_tasks()[0]
+ itask.state.status = "running"
itask.submit_num += 1
# Not run _insert_task_job yet:
@@ -185,9 +190,28 @@ async def test__process_message_failed_with_retry(one, start):
'submission retry delays': [1]
})
- # Process failed message:
+ # Process submit failed message with and without retries:
+ one.task_events_mgr._process_message_submit_failed(
+ fail_once, None, 1, False)
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.WARNING
+ assert '1/one:waiting(queued)' in last_record.message
+
+ one.task_events_mgr._process_message_submit_failed(
+ fail_once, None, 2, False)
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.ERROR
+ assert 'submission failed' in last_record.message
+
+ # Process failed message with and without retries:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.WARNING
+ assert 'failed/OOK' in last_record.message
- # Check that failure reported:
- assert 'failed/OOK' in LOG.messages[-1]
+ one.task_events_mgr._process_message_failed(
+ fail_once, None, 'failed', False, 'failed/OOK')
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.ERROR
+ assert 'failed/OOK' in last_record.message
From 11b83802758af88f255c29fa5170158f2bfd0629 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 6 Nov 2024 10:08:14 +0000
Subject: [PATCH 4/7] remove EVENT_FAILED from debug demotion set
---
cylc/flow/task_events_mgr.py | 1 -
1 file changed, 1 deletion(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index d1c81085f42..121b4ae4e59 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -937,7 +937,6 @@ def _process_message_check(
# gets logged by itask state change anyway (and not manual poll)
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
- self.EVENT_FAILED
}:
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
From 5b5c3910edfb68d9535f76c801da4917e4dc6aa2 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 6 Nov 2024 13:18:39 +0000
Subject: [PATCH 5/7] [skip ci]wip
---
tests/functional/cylc-remove/00-simple/flow.cylc | 2 +-
tests/functional/cylc-remove/02-cycling/flow.cylc | 4 ++--
.../functional/cylc-trigger/02-filter-failed/flow.cylc | 6 +++---
.../functional/cylc-trigger/04-filter-names/flow.cylc | 10 +++++-----
tests/functional/hold-release/11-retrying/flow.cylc | 2 +-
tests/functional/reload/10-runahead.t | 2 +-
tests/functional/reload/25-xtriggers.t | 2 +-
tests/functional/reload/runahead/flow.cylc | 2 +-
.../functional/spawn-on-demand/10-retrigger/flow.cylc | 2 +-
tests/functional/triggering/19-and-suicide/flow.cylc | 2 +-
10 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/tests/functional/cylc-remove/00-simple/flow.cylc b/tests/functional/cylc-remove/00-simple/flow.cylc
index 84c740ad421..e1c23857b8f 100644
--- a/tests/functional/cylc-remove/00-simple/flow.cylc
+++ b/tests/functional/cylc-remove/00-simple/flow.cylc
@@ -15,7 +15,7 @@
script = false
[[cleaner]]
script = """
-cylc__job__poll_grep_workflow_log -E '1/b/01:running.* \(received\)failed'
+cylc__job__poll_grep_workflow_log -E '1/b/01.* failed'
# Remove the unhandled failed task
cylc remove "$CYLC_WORKFLOW_ID//1/b"
# Remove waiting 1/c
diff --git a/tests/functional/cylc-remove/02-cycling/flow.cylc b/tests/functional/cylc-remove/02-cycling/flow.cylc
index 3b6c1051493..5371d577394 100644
--- a/tests/functional/cylc-remove/02-cycling/flow.cylc
+++ b/tests/functional/cylc-remove/02-cycling/flow.cylc
@@ -17,8 +17,8 @@
[runtime]
[[remover]]
script = """
- cylc__job__poll_grep_workflow_log -E '2020/bar/01:running.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '2021/baz/01:running.* \(received\)failed'
+ cylc__job__poll_grep_workflow_log -E '2020/bar/01.* failed'
+ cylc__job__poll_grep_workflow_log -E '2021/baz/01.* failed'
# Remove the two unhandled failed tasks.
cylc remove "$CYLC_WORKFLOW_ID//*/ba*:failed"
# Remove the two unsatisfied waiting tasks.
diff --git a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
index 7416cf5790d..342f1449980 100644
--- a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
+++ b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
@@ -18,9 +18,9 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
- cylc__job__poll_grep_workflow_log -E '1/fixable1/01:running.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable2/01:running.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable3/01:running.* \(received\)failed'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:failed\] failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:failed\] failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:failed\] failed/ERR'
cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*"
"""
[[Z]]
diff --git a/tests/functional/cylc-trigger/04-filter-names/flow.cylc b/tests/functional/cylc-trigger/04-filter-names/flow.cylc
index 31839c1b77f..95caf5ddc4e 100644
--- a/tests/functional/cylc-trigger/04-filter-names/flow.cylc
+++ b/tests/functional/cylc-trigger/04-filter-names/flow.cylc
@@ -22,11 +22,11 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
- cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* \(received\)failed'
- cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* \(received\)failed'
+ cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* failed'
+ cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* failed'
+ cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* failed'
+ cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* failed'
+ cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//" \
'//1/FIXABLE-1' '//1/fixable-2*' '//1/fixable-3'
"""
diff --git a/tests/functional/hold-release/11-retrying/flow.cylc b/tests/functional/hold-release/11-retrying/flow.cylc
index 0e08699af09..0d747c4c798 100644
--- a/tests/functional/hold-release/11-retrying/flow.cylc
+++ b/tests/functional/hold-release/11-retrying/flow.cylc
@@ -18,7 +18,7 @@ t-retry-able => t-analyse
[[t-hold-release]]
script = """
cylc__job__poll_grep_workflow_log -E \
- '1/t-retry-able/01:running.* \(received\)failed'
+ '1/t-retry-able/01.*failed'
cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* => waiting'
diff --git a/tests/functional/reload/10-runahead.t b/tests/functional/reload/10-runahead.t
index d591997f68b..fa93be9891a 100644
--- a/tests/functional/reload/10-runahead.t
+++ b/tests/functional/reload/10-runahead.t
@@ -30,7 +30,7 @@ run_fail "${TEST_NAME}" cylc play --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
TEST_NAME=${TEST_NAME_BASE}-check-fail
DB_FILE="$RUN_DIR/${WORKFLOW_NAME}/log/db"
-QUERY='SELECT COUNT(*) FROM task_states WHERE status == "failed"'
+QUERY="SELECT COUNT(*) FROM task_states WHERE status == 'failed'"
cmp_ok <(sqlite3 "$DB_FILE" "$QUERY") <<< "4"
#-------------------------------------------------------------------------------
purge
diff --git a/tests/functional/reload/25-xtriggers.t b/tests/functional/reload/25-xtriggers.t
index 0269a2e3775..db37d26c793 100644
--- a/tests/functional/reload/25-xtriggers.t
+++ b/tests/functional/reload/25-xtriggers.t
@@ -42,7 +42,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[reload]]
script = """
# wait for "broken" to fail
- cylc__job__poll_grep_workflow_log -E '1/broken/01.* \(received\)failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '1/broken/01.* failed/ERR'
# fix "broken" to allow it to pass
sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc"
# reload the workflow
diff --git a/tests/functional/reload/runahead/flow.cylc b/tests/functional/reload/runahead/flow.cylc
index c65b5e11d6d..b1fd8646bf3 100644
--- a/tests/functional/reload/runahead/flow.cylc
+++ b/tests/functional/reload/runahead/flow.cylc
@@ -20,7 +20,7 @@
script = true
[[reloader]]
script = """
- cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*\(received\)failed"
+ cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*failed"
perl -pi -e 's/(runahead limit = )P1( # marker)/\1 P3\2/' $CYLC_WORKFLOW_RUN_DIR/flow.cylc
cylc reload $CYLC_WORKFLOW_ID
"""
diff --git a/tests/functional/spawn-on-demand/10-retrigger/flow.cylc b/tests/functional/spawn-on-demand/10-retrigger/flow.cylc
index 7e9149ce3c9..38e9aafaeee 100644
--- a/tests/functional/spawn-on-demand/10-retrigger/flow.cylc
+++ b/tests/functional/spawn-on-demand/10-retrigger/flow.cylc
@@ -18,7 +18,7 @@
"""
[[triggerer]]
script = """
- cylc__job__poll_grep_workflow_log -E '1/oops/01:running.* \(received\)failed'
+ cylc__job__poll_grep_workflow_log -E '1/oops/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//1/oops"
"""
[[foo, bar]]
diff --git a/tests/functional/triggering/19-and-suicide/flow.cylc b/tests/functional/triggering/19-and-suicide/flow.cylc
index 670c361fc96..063360022a5 100644
--- a/tests/functional/triggering/19-and-suicide/flow.cylc
+++ b/tests/functional/triggering/19-and-suicide/flow.cylc
@@ -16,7 +16,7 @@
[[t0]]
# https://github.com/cylc/cylc-flow/issues/2655
# "1/t2" should not suicide on "1/t1:failed"
- script = cylc__job__poll_grep_workflow_log -E '1/t1.* \(received\)failed'
+ script = cylc__job__poll_grep_workflow_log -E '1/t1.* failed'
[[t1]]
script = false
[[t2]]
From 0b26abd5289aa5b8741f649b5c975ba511ea58d2 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 24 Oct 2024 13:18:45 +0100
Subject: [PATCH 6/7] update integration tests
---
.../hold-release/11-retrying/flow.cylc | 2 +-
tests/functional/reload/25-xtriggers.t | 4 +-
tests/integration/test_task_events_mgr.py | 44 ++++++++++++++-----
3 files changed, 37 insertions(+), 13 deletions(-)
diff --git a/tests/functional/hold-release/11-retrying/flow.cylc b/tests/functional/hold-release/11-retrying/flow.cylc
index 0e08699af09..dbd1ec3ed59 100644
--- a/tests/functional/hold-release/11-retrying/flow.cylc
+++ b/tests/functional/hold-release/11-retrying/flow.cylc
@@ -18,7 +18,7 @@ t-retry-able => t-analyse
[[t-hold-release]]
script = """
cylc__job__poll_grep_workflow_log -E \
- '1/t-retry-able/01:running.* \(received\)failed'
+ '\[1/t-retry-able:waiting\] failed/ERR'
cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* => waiting'
diff --git a/tests/functional/reload/25-xtriggers.t b/tests/functional/reload/25-xtriggers.t
index 0269a2e3775..fdaa4ebb3f1 100644
--- a/tests/functional/reload/25-xtriggers.t
+++ b/tests/functional/reload/25-xtriggers.t
@@ -42,7 +42,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[reload]]
script = """
# wait for "broken" to fail
- cylc__job__poll_grep_workflow_log -E '1/broken/01.* \(received\)failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '1/broken.*failed/ERR'
# fix "broken" to allow it to pass
sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc"
# reload the workflow
@@ -63,7 +63,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" \
1 1 \
- '1/broken.* (received)failed/ERR'
+ '1/broken.*failed/ERR'
log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" 1 1 \
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 6e996bced6d..a8da192b863 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -76,17 +76,22 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
with correct submit number.
"""
conf = {
- 'scheduling': {'graph': {'R1': 'rhenas'}},
- 'runtime': {'rhenas': {'simulation': {
- 'fail cycle points': '1',
- 'fail try 1 only': False,
- }}}}
+ "scheduling": {"graph": {"R1": "rhenas"}},
+ "runtime": {
+ "rhenas": {
+ "simulation": {
+ "fail cycle points": "1",
+ "fail try 1 only": False,
+ }
+ }
+ },
+ }
id_ = flow(conf)
schd = scheduler(id_)
async with start(schd):
# Set task to running:
- itask = schd.pool.get_tasks()[0]
- itask.state.status = 'running'
+ itask = schd.pool.get_tasks()[0]
+ itask.state.status = "running"
itask.submit_num += 1
# Not run _insert_task_job yet:
@@ -185,9 +190,28 @@ async def test__process_message_failed_with_retry(one, start):
'submission retry delays': [1]
})
- # Process failed message:
+ # Process submit failed message with and without retries:
+ one.task_events_mgr._process_message_submit_failed(
+ fail_once, None, 1, False)
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.WARNING
+ assert '1/one:waiting(queued)' in last_record.message
+
+ one.task_events_mgr._process_message_submit_failed(
+ fail_once, None, 2, False)
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.ERROR
+ assert 'submission failed' in last_record.message
+
+ # Process failed message with and without retries:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.WARNING
+ assert 'failed/OOK' in last_record.message
- # Check that failure reported:
- assert 'failed/OOK' in LOG.messages[-1]
+ one.task_events_mgr._process_message_failed(
+ fail_once, None, 'failed', False, 'failed/OOK')
+ last_record = LOG.records[-1]
+ assert last_record.levelno == logging.ERROR
+ assert 'failed/OOK' in last_record.message
From 5453cac7cdfad2560156ddbde87fb847d5af27a0 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 7 Nov 2024 07:46:13 +0000
Subject: [PATCH 7/7] Update cylc/flow/task_events_mgr.py
Co-authored-by: Hilary James Oliver
response to review
---
cylc/flow/task_events_mgr.py | 6 ++----
.../cylc-trigger/02-filter-failed/flow.cylc | 6 +++---
tests/integration/test_task_events_mgr.py | 12 +++++-------
3 files changed, 10 insertions(+), 14 deletions(-)
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 121b4ae4e59..821fc09c370 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -1324,6 +1324,7 @@ def _process_message_failed(
"run_status": 1,
"time_run_exit": event_time,
})
+ LOG.error(f'[{itask}] {full_message or self.EVENT_FAILED}')
if (
forced
or TimerFlags.EXECUTION_RETRY not in itask.try_timers
@@ -1338,9 +1339,6 @@ def _process_message_failed(
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_FAILED)
self.data_store_mgr.delta_task_state(itask)
- LOG.error(
- f'[{itask}] {full_message or self.EVENT_FAILED} - '
- )
else:
# There is an execution retry lined up.
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
@@ -1428,6 +1426,7 @@ def _process_message_submit_failed(
"submit_status": 1,
})
itask.summary['submit_method_id'] = None
+ LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if (
forced
or TimerFlags.SUBMISSION_RETRY not in itask.try_timers
@@ -1446,7 +1445,6 @@ def _process_message_submit_failed(
self.data_store_mgr.delta_task_output(
itask, TASK_OUTPUT_SUBMIT_FAILED)
self.data_store_mgr.delta_task_state(itask)
- LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
else:
# There is a submission retry lined up.
timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
diff --git a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
index 342f1449980..3da59a7d867 100644
--- a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
+++ b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc
@@ -18,9 +18,9 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
- cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:failed\] failed/ERR'
- cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:failed\] failed/ERR'
- cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:failed\] failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:running\] failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:running\] failed/ERR'
+ cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:running\] failed/ERR'
cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*"
"""
[[Z]]
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index a8da192b863..00557971135 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -174,7 +174,7 @@ async def test__always_insert_task_job(
}
-async def test__process_message_failed_with_retry(one, start):
+async def test__process_message_failed_with_retry(one, start, log_filter):
"""Log job failure, even if a retry is scheduled.
See: https://github.com/cylc/cylc-flow/pull/6169
@@ -199,9 +199,8 @@ async def test__process_message_failed_with_retry(one, start):
one.task_events_mgr._process_message_submit_failed(
fail_once, None, 2, False)
- last_record = LOG.records[-1]
- assert last_record.levelno == logging.ERROR
- assert 'submission failed' in last_record.message
+ failed_record = log_filter(LOG, level=logging.ERROR)[-1]
+ assert 'submission failed' in failed_record[2]
# Process failed message with and without retries:
one.task_events_mgr._process_message_failed(
@@ -212,6 +211,5 @@ async def test__process_message_failed_with_retry(one, start):
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
- last_record = LOG.records[-1]
- assert last_record.levelno == logging.ERROR
- assert 'failed/OOK' in last_record.message
+ failed_record = log_filter(LOG, level=logging.ERROR)[-1]
+ assert 'failed/OOK' in failed_record[2]