Skip to content

Commit

Permalink
update how we determine backfill completion to account for retried runs
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 6, 2024
1 parent b7dafbd commit 0561ad1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
76 changes: 65 additions & 11 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
IN_PROGRESS_RUN_STATUSES,
NOT_FINISHED_STATUSES,
DagsterRunStatus,
RunsFilter,
)
Expand All @@ -69,6 +70,9 @@
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from python_modules.dagster.dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
filter_runs_to_should_retry,
)

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
Expand Down Expand Up @@ -167,11 +171,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def is_complete(self) -> bool:
def all_targeted_partitions_have_materialization_status(self) -> bool:
"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
per asset partition, the daemon continues to update the backfill data until all runs have
finished in order to display the final partition statuses in the UI.
per asset partition, we can use the materialization states and whether any runs for the backfill are
not finished to determine if the backfill is complete. We want the daemon to continue to update
the backfill data until all runs have finished in order to display the final partition statuses in the UI.
"""
return (
(
Expand Down Expand Up @@ -927,6 +932,57 @@ def _check_validity_and_deserialize_asset_backfill_data(
return asset_backfill_data


def backfill_is_complete(
backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance
):
"""A backfill is complete when:
1. all asset parttiions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry.
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
in progress. Condition 3 guards against a race condition where a failed run could be automatically retried
but it was not added into the queue in time to be caught by condition 2.
Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
# conditions are in order of least expensive to most expensive to check
if (
# Condition 1 - all asset partitions in the target subset have a materialization state
backfill_data.all_targeted_partitions_have_materialization_status()
# Condtition 2 - there are no in progress runs for the backfill
and len(
instance.get_runs_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
tags={"tags": {BACKFILL_ID_TAG: backfill_id}},
),
limit=1,
)
)
== 0
# Condition 3 - there are no failed runs that will be retried
and len(
filter_runs_to_should_retry(
instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
),
instance,
instance.run_retries_max_retries,
)
)
== 0
):
return True
return False


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
Expand Down Expand Up @@ -1045,11 +1101,9 @@ def execute_asset_backfill_iteration(

updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if updated_backfill_data.is_complete():
# The asset backfill is complete when all runs to be requested have finished (success,
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
if backfill_is_complete(
backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance
):
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
Expand Down Expand Up @@ -1216,7 +1270,7 @@ def get_canceling_asset_backfill_iteration_data(
" AssetGraphSubset object"
)

failed_subset = AssetGraphSubset.from_asset_partition_set(
failed_subset, _ = AssetGraphSubset.from_asset_partition_set(
set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)),
asset_graph,
)
Expand Down Expand Up @@ -1289,7 +1343,7 @@ def _get_failed_and_downstream_asset_partitions(
asset_graph: RemoteWorkspaceAssetGraph,
instance_queryer: CachingInstanceQueryer,
backfill_start_timestamp: float,
) -> AssetGraphSubset:
) -> Tuple[AssetGraphSubset, bool]:
failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set(
asset_graph.bfs_filter_asset_partitions(
instance_queryer,
Expand Down Expand Up @@ -1714,7 +1768,7 @@ def _get_failed_asset_partitions(
asset_graph: RemoteAssetGraph,
) -> Sequence[AssetKeyPartitionKey]:
"""Returns asset partitions that materializations were requested for as part of the backfill, but
will not be materialized.
were not successfully materialized.
Includes canceled asset partitions. Implementation assumes that successful runs won't have any
failed partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
AssetBackfillData,
AssetBackfillIterationResult,
AssetBackfillStatus,
backfill_is_complete,
execute_asset_backfill_iteration_inner,
get_canceling_asset_backfill_iteration_data,
)
Expand Down Expand Up @@ -612,7 +613,9 @@ def run_backfill_to_completion(
evaluation_time=backfill_data.backfill_start_datetime,
)

while not backfill_data.is_complete():
while not backfill_is_complete(
backfill_id=backfill_id, backfill_data=backfill_data, instance=instance
):
iteration_count += 1

result1 = execute_asset_backfill_iteration_consume_generator(
Expand All @@ -622,7 +625,6 @@ def run_backfill_to_completion(
instance=instance,
)

# iteration_count += 1
assert result1.backfill_data != backfill_data

instance_queryer = _get_instance_queryer(
Expand Down

0 comments on commit 0561ad1

Please sign in to comment.