diff --git a/python_modules/dagster/dagster/_core/execution/retries.py b/python_modules/dagster/dagster/_core/execution/retries.py
index 6dbdc0ecd4989..d184e8fba28f0 100644
--- a/python_modules/dagster/dagster/_core/execution/retries.py
+++ b/python_modules/dagster/dagster/_core/execution/retries.py
@@ -1,3 +1,4 @@
+import warnings
 from collections import defaultdict
 from enum import Enum
-from typing import Mapping, Optional
+from typing import TYPE_CHECKING, Mapping, Optional
@@ -7,7 +8,19 @@
     Selector,
     _check as check,
 )
+from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
+from dagster._core.storage.tags import (
+    MAX_RETRIES_TAG,
+    RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
+    RUN_FAILURE_REASON_TAG,
+)
 from dagster._serdes.serdes import whitelist_for_serdes
+from dagster._utils.tags import get_boolean_tag_value
+
+if TYPE_CHECKING:
+    # Only needed for annotations: dagster._core.instance imports this module at import time,
+    # so a runtime import of DagsterInstance here would be circular.
+    from dagster._core.instance import DagsterInstance
 
 
 def get_retries_config():
@@ -69,3 +82,75 @@ def mark_attempt(self, key: str) -> None:
 
     def snapshot_attempts(self) -> Mapping[str, int]:
         return dict(self._attempts)
+
+
+def auto_reexecution_should_retry_run(instance: "DagsterInstance", run: DagsterRun) -> bool:
+    """Determines if a run will be retried by the automatic reexecution system.
+
+    A run will retry if:
+    - it is failed.
+    - the number of max allowed retries is > 0 (max retries can be set via system setting or run tag).
+    - there have not already been >= max_retries retries for the run.
+
+    If the run failure reason was a step failure and the retry_on_asset_or_op_failure tag/system setting is set to false,
+    the run will not be retried. If the max retries tag cannot be parsed as an int, a warning is
+    emitted and the run will not be retried.
+
+    We determine how many retries have been launched for the run by looking at the size of the run group
+    (the set of runs that have the same root_run_id and the run with root_run_id). Since manually launched retries are
+    part of the run group, this means that if a user launches a manual retry of run A and then this function
+    is called because a retry for run A launched by the auto-reexecution system failed, the manual retry will be
+    counted toward max_retries.
+
+    There is potential that one "extra" retry will be launched by the automatic reexecution system
+    since manual retries could be happening in parallel with automatic retries. Here is
+    an illustrative example:
+    - Max retries is 3
+    - Run A fails
+    - The automatic reexecution system launches a retry of run A (A_1), which fails
+    - The automatic reexecution system launches a retry of run A_1 (A_2), which fails
+    - This function is executing and has fetched the run_group for run A_2: (A, A_1, A_2)
+    - A user launches a manual retry of run A (A_m). The run group is now (A, A_1, A_2, A_m), but this function does
+    not have the updated run group
+    - Since the run group we've fetched is (A, A_1, A_2), this function will mark A_2 as `will_retry=true` and
+    run `A_3` will be launched. This is the "extra" retry, since usually manual retries are counted toward max_retries, but
+    in this case it was not.
+
+    Args:
+        instance: The instance that supplies the default retry settings and the run group lookup.
+        run: The run to evaluate.
+
+    Returns:
+        True if the run should be retried, False otherwise.
+    """
+    from dagster._core.events import RunFailureReason
+
+    if run.status == DagsterRunStatus.FAILURE:
+        raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG)
+        if raw_max_retries_tag is None:
+            max_retries = instance.run_retries_max_retries
+        else:
+            try:
+                max_retries = int(raw_max_retries_tag)
+            except ValueError:
+                warnings.warn(f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.")
+                return False
+        if max_retries > 0:
+            run_group = instance.get_run_group(run.run_id)
+            if run_group is not None:
+                _, run_group_iter = run_group
+                # since the original run is in the run group, the number of retries launched
+                # so far is len(run_group_iter) - 1
+                if len(list(run_group_iter)) <= max_retries:
+                    retry_on_asset_or_op_failure = get_boolean_tag_value(
+                        run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
+                        default_value=instance.run_retries_retry_on_asset_or_op_failure,
+                    )
+                    if (
+                        run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value
+                        and not retry_on_asset_or_op_failure
+                    ):
+                        return False
+                    else:
+                        return True
+    return False
diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index c87c3dbf71ba5..44219351f7d3c 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -47,6 +47,7 @@
     DagsterRunAlreadyExists,
     DagsterRunConflict,
 )
+from dagster._core.execution.retries import auto_reexecution_should_retry_run
 from dagster._core.instance.config import (
     DAGSTER_CONFIG_YAML_FILENAME,
     DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT,
@@ -70,13 +71,10 @@
 from dagster._core.storage.tags import (
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
-    MAX_RETRIES_TAG,
     PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
     RESUME_RETRY_TAG,
-    RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
     ROOT_RUN_ID_TAG,
-    RUN_FAILURE_REASON_TAG,
     TAGS_TO_OMIT_ON_RETRY,
     WILL_RETRY_TAG,
 )
@@ -85,7 +83,6 @@
 from dagster._utils import PrintFn, is_uuid, traced
 from dagster._utils.error import serializable_error_info_from_exc_info
 from dagster._utils.merger import merge_dicts
-from dagster._utils.tags import get_boolean_tag_value
 from dagster._utils.warnings import disable_dagster_warnings, experimental_warning
 
 # 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
@@ -2399,71 +2396,6 @@ def get_handlers(self) -> Sequence[logging.Handler]:
     def store_event(self, event: "EventLogEntry") -> None:
         self._event_storage.store_event(event)
 
-    def should_retry_run(self, run: DagsterRun) -> bool:
-        """Determines if a run will be retried by the automatic reexcution system.
-        A run will retry if:
-        - it is failed.
-        - the number of max allowed retries is > 0 (max retries can be set via system setting or run tag).
-        - there have not already been >= max_retries retries for the run.
-
-        If the run failure reason was a step failure and the retry_on_asset_or_op_failure tag/system setting is set to false,
-        a warning message will be logged and the run will not be retried.
-
-        We determine how many retries have been launched for the run by looking at the size of the run group
-        (the set of runs that have the same root_run_id and the run with root_run_id). Since manually launched retries are
-        given a root_run_id, this means that if a user launches a manual retry of run A and then this function
-        is called because a retry for run A launched by the auto-reexecution system failed, the manual retry will be
-        counted toward max_retries.
-
-        There is potential that one "extra" retry will be launched by the automatic reexecution system
-        since manual retries could be happening in parallel with automatic retries. Here is
-        an illustrative example:
-        - Max retries is 3
-        - Run A fails
-        - The automatic reexecution system launches a retry of run A (A_1), which fails
-        - The automatic reexecution system launches a retry run A_1 (A_2), which fails
-        - This function is executing and has fetched the run_group for run A_2: (A, A_1, A_2)
-        - A user launches a manual retry of run A (A_m). The run group is now (A, A_1, A_2, A_m), but this function does
-        not have the updated run group
-        - Since the run group we've fetched is (A, A_1, A_2), this function will mark A_2 as `will_retry=true` and
-        run `A_3` will be launched. This is the "extra" retry, since usually manual retries are counted toward max_retries, but
-        in this case it was not.
-        """
-        from dagster._core.events import RunFailureReason
-
-        if run.status == DagsterRunStatus.FAILURE:
-            raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG)
-            if raw_max_retries_tag is None:
-                max_retries = self.run_retries_max_retries
-            else:
-                try:
-                    max_retries = int(raw_max_retries_tag)
-                except ValueError:
-                    warnings.warn(
-                        f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run."
-                    )
-                    return False
-            if max_retries > 0:
-                run_group = self.get_run_group(run.run_id)
-                if run_group is not None:
-                    _, run_group_iter = run_group
-                    # since the original run is in the run group, the number of retries launched
-                    # so far is len(run_group_iter) - 1
-                    if len(list(run_group_iter)) <= max_retries:
-                        retry_on_asset_or_op_failure = get_boolean_tag_value(
-                            run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
-                            default_value=self.run_retries_retry_on_asset_or_op_failure,
-                        )
-                        if (
-                            run.tags.get(RUN_FAILURE_REASON_TAG)
-                            == RunFailureReason.STEP_FAILURE.value
-                            and not retry_on_asset_or_op_failure
-                        ):
-                            return False
-                        else:
-                            return True
-        return False
-
     def handle_new_event(
         self,
         event: "EventLogEntry",
@@ -2526,7 +2458,8 @@ def handle_new_event(
                     # Note that this tag is only applied to runs that fail. Successful runs will not
                     # have a WILL_RETRY_TAG tag.
                     self.add_run_tags(
-                        run_id, {WILL_RETRY_TAG: str(self.should_retry_run(run)).lower()}
+                        run_id,
+                        {WILL_RETRY_TAG: str(auto_reexecution_should_retry_run(self, run)).lower()},
                     )
             for sub in self._subscribers[run_id]:
                 sub(event)