diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 846d9dcc752aa..c8aa4b22336aa 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -61,17 +61,14 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, - AUTO_RETRY_RUN_ID_TAG, BACKFILL_ID_TAG, PARTITION_NAME_TAG, - WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id, toposort from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -977,8 +974,7 @@ def backfill_is_complete( # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete if any( [ - get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) - and run.tags.get(AUTO_RETRY_RUN_ID_TAG) is None + run.is_complete_and_waiting_to_retry for run in instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index 46f38e31a7d8a..5c948829638eb 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -24,6 +24,7 @@ from dagster._core.origin import JobPythonOrigin from dagster._core.storage.tags import ( ASSET_EVALUATION_ID_TAG, + AUTO_RETRY_RUN_ID_TAG, AUTOMATION_CONDITION_TAG, BACKFILL_ID_TAG, PARENT_RUN_ID_TAG, @@ -33,10 +34,12 @@ SCHEDULE_NAME_TAG, SENSOR_NAME_TAG, TICK_ID_TAG, + WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id from dagster._record import IHaveNew, record_custom from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes +from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.definitions.schedule_definition import ScheduleDefinition @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool: """bool: If this run was created from retrying another run from the point of failure.""" return self.tags.get(RESUME_RETRY_TAG) == "true" + @property + def is_complete_and_waiting_to_retry(self): + """Indicates if a run is waiting to be retried by the auto-reexecution system. + Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry), + 3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet. + Otherwise returns False. + """ + if self.status in NOT_FINISHED_STATUSES: + return False + if self.status != DagsterRunStatus.FAILURE: + return False + will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False) + retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None + if will_retry: + return retry_not_launched + + return False + @property def previous_run_id(self) -> Optional[str]: # Compat