Skip to content

Commit

Permalink
test for auto retries
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 7, 2024
1 parent 048fab8 commit 0493541
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
MAX_RETRIES_TAG,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
ROOT_RUN_ID_TAG,
)
from dagster._core.test_utils import (
create_run_for_test,
Expand Down Expand Up @@ -229,6 +232,11 @@ def bar(a1):
return a1


@asset(partitions_def=static_partitions)
def always_fails():
raise Exception("I always fail")


@asset(
config_schema={"myparam": Field(str, description="YYYY-MM-DD")},
)
Expand Down Expand Up @@ -426,6 +434,7 @@ def the_repo():
ab1,
ab2,
define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions),
always_fails,
# baz is a configurable asset which has no dependencies
baz,
asset_a,
Expand Down Expand Up @@ -2861,3 +2870,70 @@ def test_asset_backfill_not_complete_until_retries_complete(
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("always_fails")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill("run_retries_backfill")
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_failed(instance, run, "always_fails")

# since the failed runs should have automatic retries launched for them, the backfill should not
# be considered complete, even though the targeted asset partitions have a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

# automatic retries wont get kicked off in test environment, so we manually create them and mark them completed
runs = instance.get_runs()
for run in runs:
retried_run = create_run_for_test(
instance=instance,
job_name=run.job_name,
tags={**run.tags, ROOT_RUN_ID_TAG: run.run_id, PARENT_RUN_ID_TAG: run.run_id},
root_run_id=run.run_id,
parent_run_id=run.run_id,
)
mark_run_successful(instance, retried_run)

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_FAILED

0 comments on commit 0493541

Please sign in to comment.