diff --git a/changes.d/6169.fix b/changes.d/6169.fix new file mode 100644 index 00000000000..c7ca6f74da8 --- /dev/null +++ b/changes.d/6169.fix @@ -0,0 +1 @@ +Ensure that job submit/failure is logged, even when retries are planned. diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 0a65baea1a9..821fc09c370 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -744,7 +744,7 @@ def process_message( # Already failed. return True if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -795,7 +795,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": signal}) if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -812,7 +812,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": aborted_with}) if self._process_message_failed( - itask, event_time, aborted_with, forced + itask, event_time, aborted_with, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -928,11 +928,15 @@ def _process_message_check( return False severity_lvl: int = LOG_LEVELS.get(severity, INFO) + # Don't log submit/failure messages here: + if flag != self.FLAG_POLLED and message in { + self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR' + }: + return True # Demote log level to DEBUG if this is a message that duplicates what # gets logged by itask state change anyway (and not manual poll) if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in { self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED, - self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR' }: severity_lvl = DEBUG LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}") @@ -1297,10 +1301,17 @@ def _retry_task(self, 
itask, wallclock_time, submit_retry=False): if itask.state_reset(TASK_STATUS_WAITING): self.data_store_mgr.delta_task_state(itask) - def _process_message_failed(self, itask, event_time, message, forced): + def _process_message_failed( + self, itask, event_time, message, forced, full_message + ): """Helper for process_message, handle a failed message. Return True if no retries (hence go to the failed state). + + Args: + full_message: + If we have retries lined up we still tell users what + happened to cause this attempt to fail. """ no_retries = False if event_time is None: @@ -1313,6 +1324,7 @@ def _process_message_failed(self, itask, event_time, message, forced): "run_status": 1, "time_run_exit": event_time, }) + LOG.error(f'[{itask}] {full_message or self.EVENT_FAILED}') if ( forced or TimerFlags.EXECUTION_RETRY not in itask.try_timers @@ -1332,7 +1344,10 @@ def _process_message_failed(self, itask, event_time, message, forced): timer = itask.try_timers[TimerFlags.EXECUTION_RETRY] self._retry_task(itask, timer.timeout) delay_msg = f"retrying in {timer.delay_timeout_as_str()}" - LOG.warning(f"[{itask}] {delay_msg}") + LOG.warning( + f'[{itask}] {full_message or self.EVENT_FAILED} - ' + f'{delay_msg}' + ) msg = f"{self.JOB_FAILED}, {delay_msg}" self.setup_event_handlers(itask, self.EVENT_RETRY, msg) self._reset_job_timers(itask) @@ -1404,7 +1419,6 @@ def _process_message_submit_failed( Return True if no retries (hence go to the submit-failed state). 
""" no_retries = False - LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}") if event_time is None: event_time = get_current_time_string() self.workflow_db_mgr.put_update_task_jobs(itask, { @@ -1412,6 +1426,7 @@ def _process_message_submit_failed( "submit_status": 1, }) itask.summary['submit_method_id'] = None + LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}") if ( forced or TimerFlags.SUBMISSION_RETRY not in itask.try_timers diff --git a/tests/functional/cylc-remove/00-simple/flow.cylc b/tests/functional/cylc-remove/00-simple/flow.cylc index 84c740ad421..e1c23857b8f 100644 --- a/tests/functional/cylc-remove/00-simple/flow.cylc +++ b/tests/functional/cylc-remove/00-simple/flow.cylc @@ -15,7 +15,7 @@ script = false [[cleaner]] script = """ -cylc__job__poll_grep_workflow_log -E '1/b/01:running.* \(received\)failed' +cylc__job__poll_grep_workflow_log -E '1/b/01.* failed' # Remove the unhandled failed task cylc remove "$CYLC_WORKFLOW_ID//1/b" # Remove waiting 1/c diff --git a/tests/functional/cylc-remove/02-cycling/flow.cylc b/tests/functional/cylc-remove/02-cycling/flow.cylc index 3b6c1051493..5371d577394 100644 --- a/tests/functional/cylc-remove/02-cycling/flow.cylc +++ b/tests/functional/cylc-remove/02-cycling/flow.cylc @@ -17,8 +17,8 @@ [runtime] [[remover]] script = """ - cylc__job__poll_grep_workflow_log -E '2020/bar/01:running.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '2021/baz/01:running.* \(received\)failed' + cylc__job__poll_grep_workflow_log -E '2020/bar/01.* failed' + cylc__job__poll_grep_workflow_log -E '2021/baz/01.* failed' # Remove the two unhandled failed tasks. cylc remove "$CYLC_WORKFLOW_ID//*/ba*:failed" # Remove the two unsatisfied waiting tasks. 
diff --git a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc index 7416cf5790d..3da59a7d867 100644 --- a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc +++ b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc @@ -18,9 +18,9 @@ [[fixer]] script = """ cylc__job__wait_cylc_message_started - cylc__job__poll_grep_workflow_log -E '1/fixable1/01:running.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable2/01:running.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable3/01:running.* \(received\)failed' + cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:running\] failed/ERR' + cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:running\] failed/ERR' + cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:running\] failed/ERR' cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*" """ [[Z]] diff --git a/tests/functional/cylc-trigger/04-filter-names/flow.cylc b/tests/functional/cylc-trigger/04-filter-names/flow.cylc index 31839c1b77f..95caf5ddc4e 100644 --- a/tests/functional/cylc-trigger/04-filter-names/flow.cylc +++ b/tests/functional/cylc-trigger/04-filter-names/flow.cylc @@ -22,11 +22,11 @@ [[fixer]] script = """ cylc__job__wait_cylc_message_started - cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* \(received\)failed' + cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* failed' + cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* failed' + cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* failed' + cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* failed' + cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* 
failed' cylc trigger "${CYLC_WORKFLOW_ID}//" \ '//1/FIXABLE-1' '//1/fixable-2*' '//1/fixable-3' """ diff --git a/tests/functional/hold-release/11-retrying/flow.cylc b/tests/functional/hold-release/11-retrying/flow.cylc index 0e08699af09..dbd1ec3ed59 100644 --- a/tests/functional/hold-release/11-retrying/flow.cylc +++ b/tests/functional/hold-release/11-retrying/flow.cylc @@ -18,7 +18,7 @@ t-retry-able => t-analyse [[t-hold-release]] script = """ cylc__job__poll_grep_workflow_log -E \ - '1/t-retry-able/01:running.* \(received\)failed' + '\[1/t-retry-able:waiting\] failed/ERR' cylc__job__poll_grep_workflow_log -E \ '1/t-retry-able/01:running.* => waiting' diff --git a/tests/functional/reload/10-runahead.t b/tests/functional/reload/10-runahead.t index d591997f68b..fa93be9891a 100644 --- a/tests/functional/reload/10-runahead.t +++ b/tests/functional/reload/10-runahead.t @@ -30,7 +30,7 @@ run_fail "${TEST_NAME}" cylc play --debug --no-detach "${WORKFLOW_NAME}" #------------------------------------------------------------------------------- TEST_NAME=${TEST_NAME_BASE}-check-fail DB_FILE="$RUN_DIR/${WORKFLOW_NAME}/log/db" -QUERY='SELECT COUNT(*) FROM task_states WHERE status == "failed"' +QUERY="SELECT COUNT(*) FROM task_states WHERE status == 'failed'" cmp_ok <(sqlite3 "$DB_FILE" "$QUERY") <<< "4" #------------------------------------------------------------------------------- purge diff --git a/tests/functional/reload/25-xtriggers.t b/tests/functional/reload/25-xtriggers.t index 0269a2e3775..fdaa4ebb3f1 100644 --- a/tests/functional/reload/25-xtriggers.t +++ b/tests/functional/reload/25-xtriggers.t @@ -42,7 +42,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' [[reload]] script = """ # wait for "broken" to fail - cylc__job__poll_grep_workflow_log -E '1/broken/01.* \(received\)failed/ERR' + cylc__job__poll_grep_workflow_log -E '1/broken.*failed/ERR' # fix "broken" to allow it to pass sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc" # reload the 
workflow @@ -63,7 +63,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach log_scan "${TEST_NAME_BASE}-scan" \ "$(cylc cat-log -m p "${WORKFLOW_NAME}")" \ 1 1 \ - '1/broken.* (received)failed/ERR' + '1/broken.*failed/ERR' log_scan "${TEST_NAME_BASE}-scan" \ "$(cylc cat-log -m p "${WORKFLOW_NAME}")" 1 1 \ diff --git a/tests/functional/reload/runahead/flow.cylc b/tests/functional/reload/runahead/flow.cylc index c65b5e11d6d..b1fd8646bf3 100644 --- a/tests/functional/reload/runahead/flow.cylc +++ b/tests/functional/reload/runahead/flow.cylc @@ -20,7 +20,7 @@ script = true [[reloader]] script = """ - cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*\(received\)failed" + cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*failed" perl -pi -e 's/(runahead limit = )P1( # marker)/\1 P3\2/' $CYLC_WORKFLOW_RUN_DIR/flow.cylc cylc reload $CYLC_WORKFLOW_ID """ diff --git a/tests/functional/spawn-on-demand/10-retrigger/flow.cylc b/tests/functional/spawn-on-demand/10-retrigger/flow.cylc index 7e9149ce3c9..38e9aafaeee 100644 --- a/tests/functional/spawn-on-demand/10-retrigger/flow.cylc +++ b/tests/functional/spawn-on-demand/10-retrigger/flow.cylc @@ -18,7 +18,7 @@ """ [[triggerer]] script = """ - cylc__job__poll_grep_workflow_log -E '1/oops/01:running.* \(received\)failed' + cylc__job__poll_grep_workflow_log -E '1/oops/01.* failed' cylc trigger "${CYLC_WORKFLOW_ID}//1/oops" """ [[foo, bar]] diff --git a/tests/functional/triggering/19-and-suicide/flow.cylc b/tests/functional/triggering/19-and-suicide/flow.cylc index 670c361fc96..063360022a5 100644 --- a/tests/functional/triggering/19-and-suicide/flow.cylc +++ b/tests/functional/triggering/19-and-suicide/flow.cylc @@ -16,7 +16,7 @@ [[t0]] # https://github.com/cylc/cylc-flow/issues/2655 # "1/t2" should not suicide on "1/t1:failed" - script = cylc__job__poll_grep_workflow_log -E '1/t1.* \(received\)failed' + script = 
cylc__job__poll_grep_workflow_log -E '1/t1.* failed' [[t1]] script = false [[t2]] diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py index 7ac12274d7b..00557971135 100644 --- a/tests/integration/test_task_events_mgr.py +++ b/tests/integration/test_task_events_mgr.py @@ -14,7 +14,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from itertools import product import logging from typing import Any as Fixture @@ -22,9 +21,7 @@ from cylc.flow.scheduler import Scheduler from cylc.flow.data_store_mgr import ( JOBS, - TASK_STATUSES_ORDERED, TASK_STATUS_WAITING, - TASK_STATUS_SUBMIT_FAILED, ) @@ -79,17 +76,22 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate): with correct submit number. """ conf = { - 'scheduling': {'graph': {'R1': 'rhenas'}}, - 'runtime': {'rhenas': {'simulation': { - 'fail cycle points': '1', - 'fail try 1 only': False, - }}}} + "scheduling": {"graph": {"R1": "rhenas"}}, + "runtime": { + "rhenas": { + "simulation": { + "fail cycle points": "1", + "fail try 1 only": False, + } + } + }, + } id_ = flow(conf) schd = scheduler(id_) async with start(schd): # Set task to running: - itask = schd.pool.get_tasks()[0] - itask.state.status = 'running' + itask = schd.pool.get_tasks()[0] + itask.state.status = "running" itask.submit_num += 1 # Not run _insert_task_job yet: @@ -170,3 +172,44 @@ async def test__always_insert_task_job( '1/broken/01': 'submit-failed', '1/broken2/01': 'submit-failed' } + + +async def test__process_message_failed_with_retry(one, start, log_filter): + """Log job failure, even if a retry is scheduled. 
+ + See: https://github.com/cylc/cylc-flow/pull/6169 + + """ + + async with start(one) as LOG: + fail_once = one.pool.get_tasks()[0] + # Add retry timers: + one.task_job_mgr._set_retry_timers( + fail_once, { + 'execution retry delays': [1], + 'submission retry delays': [1] + }) + + # Process submit failed message with and without retries: + one.task_events_mgr._process_message_submit_failed( + fail_once, None, 1, False) + last_record = LOG.records[-1] + assert last_record.levelno == logging.WARNING + assert '1/one:waiting(queued)' in last_record.message + + one.task_events_mgr._process_message_submit_failed( + fail_once, None, 2, False) + failed_record = log_filter(LOG, level=logging.ERROR)[-1] + assert 'submission failed' in failed_record[2] + + # Process failed message with and without retries: + one.task_events_mgr._process_message_failed( + fail_once, None, 'failed', False, 'failed/OOK') + last_record = LOG.records[-1] + assert last_record.levelno == logging.WARNING + assert 'failed/OOK' in last_record.message + + one.task_events_mgr._process_message_failed( + fail_once, None, 'failed', False, 'failed/OOK') + failed_record = log_filter(LOG, level=logging.ERROR)[-1] + assert 'failed/OOK' in failed_record[2]