Skip to content

Commit

Permalink
try add test
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 26, 2024
1 parent deb2fb1 commit 45ac4bf
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from dagster_graphql.client.query import (
LAUNCH_PARTITION_BACKFILL_MUTATION,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
)
from dagster_graphql.test.utils import (
execute_dagster_graphql,
Expand Down Expand Up @@ -2302,3 +2303,63 @@ def test_retry_successful_job_backfill(self, graphql_context):

assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id

def test_run_retry_not_part_of_completed_backfill(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill
repository_selector = infer_repository_selector(graphql_context)
result = execute_dagster_graphql(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": {
"repositorySelector": repository_selector,
"partitionSetName": "integers_partition_set",
},
"partitionNames": ["2", "3", "4", "5"],
}
},
)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

_seed_runs(
graphql_context,
[
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.SUCCESS, "3"),
(DagsterRunStatus.SUCCESS, "4"),
(DagsterRunStatus.SUCCESS, "5"),
(DagsterRunStatus.SUCCESS, "2"),
(DagsterRunStatus.FAILURE, "3"),
(DagsterRunStatus.SUCCESS, "4"),
],
backfill_id,
)

backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

failed_run = graphql_context.instance.get_runs(
filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])
)[0]

retry_run_result = execute_dagster_graphql(
graphql_context,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
variables={
"reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"}
},
)
assert not retry_run_result.errors
assert retry_run_result.data
assert (
retry_run_result.data["launchPipelineReexecution"]["__typename"]
== "LaunchPipelineReexecutionSuccess"
)
8 changes: 6 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1661,9 +1661,13 @@ def create_reexecuted_run(
for key, val in parent_run.tags.items()
if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
}
# if the run was part of a backfill and the backfill is complete, we do not want the
# retry to be considered part of the backfill, so remove all backfill-related tags
# for all tags in TAGS_TO_MAYBE_OMIT_ON_RETRY, add a condition that determines
# whether the tag should be added to the retried run

# condition for BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
# if the run was part of a backfill and the backfill is complete, we do not want the
# retry to be considered part of the backfill, so remove all backfill-related tags
backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
if backfill.status == BulkActionStatus.REQUESTED:
for tag in BACKFILL_TAGS:
Expand Down
95 changes: 95 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3164,3 +3164,98 @@ def test_asset_backfill_retries_make_downstreams_runnable(
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 0
)


def test_run_retry_not_part_of_completed_backfill(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_succeeded(instance, run, "reusable")
assert step_succeeded(instance, run, "bar")

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# manual retry of a run
instance.create_reexecuted_run()

# simulate a retry of a run
run_to_retry = instance.get_runs()[0]
retried_run = create_run_for_test(
instance=instance,
job_name=run_to_retry.job_name,
tags=run_to_retry.tags,
root_run_id=run_to_retry.run_id,
parent_run_id=run_to_retry.run_id,
)

# since there is a run in progress, the backfill should not be marked as complete, even though
# all targeted asset partitions have a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

# manually mark the run as successful to show that the backfill will be marked as complete
# since there are no in progress runs
instance.handle_new_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=retried_run.run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
event_type_value=DagsterEventType.RUN_SUCCESS.value,
job_name=retried_run.job_name,
),
)
)

retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
assert retried_run.status == DagsterRunStatus.SUCCESS

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

0 comments on commit 45ac4bf

Please sign in to comment.