Skip to content

Commit

Permalink
try test again
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 25, 2024
1 parent 752e0d9 commit 0c88250
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from dagster_graphql.client.query import (
LAUNCH_PARTITION_BACKFILL_MUTATION,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
)
from dagster_graphql.test.utils import (
execute_dagster_graphql,
Expand Down Expand Up @@ -2303,63 +2302,3 @@ def test_retry_successful_job_backfill(self, graphql_context):

assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_run_retry_not_part_of_completed_backfill(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill
repository_selector = infer_repository_selector(graphql_context)
result = execute_dagster_graphql(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": {
"repositorySelector": repository_selector,
"partitionSetName": "integers_partition_set",
},
"partitionNames": ["2", "3", "4", "5"],
}
},
)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_seed_runs(
graphql_context,
[
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.SUCCESS, "3"),
(DagsterRunStatus.SUCCESS, "4"),
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.FAILURE, "3"),
(DagsterRunStatus.SUCCESS, "4"),
],
backfill_id,
)

backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

failed_run = graphql_context.instance.get_runs(
filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])
)[0]

retry_run_result = execute_dagster_graphql(
graphql_context,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
variables={
"reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"}
},
)
assert not retry_run_result.errors
assert retry_run_result.data
assert (
retry_run_result.data["launchPipelineReexecution"]["__typename"]
== "LaunchPipelineReexecutionSuccess"
)
17 changes: 12 additions & 5 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex
yield workspace_context


@pytest.fixture(name="remote_repo", scope="module")
def remote_repo_fixture(
@pytest.fixture(name="code_location", scope="module")
def code_location_fixture(
workspace_context: WorkspaceProcessContext,
) -> Iterator[RemoteRepository]:
yield cast(
) -> CodeLocation:
return cast(
CodeLocation,
next(
iter(workspace_context.create_request_context().get_code_location_entries().values())
).code_location,
).get_repository("the_repo")
)


@pytest.fixture(name="remote_repo", scope="module")
def remote_repo_fixture(
code_location: CodeLocation,
) -> Iterator[RemoteRepository]:
yield code_location.get_repository("the_repo")


def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin:
Expand Down
53 changes: 19 additions & 34 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
get_asset_backfill_run_chunk_size,
)
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.remote_representation import (
InProcessCodeLocationOrigin,
RemoteRepository,
RemoteRepositoryOrigin,
)
from dagster._core.remote_representation.code_location import CodeLocation
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.dagster_run import (
IN_PROGRESS_RUN_STATUSES,
Expand All @@ -69,6 +71,7 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
BACKFILL_TAGS,
MAX_RETRIES_TAG,
PARTITION_NAME_TAG,
)
Expand Down Expand Up @@ -3169,9 +3172,9 @@ def test_asset_backfill_retries_make_downstreams_runnable(
def test_run_retry_not_part_of_completed_backfill(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
code_location: CodeLocation,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
Expand Down Expand Up @@ -3215,47 +3218,29 @@ def test_run_retry_not_part_of_completed_backfill(
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# manual retry of a run
instance.create_reexecuted_run()

# simulate a retry of a run
run_to_retry = instance.get_runs()[0]
retried_run = create_run_for_test(
instance=instance,
job_name=run_to_retry.job_name,
tags=run_to_retry.tags,
root_run_id=run_to_retry.run_id,
parent_run_id=run_to_retry.run_id,
remote_job = remote_repo.get_full_job(run_to_retry.job_name)
retried_run = instance.create_reexecuted_run(
parent_run=run_to_retry,
code_location=code_location,
remote_job=remote_job,
strategy=ReexecutionStrategy.ALL_STEPS,
)

# since there is a run in progress, the backfill should not be marked as complete, even though
# all targeted asset partitions have a completed state
for tag in BACKFILL_TAGS:
assert tag not in retried_run.tags.keys()

# Since the backfill is alerady complete, it should not be processed by the backfill daemon and
# should remain in a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

# manually mark the run as successful to show that the backfill will be marked as complete
# since there are no in progress runs
instance.handle_new_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=retried_run.run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
event_type_value=DagsterEventType.RUN_SUCCESS.value,
job_name=retried_run.job_name,
),
)
)

retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
assert retried_run.status == DagsterRunStatus.SUCCESS
wait_for_all_runs_to_finish(instance, timeout=30)

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert retried_run.run_id not in [
r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
]

0 comments on commit 0c88250

Please sign in to comment.