diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index f645ec05d7c71..e3b5347f7eec5 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -37,6 +37,7 @@ from dagster_graphql.client.query import ( LAUNCH_PARTITION_BACKFILL_MUTATION, LAUNCH_PIPELINE_EXECUTION_MUTATION, + LAUNCH_PIPELINE_REEXECUTION_MUTATION, ) from dagster_graphql.test.utils import ( execute_dagster_graphql, @@ -2302,3 +2303,63 @@ def test_retry_successful_job_backfill(self, graphql_context): assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id + + def test_run_retry_not_part_of_completed_backfill(self, graphql_context): + # TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill + repository_selector = infer_repository_selector(graphql_context) + result = execute_dagster_graphql( + graphql_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "selector": { + "repositorySelector": repository_selector, + "partitionSetName": "integers_partition_set", + }, + "partitionNames": ["2", "3", "4", "5"], + } + }, + ) + + assert not result.errors + assert result.data + assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" + backfill_id = result.data["launchPartitionBackfill"]["backfillId"] + + _seed_runs( + graphql_context, + [ + (DagsterRunStatus.SUCCESS, "5"), + (DagsterRunStatus.SUCCESS, "2"), + (DagsterRunStatus.SUCCESS, "3"), + (DagsterRunStatus.SUCCESS, "4"), + (DagsterRunStatus.SUCCESS, "5"), + (DagsterRunStatus.SUCCESS, "2"), + (DagsterRunStatus.FAILURE, "3"), + (DagsterRunStatus.SUCCESS, "4"), + ], + backfill_id, + ) + + backfill = graphql_context.instance.get_backfill(backfill_id) + graphql_context.instance.update_backfill( + backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS) + ) + + failed_run = graphql_context.instance.get_runs( + filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE]) + )[0] + + retry_run_result = execute_dagster_graphql( + graphql_context, + LAUNCH_PIPELINE_REEXECUTION_MUTATION, + variables={ + "reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"} + }, + ) + assert not retry_run_result.errors + assert retry_run_result.data + assert ( + retry_run_result.data["launchPipelineReexecution"]["__typename"] + == "LaunchPipelineReexecutionSuccess" + ) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index b8314e6241077..23555eedb0011 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -1655,9 +1655,13 @@ def create_reexecuted_run( for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY } - # if the run was part of a backfill and the backfill is complete, we do not want the - # retry to be considered part of the backfill, so remove all backfill-related tags + # for all tags in TAGS_TO_MAYBE_OMIT_ON_RETRY, add a condition that determines + # whether the tag should be added to the retried run + + # condition for BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG if parent_run.tags.get(BACKFILL_ID_TAG) is not None: + # if the run was part of a backfill and the backfill is complete, we do not want the + # retry to be considered part of the backfill, so remove all backfill-related tags backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG]) if backfill.status == BulkActionStatus.REQUESTED: for tag in BACKFILL_TAGS: diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 50077a20ce36e..cd2c386297dab 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -3164,3 +3164,98 @@ def test_asset_backfill_retries_make_downstreams_runnable( backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets == 0 ) + + +def test_run_retry_not_part_of_completed_backfill( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + # manual retry of a run + instance.create_reexecuted_run() + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + retried_run = create_run_for_test( + instance=instance, + job_name=run_to_retry.job_name, + tags=run_to_retry.tags, + root_run_id=run_to_retry.run_id, + parent_run_id=run_to_retry.run_id, + ) + + # since there is a run in progress, the backfill should not be marked as complete, even though + # all targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.asset_backfill_data + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() + assert backfill.status == BulkActionStatus.REQUESTED + + # manually mark the run as successful to show that the backfill will be marked as complete + # since there are no in progress runs + instance.handle_new_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=retried_run.run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + event_type_value=DagsterEventType.RUN_SUCCESS.value, + job_name=retried_run.job_name, + ), + ) + ) + + retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] + assert retried_run.status == DagsterRunStatus.SUCCESS + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS