Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 11, 2024
1 parent ea402c5 commit 489f7bc
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 17 deletions.
37 changes: 20 additions & 17 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from python_modules.dagster.dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
filter_runs_to_should_retry,
)

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
Expand Down Expand Up @@ -936,7 +933,7 @@ def backfill_is_complete(
backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance
):
"""A backfill is complete when:
1. all asset parttiions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry.
Expand All @@ -949,13 +946,17 @@ def backfill_is_complete(
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
from dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
filter_runs_to_should_retry,
)

# conditions are in order of least expensive to most expensive to check
if (
# Condition 1 - all asset partitions in the target subset have a materialization state
backfill_data.all_targeted_partitions_have_materialization_status()
# Condtition 2 - there are no in progress runs for the backfill
and len(
instance.get_runs_ids(
instance.get_run_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
tags={BACKFILL_ID_TAG: backfill_id},
Expand All @@ -966,15 +967,17 @@ def backfill_is_complete(
== 0
# Condition 3 - there are no failed runs that will be retried
and len(
filter_runs_to_should_retry(
instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
),
instance,
instance.run_retries_max_retries,
list(
filter_runs_to_should_retry(
instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
),
instance,
instance.run_retries_max_retries,
)
)
)
== 0
Expand Down Expand Up @@ -1270,7 +1273,7 @@ def get_canceling_asset_backfill_iteration_data(
" AssetGraphSubset object"
)

failed_subset, _ = AssetGraphSubset.from_asset_partition_set(
failed_subset = AssetGraphSubset.from_asset_partition_set(
set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)),
asset_graph,
)
Expand Down Expand Up @@ -1343,7 +1346,7 @@ def _get_failed_and_downstream_asset_partitions(
asset_graph: RemoteWorkspaceAssetGraph,
instance_queryer: CachingInstanceQueryer,
backfill_start_timestamp: float,
) -> Tuple[AssetGraphSubset, bool]:
) -> AssetGraphSubset:
failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set(
asset_graph.bfs_filter_asset_partitions(
instance_queryer,
Expand Down Expand Up @@ -1768,7 +1771,7 @@ def _get_failed_asset_partitions(
asset_graph: RemoteAssetGraph,
) -> Sequence[AssetKeyPartitionKey]:
"""Returns asset partitions that materializations were requested for as part of the backfill, but
were not successfully materialized.
will not be materialized.
Includes canceled asset partitions. Implementation assumes that successful runs won't have any
failed partitions.
Expand Down
72 changes: 72 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from dagster._core.test_utils import (
create_run_for_test,
environ,
mark_run_successful,
step_did_not_run,
step_failed,
step_succeeded,
Expand Down Expand Up @@ -2789,3 +2790,74 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_not_complete_until_retries_complete(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
remote_repo: RemoteRepository,
):
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill("run_retries_backfill")
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_succeeded(instance, run, "reusable")
assert step_succeeded(instance, run, "bar")

# simulate a retry of a run
run_to_retry = instance.get_runs()[0]
retried_run = create_run_for_test(
instance=instance,
job_name=run_to_retry.job_name,
tags=run_to_retry.tags,
root_run_id=run_to_retry.run_id,
parent_run_id=run_to_retry.run_id,
)

# since there is a run in progress, the backfill should not be marked as complete, even though
# all targeted asset partitions have a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

mark_run_successful(instance, retried_run)

retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
assert retried_run.status == DagsterRunStatus.SUCCESS

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

0 comments on commit 489f7bc

Please sign in to comment.