From 513c1abb80652c625fa491cce441cc4b91742f1f Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Thu, 5 Dec 2024 13:06:25 -0500 Subject: [PATCH] [backfill daemon run retries 1/n] update how we determine backfill completion to account for retried runs (#25771) ## Summary & Motivation The backfill daemon doesn't account for run retries. See https://github.com/dagster-io/internal/discussions/12460 for more context. We've decided that we want the daemon to account for automatic and manual retries of runs that occur while the backfill is still in progress. This requires two changes: ensuring the backfill isn't marked completed if there is an in progress run or a failed run that will be automatically retried; and updating the daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration. This PR addresses the first point, ensuring the backfill isn't marked completed if there is an in progress run or a failed run that will be automatically retried. Currently a backfill is marked complete when all targeted asset partitions are in a terminal state (successfully materialized, failed, or downstream of a failed partition). Since failed runs may be retried, there is a case where all asset partitions are in a terminal state, but there is a retry in progress that could change the state of some asset partitions. This means that if there are any runs in progress for the partition we need to wait for them to complete before marking the backfill complete. Additionally, we need to account for a race condition where a failed run may have a retry automatically launched for it, but the daemon marks the backfill complete before the retried run is queued. This PR adds an additional check to ensure that no failed runs are about to be retried. ## How I Tested These Changes new unit tests manually ran a backfill with automatic run retries configured and saw that the backfill didn't complete until all automatic retries were complete --- .../dagster/_core/execution/asset_backfill.py | 80 +++++++- .../dagster/_core/storage/dagster_run.py | 21 +++ .../execution_tests/test_asset_backfill.py | 9 +- .../dagster_tests/daemon_tests/conftest.py | 24 ++- .../daemon_tests/test_backfill.py | 173 ++++++++++++++++++ 5 files changed, 289 insertions(+), 18 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 7fe3026e41ae8..c8aa4b22336aa 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -54,6 +54,7 @@ from dagster._core.storage.dagster_run import ( CANCELABLE_RUN_STATUSES, IN_PROGRESS_RUN_STATUSES, + NOT_FINISHED_STATUSES, DagsterRunStatus, RunsFilter, ) @@ -166,11 +167,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool): return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots) - def is_complete(self) -> bool: + def all_targeted_partitions_have_materialization_status(self) -> bool: """The asset backfill is complete when all runs to be requested have finished (success, failure, or cancellation). Since the AssetBackfillData object stores materialization states - per asset partition, the daemon continues to update the backfill data until all runs have - finished in order to display the final partition statuses in the UI. + per asset partition, we can use the materialization states and whether any runs for the backfill are + not finished to determine if the backfill is complete. We want the daemon to continue to update + the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ return ( ( @@ -927,6 +929,67 @@ def _check_validity_and_deserialize_asset_backfill_data( return asset_backfill_data +def backfill_is_complete( + backfill_id: str, + backfill_data: AssetBackfillData, + instance: DagsterInstance, + logger: logging.Logger, +): + """A backfill is complete when: + 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). + 2. there are no in progress runs for the backfill. + 3. there are no failed runs that will result in an automatic retry, but have not yet been retried. + + Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we + cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are + in progress. Condition 3 guards against a race condition where a failed run could be automatically retried + but it was not added into the queue in time to be caught by condition 2. + + Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the + daemon continues to update the backfill data until all runs have finished in order to display the + final partition statuses in the UI. + """ + # Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill + # is not complete + if not backfill_data.all_targeted_partitions_have_materialization_status(): + logger.info( + "Not all targeted asset partitions have a materialization status. Backfill is still in progress." + ) + return False + # Condition 2 - if there are in progress runs for the backfill, the backfill is not complete + if ( + len( + instance.get_run_ids( + filters=RunsFilter( + statuses=NOT_FINISHED_STATUSES, + tags={BACKFILL_ID_TAG: backfill_id}, + ), + limit=1, + ) + ) + > 0 + ): + logger.info("Backfill has in progress runs. Backfill is still in progress.") + return False + # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete + if any( + [ + run.is_complete_and_waiting_to_retry + for run in instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], + ) + ) + ] + ): + logger.info( + "Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress." + ) + return False + return True + + def execute_asset_backfill_iteration( backfill: "PartitionBackfill", logger: logging.Logger, @@ -1045,11 +1108,12 @@ def execute_asset_backfill_iteration( updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) - if updated_backfill_data.is_complete(): - # The asset backfill is complete when all runs to be requested have finished (success, - # failure, or cancellation). Since the AssetBackfillData object stores materialization states - # per asset partition, the daemon continues to update the backfill data until all runs have - # finished in order to display the final partition statuses in the UI. + if backfill_is_complete( + backfill_id=backfill.backfill_id, + backfill_data=updated_backfill_data, + instance=instance, + logger=logger, + ): if ( updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets > 0 diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index 46f38e31a7d8a..5c948829638eb 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -24,6 +24,7 @@ from dagster._core.origin import JobPythonOrigin from dagster._core.storage.tags import ( ASSET_EVALUATION_ID_TAG, + AUTO_RETRY_RUN_ID_TAG, AUTOMATION_CONDITION_TAG, BACKFILL_ID_TAG, PARENT_RUN_ID_TAG, @@ -33,10 +34,12 @@ SCHEDULE_NAME_TAG, SENSOR_NAME_TAG, TICK_ID_TAG, + WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id from dagster._record import IHaveNew, record_custom from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes +from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.definitions.schedule_definition import ScheduleDefinition @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool: """bool: If this run was created from retrying another run from the point of failure.""" return self.tags.get(RESUME_RETRY_TAG) == "true" + @property + def is_complete_and_waiting_to_retry(self): + """Indicates if a run is waiting to be retried by the auto-reexecution system. + Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry), + 3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet. + Otherwise returns False. + """ + if self.status in NOT_FINISHED_STATUSES: + return False + if self.status != DagsterRunStatus.FAILURE: + return False + will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False) + retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None + if will_retry: + return retry_not_launched + + return False + @property def previous_run_id(self) -> Optional[str]: # Compat diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 7ad4fb7bf76d1..e3ebe7a6ae057 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -55,6 +55,7 @@ AssetBackfillData, AssetBackfillIterationResult, AssetBackfillStatus, + backfill_is_complete, execute_asset_backfill_iteration_inner, get_canceling_asset_backfill_iteration_data, ) @@ -618,7 +619,12 @@ def run_backfill_to_completion( evaluation_time=backfill_data.backfill_start_datetime, ) - while not backfill_data.is_complete(): + while not backfill_is_complete( + backfill_id=backfill_id, + backfill_data=backfill_data, + instance=instance, + logger=logging.getLogger("fake_logger"), + ): iteration_count += 1 result1 = execute_asset_backfill_iteration_consume_generator( @@ -628,7 +634,6 @@ def run_backfill_to_completion( instance=instance, ) - # iteration_count += 1 assert result1.backfill_data != backfill_data instance_queryer = _get_instance_queryer( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index 9231dc882e573..edb4acfd36dbe 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -1,5 +1,6 @@ import os import sys +import tempfile from typing import Iterator, Optional, cast import pytest @@ -21,15 +22,22 @@ @pytest.fixture(name="instance_module_scoped", scope="module") def instance_module_scoped_fixture() -> Iterator[DagsterInstance]: - with instance_for_test( - overrides={ - "run_launcher": { - "module": "dagster._core.launcher.sync_in_memory_run_launcher", - "class": "SyncInMemoryRunLauncher", + with tempfile.TemporaryDirectory() as temp_dir: + with instance_for_test( + overrides={ + "run_launcher": { + "module": "dagster._core.launcher.sync_in_memory_run_launcher", + "class": "SyncInMemoryRunLauncher", + }, + "event_log_storage": { + "module": "dagster._core.storage.event_log", + "class": "ConsolidatedSqliteEventLogStorage", + "config": {"base_dir": temp_dir}, + }, + "run_retries": {"enabled": True}, } - } - ) as instance: - yield instance + ) as instance: + yield instance @pytest.fixture(name="instance", scope="function") diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7610743f51cf2..7b659ef0c6442 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -46,6 +46,8 @@ PartitionsSelector, ) from dagster._core.errors import DagsterUserCodeUnreachableError +from dagster._core.events import DagsterEvent, DagsterEventType +from dagster._core.events.log import EventLogEntry from dagster._core.execution.asset_backfill import ( AssetBackfillData, get_asset_backfill_run_chunk_size, @@ -67,6 +69,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + MAX_RETRIES_TAG, PARTITION_NAME_TAG, ) from dagster._core.test_utils import ( @@ -79,6 +82,9 @@ from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.workspace.context import WorkspaceProcessContext from dagster._daemon import get_default_daemon_logger +from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( + consume_new_runs_for_automatic_reexecution, +) from dagster._daemon.backfill import execute_backfill_iteration from dagster._seven import IS_WINDOWS, get_system_temp_directory from dagster._time import get_current_timestamp @@ -228,6 +234,17 @@ def bar(a1): return a1 +@asset(partitions_def=static_partitions) +def pass_on_retry(context): + if context.run.parent_run_id is None: + raise Exception("I failed!") + + +@asset(partitions_def=static_partitions) +def always_fails(): + raise Exception("I always fail") + + @asset( config_schema={"myparam": Field(str, description="YYYY-MM-DD")}, ) @@ -425,6 +442,8 @@ def the_repo(): ab1, ab2, define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions), + always_fails, + pass_on_retry, # baz is a configurable asset which has no dependencies baz, asset_a, @@ -2789,3 +2808,157 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions( ) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_not_complete_until_retries_complete( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + retried_run = create_run_for_test( + instance=instance, + job_name=run_to_retry.job_name, + tags=run_to_retry.tags, + root_run_id=run_to_retry.run_id, + parent_run_id=run_to_retry.run_id, + ) + + # since there is a run in progress, the backfill should not be marked as complete, even though + # all targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.asset_backfill_data + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.status == BulkActionStatus.REQUESTED + + # manually mark the run as successful to show that the backfill will be marked as complete + # since there are no in progress runs + instance.handle_new_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=retried_run.run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + event_type_value=DagsterEventType.RUN_SUCCESS.value, + job_name=retried_run.job_name, + ), + ) + ) + + retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] + assert retried_run.status == DagsterRunStatus.SUCCESS + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_not_complete_if_automatic_retry_could_happen( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_failed(instance, run, "pass_on_retry") + + # since the failed runs should have automatic retries launched for them, the backfill should not + # be considered complete, even though the targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.asset_backfill_data + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.status == BulkActionStatus.REQUESTED + + # automatic retries wont get automatically run in test environment, so we run the function manually + runs = instance.get_run_records() + list( + consume_new_runs_for_automatic_reexecution( + workspace_process_context=workspace_context, run_records=runs + ) + ) + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 6 + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_FAILED