From 41d95a9f5ab53226624c442585cfaf557ba85a2e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Nov 2024 13:52:05 -0500 Subject: [PATCH] update to use tags and add logging --- .../dagster/_core/execution/asset_backfill.py | 69 +++++++++++-------- .../execution_tests/test_asset_backfill.py | 5 +- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index ece46d54c98ba..4b927ea22a6c8 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -62,13 +62,16 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + DID_RETRY_TAG, PARTITION_NAME_TAG, + WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id, toposort from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer +from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -930,12 +933,15 @@ def _check_validity_and_deserialize_asset_backfill_data( def backfill_is_complete( - backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance + backfill_id: str, + backfill_data: AssetBackfillData, + instance: DagsterInstance, + logger: logging.Logger, ): """A backfill is complete when: 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). 2. there are no in progress runs for the backfill. - 3. there are no failed runs that will result in an automatic retry. + 3. there are no failed runs that will result in an automatic retry, but have not yet been retried. Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are @@ -946,16 +952,16 @@ def backfill_is_complete( daemon continues to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ - from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( - filter_runs_to_should_retry, - ) - - # conditions are in order of least expensive to most expensive to check + # Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill + # is not complete + if not backfill_data.all_targeted_partitions_have_materialization_status(): + logger.info( + "Not all targeted asset partitions have a materialization status. Backfill is still in progress." + ) + return False + # Condition 2 - if there are in progress runs for the backfill, the backfill is not complete if ( - # Condition 1 - all asset partitions in the target subset have a materialization state - backfill_data.all_targeted_partitions_have_materialization_status() - # Condition 2 - there are no in progress runs for the backfill - and len( + len( instance.get_run_ids( filters=RunsFilter( statuses=NOT_FINISHED_STATUSES, @@ -964,26 +970,28 @@ def backfill_is_complete( limit=1, ) ) - == 0 - # Condition 3 - there are no failed runs that will be retried - and len( - list( - filter_runs_to_should_retry( - instance.get_runs( - filters=RunsFilter( - tags={BACKFILL_ID_TAG: backfill_id}, - statuses=[DagsterRunStatus.FAILURE], - ) - ), - instance, - instance.run_retries_max_retries, + > 0 + ): + logger.info("Backfill has in progress runs. Backfill is still in progress.") + return False + # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete + if any( + [ + get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) + and run.tags.get(DID_RETRY_TAG) is None + for run in instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], ) ) - ) - == 0 + ] ): - return True - return False + logger.info( + "Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress." + ) + return False + return True def execute_asset_backfill_iteration( @@ -1105,7 +1113,10 @@ def execute_asset_backfill_iteration( updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) if backfill_is_complete( - backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance + backfill_id=backfill.backfill_id, + backfill_data=updated_backfill_data, + instance=instance, + logger=logger, ): if ( updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 6c5f183a3095f..3dd95442e52cc 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -614,7 +614,10 @@ def run_backfill_to_completion( ) while not backfill_is_complete( - backfill_id=backfill_id, backfill_data=backfill_data, instance=instance + backfill_id=backfill_id, + backfill_data=backfill_data, + instance=instance, + logger=logging.getLogger("fake_logger"), ): iteration_count += 1