diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index e3b5347f7eec5..f645ec05d7c71 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -37,7 +37,6 @@ from dagster_graphql.client.query import ( LAUNCH_PARTITION_BACKFILL_MUTATION, LAUNCH_PIPELINE_EXECUTION_MUTATION, - LAUNCH_PIPELINE_REEXECUTION_MUTATION, ) from dagster_graphql.test.utils import ( execute_dagster_graphql, @@ -2303,63 +2302,3 @@ def test_retry_successful_job_backfill(self, graphql_context): assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id - - def test_run_retry_not_part_of_completed_backfill(self, graphql_context): - # TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill - repository_selector = infer_repository_selector(graphql_context) - result = execute_dagster_graphql( - graphql_context, - LAUNCH_PARTITION_BACKFILL_MUTATION, - variables={ - "backfillParams": { - "selector": { - "repositorySelector": repository_selector, - "partitionSetName": "integers_partition_set", - }, - "partitionNames": ["2", "3", "4", "5"], - } - }, - ) - - assert not result.errors - assert result.data - assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess" - backfill_id = result.data["launchPartitionBackfill"]["backfillId"] - - _seed_runs( - graphql_context, - [ - (DagsterRunStatus.SUCCESS, "5"), - (DagsterRunStatus.SUCCESS, "2"), - (DagsterRunStatus.SUCCESS, "3"), - (DagsterRunStatus.SUCCESS, "4"), - (DagsterRunStatus.SUCCESS, "5"), - (DagsterRunStatus.SUCCESS, "2"), - (DagsterRunStatus.FAILURE, "3"), - (DagsterRunStatus.SUCCESS, "4"), - ], - backfill_id, - ) - - backfill = graphql_context.instance.get_backfill(backfill_id) - graphql_context.instance.update_backfill( - backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS) - ) - - failed_run = graphql_context.instance.get_runs( - filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE]) - )[0] - - retry_run_result = execute_dagster_graphql( - graphql_context, - LAUNCH_PIPELINE_REEXECUTION_MUTATION, - variables={ - "reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"} - }, - ) - assert not retry_run_result.errors - assert retry_run_result.data - assert ( - retry_run_result.data["launchPipelineReexecution"]["__typename"] - == "LaunchPipelineReexecutionSuccess" - ) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index edb4acfd36dbe..712318128e4d0 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex yield workspace_context -@pytest.fixture(name="remote_repo", scope="module") -def remote_repo_fixture( +@pytest.fixture(name="code_location", scope="module") +def code_location_fixture( workspace_context: WorkspaceProcessContext, -) -> Iterator[RemoteRepository]: - yield cast( +) -> CodeLocation: + return cast( CodeLocation, next( iter(workspace_context.create_request_context().get_code_location_entries().values()) ).code_location, - ).get_repository("the_repo") + ) + + +@pytest.fixture(name="remote_repo", scope="module") +def remote_repo_fixture( + code_location: CodeLocation, +) -> Iterator[RemoteRepository]: + yield code_location.get_repository("the_repo") def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin: diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index cd2c386297dab..a0d817a0a3de9 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -53,11 +53,13 @@ get_asset_backfill_run_chunk_size, ) from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.remote_representation import ( InProcessCodeLocationOrigin, RemoteRepository, RemoteRepositoryOrigin, ) +from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, @@ -69,6 +71,7 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + BACKFILL_TAGS, MAX_RETRIES_TAG, PARTITION_NAME_TAG, ) @@ -3169,9 +3172,9 @@ def test_asset_backfill_retries_make_downstreams_runnable( def test_run_retry_not_part_of_completed_backfill( instance: DagsterInstance, workspace_context: WorkspaceProcessContext, + code_location: CodeLocation, remote_repo: RemoteRepository, ): - del remote_repo backfill_id = "run_retries_backfill" partition_keys = static_partitions.get_partition_keys() asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] @@ -3215,47 +3218,29 @@ def test_run_retry_not_part_of_completed_backfill( assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS # manual retry of a run - instance.create_reexecuted_run() # simulate a retry of a run run_to_retry = instance.get_runs()[0] - retried_run = create_run_for_test( - instance=instance, - job_name=run_to_retry.job_name, - tags=run_to_retry.tags, - root_run_id=run_to_retry.run_id, - parent_run_id=run_to_retry.run_id, + remote_job = remote_repo.get_full_job(run_to_retry.job_name) + retried_run = instance.create_reexecuted_run( + parent_run=run_to_retry, + code_location=code_location, + remote_job=remote_job, + strategy=ReexecutionStrategy.ALL_STEPS, ) - # since there is a run in progress, the backfill should not be marked as complete, even though - # all targeted asset partitions have a completed state + for tag in BACKFILL_TAGS: + assert tag not in retried_run.tags.keys() + + # Since the backfill is alerady complete, it should not be processed by the backfill daemon and + # should remain in a completed state list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill - assert backfill.asset_backfill_data - assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() assert backfill.status == BulkActionStatus.REQUESTED - # manually mark the run as successful to show that the backfill will be marked as complete - # since there are no in progress runs - instance.handle_new_event( - EventLogEntry( - error_info=None, - level="debug", - user_message="", - run_id=retried_run.run_id, - timestamp=time.time(), - dagster_event=DagsterEvent( - event_type_value=DagsterEventType.RUN_SUCCESS.value, - job_name=retried_run.job_name, - ), - ) - ) - - retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] - assert retried_run.status == DagsterRunStatus.SUCCESS + wait_for_all_runs_to_finish(instance, timeout=30) - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) - backfill = instance.get_backfill(backfill_id) - assert backfill - assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + assert retried_run.run_id not in [ + r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id)) + ]