diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index dbbe1a16d8ae4..f29367a9c5283 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -67,7 +67,10 @@
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
     BACKFILL_ID_TAG,
+    MAX_RETRIES_TAG,
+    PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
+    ROOT_RUN_ID_TAG,
 )
 from dagster._core.test_utils import (
     create_run_for_test,
@@ -229,6 +232,11 @@ def bar(a1):
     return a1
 
 
+@asset(partitions_def=static_partitions)
+def always_fails():
+    raise Exception("I always fail")
+
+
 @asset(
     config_schema={"myparam": Field(str, description="YYYY-MM-DD")},
 )
@@ -426,6 +434,7 @@ def the_repo():
         ab1,
         ab2,
         define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions),
+        always_fails,
         # baz is a configurable asset which has no dependencies
         baz,
         asset_a,
@@ -2861,3 +2870,72 @@ def test_asset_backfill_not_complete_until_retries_complete(
     backfill = instance.get_backfill(backfill_id)
     assert backfill
     assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+
+def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    remote_repo: RemoteRepository,
+):
+    """Backfill stays REQUESTED while its failed runs are still eligible for automatic retry."""
+    del remote_repo
+    backfill_id = "run_retries_backfill"
+    partition_keys = static_partitions.get_partition_keys()
+    asset_selection = [AssetKey("foo"), AssetKey("always_fails")]
+    instance.add_backfill(
+        PartitionBackfill.from_asset_partitions(
+            asset_graph=workspace_context.create_request_context().asset_graph,
+            backfill_id=backfill_id,
+            tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"},
+            backfill_timestamp=get_current_timestamp(),
+            asset_selection=asset_selection,
+            partition_names=partition_keys,
+            dynamic_partitions_store=instance,
+            all_partitions=False,
+            title=None,
+            description=None,
+        )
+    )
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_start(instance, timeout=30)
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_finish(instance, timeout=30)
+
+    assert instance.get_runs_count() == 3
+    runs = reversed(list(instance.get_runs()))
+    for run in runs:
+        assert run.tags[BACKFILL_ID_TAG] == backfill_id
+        assert run.tags["custom_tag_key"] == "custom_tag_value"
+        assert step_succeeded(instance, run, "foo")
+        assert step_failed(instance, run, "always_fails")
+
+    # since the failed runs should have automatic retries launched for them, the backfill should not
+    # be considered complete, even though the targeted asset partitions have a completed state
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    # automatic retries won't get kicked off in test environment, so we manually create them and mark them completed
+    runs = instance.get_runs()
+    for run in runs:
+        retried_run = create_run_for_test(
+            instance=instance,
+            job_name=run.job_name,
+            tags={**run.tags, ROOT_RUN_ID_TAG: run.run_id, PARENT_RUN_ID_TAG: run.run_id},
+            root_run_id=run.run_id,
+            parent_run_id=run.run_id,
+        )
+        mark_run_successful(instance, retried_run)
+
+    # now that every original run has a finished retry run, the backfill is allowed to complete
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_FAILED