Skip to content

Commit

Permalink
update to use tags and add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 25, 2024
1 parent 6a6950a commit 41d95a9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 30 deletions.
69 changes: 40 additions & 29 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
DID_RETRY_TAG,
PARTITION_NAME_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id, toposort
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
Expand Down Expand Up @@ -930,12 +933,15 @@ def _check_validity_and_deserialize_asset_backfill_data(


def backfill_is_complete(
backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance
backfill_id: str,
backfill_data: AssetBackfillData,
instance: DagsterInstance,
logger: logging.Logger,
):
"""A backfill is complete when:
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry.
3. there are no failed runs that will result in an automatic retry, but have not yet been retried.
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
Expand All @@ -946,16 +952,16 @@ def backfill_is_complete(
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
from dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
filter_runs_to_should_retry,
)

# conditions are in order of least expensive to most expensive to check
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill
# is not complete
if not backfill_data.all_targeted_partitions_have_materialization_status():
logger.info(
"Not all targeted asset partitions have a materialization status. Backfill is still in progress."
)
return False
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete
if (
# Condition 1 - all asset partitions in the target subset have a materialization state
backfill_data.all_targeted_partitions_have_materialization_status()
# Condition 2 - there are no in progress runs for the backfill
and len(
len(
instance.get_run_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
Expand All @@ -964,26 +970,28 @@ def backfill_is_complete(
limit=1,
)
)
== 0
# Condition 3 - there are no failed runs that will be retried
and len(
list(
filter_runs_to_should_retry(
instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
),
instance,
instance.run_retries_max_retries,
> 0
):
logger.info("Backfill has in progress runs. Backfill is still in progress.")
return False
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
if any(
[
get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False)
and run.tags.get(DID_RETRY_TAG) is None
for run in instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
)
)
== 0
]
):
return True
return False
logger.info(
"Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress."
)
return False
return True


def execute_asset_backfill_iteration(
Expand Down Expand Up @@ -1105,7 +1113,10 @@ def execute_asset_backfill_iteration(
updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if backfill_is_complete(
backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance
backfill_id=backfill.backfill_id,
backfill_data=updated_backfill_data,
instance=instance,
logger=logger,
):
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,10 @@ def run_backfill_to_completion(
)

while not backfill_is_complete(
backfill_id=backfill_id, backfill_data=backfill_data, instance=instance
backfill_id=backfill_id,
backfill_data=backfill_data,
instance=instance,
logger=logging.getLogger("fake_logger"),
):
iteration_count += 1

Expand Down

0 comments on commit 41d95a9

Please sign in to comment.