diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index ad9b4f01cb1e0..934a84fe825c8 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -41,6 +41,7 @@ from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig from dagster._core.definitions.selector import ( + JobSubsetSelector, PartitionRangeSelector, PartitionsByAssetSelector, PartitionsSelector, @@ -3219,12 +3220,21 @@ def test_run_retry_not_part_of_completed_backfill( # simulate a retry of a run run_to_retry = instance.get_runs()[0] - remote_job = remote_repo.get_full_job(run_to_retry.job_name) + selector = JobSubsetSelector( + location_name=code_location.name, + repository_name=remote_repo.name, + job_name=run_to_retry.job_name, + asset_selection=run_to_retry.asset_selection, + op_selection=None, + ) + remote_job = code_location.get_job(selector) retried_run = instance.create_reexecuted_run( parent_run=run_to_retry, code_location=code_location, remote_job=remote_job, strategy=ReexecutionStrategy.ALL_STEPS, + run_config=run_to_retry.run_config, + use_parent_run_tags=True, # ensures that the logic for not copying over backfill tags is tested ) for tag in BACKFILL_TAGS: @@ -3237,8 +3247,6 @@ def test_run_retry_not_part_of_completed_backfill( assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS - wait_for_all_runs_to_finish(instance, timeout=30) - assert retried_run.run_id not in [ r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id)) ]