diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index d898094eea40d..a3c957bb82c76 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -70,9 +70,6 @@ from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from python_modules.dagster.dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( - filter_runs_to_should_retry, -) if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -936,7 +933,7 @@ def backfill_is_complete( backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance ): """A backfill is complete when: - 1. all asset parttiions in the target subset have a materialization state (successful, failed, downstream of a failed partition). + 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). 2. there are no in progress runs for the backfill. 3. there are no failed runs that will result in an automatic retry. @@ -949,13 +946,17 @@ def backfill_is_complete( daemon continues to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ + from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( + filter_runs_to_should_retry, + ) + # conditions are in order of least expensive to most expensive to check if ( # Condition 1 - all asset partitions in the target subset have a materialization state backfill_data.all_targeted_partitions_have_materialization_status() # Condtition 2 - there are no in progress runs for the backfill and len( - instance.get_runs_ids( + instance.get_run_ids( filters=RunsFilter( statuses=NOT_FINISHED_STATUSES, tags={BACKFILL_ID_TAG: backfill_id}, @@ -966,15 +967,17 @@ def backfill_is_complete( == 0 # Condition 3 - there are no failed runs that will be retried and len( - filter_runs_to_should_retry( - instance.get_runs( - filters=RunsFilter( - tags={BACKFILL_ID_TAG: backfill_id}, - statuses=[DagsterRunStatus.FAILURE], - ) - ), - instance, - instance.run_retries_max_retries, + list( + filter_runs_to_should_retry( + instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], + ) + ), + instance, + instance.run_retries_max_retries, + ) ) ) == 0 @@ -1270,7 +1273,7 @@ def get_canceling_asset_backfill_iteration_data( " AssetGraphSubset object" ) - failed_subset, _ = AssetGraphSubset.from_asset_partition_set( + failed_subset = AssetGraphSubset.from_asset_partition_set( set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)), asset_graph, ) @@ -1343,7 +1346,7 @@ def _get_failed_and_downstream_asset_partitions( asset_graph: RemoteWorkspaceAssetGraph, instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, -) -> Tuple[AssetGraphSubset, bool]: +) -> AssetGraphSubset: failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( asset_graph.bfs_filter_asset_partitions( instance_queryer, @@ -1768,7 +1771,7 @@ def _get_failed_asset_partitions( asset_graph: RemoteAssetGraph, ) -> Sequence[AssetKeyPartitionKey]: """Returns asset partitions that materializations were requested for as part of the backfill, but - were not successfully materialized. + will not be materialized. Includes canceled asset partitions. Implementation assumes that successful runs won't have any failed partitions. diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 0bcb908e53daf..27f9ee3e64b35 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -72,6 +72,7 @@ from dagster._core.test_utils import ( create_run_for_test, environ, + mark_run_successful, step_did_not_run, step_failed, step_succeeded, @@ -2789,3 +2790,74 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions( ) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_not_complete_until_retries_complete( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill("run_retries_backfill") + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + retried_run = create_run_for_test( + instance=instance, + job_name=run_to_retry.job_name, + tags=run_to_retry.tags, + root_run_id=run_to_retry.run_id, + parent_run_id=run_to_retry.run_id, + ) + + # since there is a run in progress, the backfill should not be marked as complete, even though + # all targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + mark_run_successful(instance, retried_run) + + retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] + assert retried_run.status == DagsterRunStatus.SUCCESS + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS