From 8891639b7f2342e8ffb0aa0d4305346b8cd85a61 Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Thu, 5 Dec 2024 14:12:33 -0500 Subject: [PATCH] [backfill daemon run retries 3/n] retries of runs in completed backfills should not be considered part of the backfill (#25900) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary & Motivation If a run is retried after a backfill is complete, that run is given the backfill tag, but has no affect on the backfill itself. This can cause confusion. Imagine the scenario where a single asset-partition failed in a backfill. The backfill is complete and a user retries the failed asset and the retry succeeds. That retried run will show up in the list of runs for the backfill, but the status in the overview tab for partition will still be failed since the status is locked when the backfill completes. We should be more strict about when run retries are considered part of the backfill. We decided in https://github.com/dagster-io/internal/discussions/12460 that retries that are launched while the backfill is in progress will be part of the backfill, but that retries that are launched after the backfill is complete should not be considered part of the backfill. To make this change we need to remove the backfill tag from retried runs if the backfill is not in progress. ## How I Tested These Changes new unit tests manually launched a retry of a run that was launched by a backfill after the backfill was complete. no backfill tags were added Screenshot 2024-12-02 at 1 55 09 PM ## Changelog Manual retries of runs launched by backfills are no longer considered part of the backfill if the backfill is complete when the retry is launched. --- .../dagster/_core/instance/__init__.py | 29 +++++-- .../dagster/dagster/_core/storage/tags.py | 5 +- .../dagster_tests/daemon_tests/conftest.py | 17 ++-- .../daemon_tests/test_backfill.py | 86 +++++++++++++++++++ 4 files changed, 124 insertions(+), 13 deletions(-) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index faadb38006a6a..fb42ffe3b8c2a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -71,11 +71,13 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, + BACKFILL_ID_TAG, + BACKFILL_TAGS, PARENT_RUN_ID_TAG, PARTITION_NAME_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG, - TAGS_TO_OMIT_ON_RETRY, + TAGS_TO_MAYBE_OMIT_ON_RETRY, WILL_RETRY_TAG, ) from dagster._serdes import ConfigurableClass @@ -1633,6 +1635,7 @@ def create_reexecuted_run( run_config: Optional[Mapping[str, Any]] = None, use_parent_run_tags: bool = False, ) -> DagsterRun: + from dagster._core.execution.backfill import BulkActionStatus from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.execution.plan.state import KnownExecutionState from dagster._core.remote_representation import CodeLocation, RemoteJob @@ -1650,15 +1653,27 @@ def create_reexecuted_run( parent_run_id = parent_run.run_id # these can differ from remote_job.tags if tags were added at launch time - parent_run_tags = ( - {key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY} - if use_parent_run_tags - else {} - ) + parent_run_tags_to_include = {} + if use_parent_run_tags: + parent_run_tags_to_include = { + key: val + for key, val in parent_run.tags.items() + if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY + } + # condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, + # ROOT_BACKFILL_ID_TAG on retried run + if parent_run.tags.get(BACKFILL_ID_TAG) is not None: + # if the run was part of a backfill and the backfill is complete, we do not want the + # retry to be considered part of the backfill, so remove all backfill-related tags + backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG]) + if backfill and backfill.status == BulkActionStatus.REQUESTED: + for tag in BACKFILL_TAGS: + if parent_run.tags.get(tag) is not None: + parent_run_tags_to_include[tag] = parent_run.tags[tag] tags = merge_dicts( remote_job.tags, - parent_run_tags, + parent_run_tags_to_include, extra_tags or {}, { PARENT_RUN_ID_TAG: parent_run_id, diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index c510becec9772..7c22457b39ee5 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -108,12 +108,15 @@ RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval" RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics" +BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG} -TAGS_TO_OMIT_ON_RETRY = { + +TAGS_TO_MAYBE_OMIT_ON_RETRY = { *RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG, AUTO_RETRY_RUN_ID_TAG, + *BACKFILL_TAGS, } diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index edb4acfd36dbe..712318128e4d0 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex yield workspace_context -@pytest.fixture(name="remote_repo", scope="module") -def remote_repo_fixture( +@pytest.fixture(name="code_location", scope="module") +def code_location_fixture( workspace_context: WorkspaceProcessContext, -) -> Iterator[RemoteRepository]: - yield cast( +) -> CodeLocation: + return cast( CodeLocation, next( iter(workspace_context.create_request_context().get_code_location_entries().values()) ).code_location, - ).get_repository("the_repo") + ) + + +@pytest.fixture(name="remote_repo", scope="module") +def remote_repo_fixture( + code_location: CodeLocation, +) -> Iterator[RemoteRepository]: + yield code_location.get_repository("the_repo") def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin: diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 50077a20ce36e..934a84fe825c8 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -41,6 +41,7 @@ from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig from dagster._core.definitions.selector import ( + JobSubsetSelector, PartitionRangeSelector, PartitionsByAssetSelector, PartitionsSelector, @@ -53,11 +54,13 @@ get_asset_backfill_run_chunk_size, ) from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.remote_representation import ( InProcessCodeLocationOrigin, RemoteRepository, RemoteRepositoryOrigin, ) +from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, @@ -69,6 +72,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + BACKFILL_TAGS, MAX_RETRIES_TAG, PARTITION_NAME_TAG, ) @@ -3164,3 +3168,85 @@ def test_asset_backfill_retries_make_downstreams_runnable( backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0 ) + + +def test_run_retry_not_part_of_completed_backfill( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + code_location: CodeLocation, + remote_repo: RemoteRepository, +): + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + selector = JobSubsetSelector( + location_name=code_location.name, + repository_name=remote_repo.name, + job_name=run_to_retry.job_name, + asset_selection=run_to_retry.asset_selection, + op_selection=None, + ) + remote_job = code_location.get_job(selector) + retried_run = instance.create_reexecuted_run( + parent_run=run_to_retry, + code_location=code_location, + remote_job=remote_job, + strategy=ReexecutionStrategy.ALL_STEPS, + run_config=run_to_retry.run_config, + use_parent_run_tags=True, # ensures that the logic for not copying over backfill tags is tested + ) + + for tag in BACKFILL_TAGS: + assert tag not in retried_run.tags.keys() + + # Since the backfill is alerady complete, it should not be processed by the backfill daemon and + # should remain in a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + assert retried_run.run_id not in [ + r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id)) + ]