From eb2dd112baeb7f24e37edbdcad944166a244ef5a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 13 Nov 2024 11:02:24 -0500 Subject: [PATCH] update canceling logic for failed subset --- .../dagster/dagster/_core/execution/asset_backfill.py | 9 ++++++++- .../dagster/dagster_tests/daemon_tests/test_backfill.py | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 36392c303b2b9..19b45b438fe1a 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1295,12 +1295,19 @@ def get_canceling_asset_backfill_iteration_data( ), asset_graph, ) + # we fetch the failed_subset to get any new assets that have failed and add that to the set of + # assets we alerady know failed and their downstreams. However we need to remove any assets in + # updated_materialized_subset to account for the case where a run retry successfully + # materialized a previlusly failed asset. + updated_failed_subset = ( + asset_backfill_data.failed_and_downstream_subset | failed_subset + ) - updated_materialized_subset updated_backfill_data = AssetBackfillData( target_subset=asset_backfill_data.target_subset, latest_storage_id=asset_backfill_data.latest_storage_id, requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots, materialized_subset=updated_materialized_subset, - failed_and_downstream_subset=failed_subset, + failed_and_downstream_subset=updated_failed_subset, requested_subset=asset_backfill_data.requested_subset, backfill_start_time=TimestampWithTimezone(backfill_start_timestamp, "UTC"), ) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 4815314534e8a..50077a20ce36e 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -3159,6 +3159,7 @@ def test_asset_backfill_retries_make_downstreams_runnable( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + assert backfill.asset_backfill_data assert ( backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0