Commit
[backfill daemon run retries 3/n] retries of runs in completed backfills should not be considered part of the backfill (#25900)

## Summary & Motivation
If a run is retried after a backfill is complete, the retried run is given the
backfill tag but has no effect on the backfill itself. This can cause
confusion. Imagine a scenario where a single asset partition failed in a
backfill. The backfill completes, a user retries the failed asset, and the
retry succeeds. The retried run will show up in the list of runs for the
backfill, but the status in the overview tab for that partition will still be
Failed, since partition statuses are locked when the backfill completes.

We should be stricter about when run retries are considered part of a
backfill. We decided in
dagster-io/internal#12460 that retries
launched while the backfill is in progress are part of the backfill, but
retries launched after the backfill is complete should not be considered
part of it.

To make this change, we remove the backfill tags from retried runs when the
backfill is no longer in progress.

## How I Tested These Changes
New unit tests.

I also manually launched a retry of a run that had been launched by a backfill,
after the backfill was complete; no backfill tags were added to the retry:
<img width="1037" alt="Screenshot 2024-12-02 at 1 55 09 PM"
src="https://github.com/user-attachments/assets/5bb8ae12-4c61-4fd4-8255-1d245ae43318">
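
The same check can be expressed programmatically (a sketch, assuming an in-process `DagsterInstance` and the run ID of the manually launched retry as a placeholder):

```python
from dagster import DagsterInstance
from dagster._core.storage.tags import BACKFILL_TAGS

RETRIED_RUN_ID = "<run id of the manual retry>"  # hypothetical placeholder

instance = DagsterInstance.get()
retried_run = instance.get_run_by_id(RETRIED_RUN_ID)
assert retried_run is not None

# None of the backfill-related tags should appear on the retried run.
assert not any(tag in retried_run.tags for tag in BACKFILL_TAGS)
```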


## Changelog

Manual retries of runs launched by backfills are no longer considered
part of the backfill if the backfill is complete when the retry is
launched.
jamiedemaria authored Dec 5, 2024
1 parent e8290c7 commit 8891639
Showing 4 changed files with 124 additions and 13 deletions.
29 changes: 22 additions & 7 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -71,11 +71,13 @@
 from dagster._core.storage.tags import (
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
+    BACKFILL_ID_TAG,
+    BACKFILL_TAGS,
     PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
     RESUME_RETRY_TAG,
     ROOT_RUN_ID_TAG,
-    TAGS_TO_OMIT_ON_RETRY,
+    TAGS_TO_MAYBE_OMIT_ON_RETRY,
     WILL_RETRY_TAG,
 )
 from dagster._serdes import ConfigurableClass
@@ -1633,6 +1635,7 @@ def create_reexecuted_run(
         run_config: Optional[Mapping[str, Any]] = None,
         use_parent_run_tags: bool = False,
     ) -> DagsterRun:
+        from dagster._core.execution.backfill import BulkActionStatus
         from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
         from dagster._core.execution.plan.state import KnownExecutionState
         from dagster._core.remote_representation import CodeLocation, RemoteJob
@@ -1650,15 +1653,27 @@
         parent_run_id = parent_run.run_id
 
         # these can differ from remote_job.tags if tags were added at launch time
-        parent_run_tags = (
-            {key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY}
-            if use_parent_run_tags
-            else {}
-        )
+        parent_run_tags_to_include = {}
+        if use_parent_run_tags:
+            parent_run_tags_to_include = {
+                key: val
+                for key, val in parent_run.tags.items()
+                if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
+            }
+            # condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG,
+            # and ROOT_BACKFILL_ID_TAG on the retried run
+            if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
+                # re-add the backfill tags (filtered out above) only while the backfill is still
+                # in progress; a retry launched after the backfill completes is not part of it
+                backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
+                if backfill and backfill.status == BulkActionStatus.REQUESTED:
+                    for tag in BACKFILL_TAGS:
+                        if parent_run.tags.get(tag) is not None:
+                            parent_run_tags_to_include[tag] = parent_run.tags[tag]
 
         tags = merge_dicts(
             remote_job.tags,
-            parent_run_tags,
+            parent_run_tags_to_include,
             extra_tags or {},
             {
                 PARENT_RUN_ID_TAG: parent_run_id,
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/_core/storage/tags.py
@@ -108,12 +108,15 @@
 RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval"
 RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"
 
+BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}
+
-TAGS_TO_OMIT_ON_RETRY = {
+TAGS_TO_MAYBE_OMIT_ON_RETRY = {
     *RUN_METRIC_TAGS,
     RUN_FAILURE_REASON_TAG,
     WILL_RETRY_TAG,
     AUTO_RETRY_RUN_ID_TAG,
+    *BACKFILL_TAGS,
 }
 
 
17 changes: 12 additions & 5 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
@@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContext]:
     yield workspace_context
 
 
-@pytest.fixture(name="remote_repo", scope="module")
-def remote_repo_fixture(
+@pytest.fixture(name="code_location", scope="module")
+def code_location_fixture(
     workspace_context: WorkspaceProcessContext,
-) -> Iterator[RemoteRepository]:
-    yield cast(
+) -> CodeLocation:
+    return cast(
         CodeLocation,
         next(
             iter(workspace_context.create_request_context().get_code_location_entries().values())
         ).code_location,
-    ).get_repository("the_repo")
+    )
+
+
+@pytest.fixture(name="remote_repo", scope="module")
+def remote_repo_fixture(
+    code_location: CodeLocation,
+) -> Iterator[RemoteRepository]:
+    yield code_location.get_repository("the_repo")
 
 
 def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin:
86 changes: 86 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -41,6 +41,7 @@
 from dagster._core.definitions.events import AssetKeyPartitionKey
 from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig
 from dagster._core.definitions.selector import (
+    JobSubsetSelector,
     PartitionRangeSelector,
     PartitionsByAssetSelector,
     PartitionsSelector,
@@ -53,11 +54,13 @@
     get_asset_backfill_run_chunk_size,
 )
 from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
 from dagster._core.remote_representation import (
     InProcessCodeLocationOrigin,
     RemoteRepository,
     RemoteRepositoryOrigin,
 )
+from dagster._core.remote_representation.code_location import CodeLocation
 from dagster._core.storage.compute_log_manager import ComputeIOType
 from dagster._core.storage.dagster_run import (
     IN_PROGRESS_RUN_STATUSES,
@@ -69,6 +72,7 @@
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
     BACKFILL_ID_TAG,
+    BACKFILL_TAGS,
     MAX_RETRIES_TAG,
     PARTITION_NAME_TAG,
 )
@@ -3164,3 +3168,85 @@ def test_asset_backfill_retries_make_downstreams_runnable(
         backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
         == 0
     )
+
+
+def test_run_retry_not_part_of_completed_backfill(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    code_location: CodeLocation,
+    remote_repo: RemoteRepository,
+):
+    backfill_id = "run_retries_backfill"
+    partition_keys = static_partitions.get_partition_keys()
+    asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
+    instance.add_backfill(
+        PartitionBackfill.from_asset_partitions(
+            asset_graph=workspace_context.create_request_context().asset_graph,
+            backfill_id=backfill_id,
+            tags={"custom_tag_key": "custom_tag_value"},
+            backfill_timestamp=get_current_timestamp(),
+            asset_selection=asset_selection,
+            partition_names=partition_keys,
+            dynamic_partitions_store=instance,
+            all_partitions=False,
+            title=None,
+            description=None,
+        )
+    )
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_start(instance, timeout=30)
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_finish(instance, timeout=30)
+
+    assert instance.get_runs_count() == 3
+    runs = reversed(list(instance.get_runs()))
+    for run in runs:
+        assert run.tags[BACKFILL_ID_TAG] == backfill_id
+        assert run.tags["custom_tag_key"] == "custom_tag_value"
+        assert step_succeeded(instance, run, "foo")
+        assert step_succeeded(instance, run, "reusable")
+        assert step_succeeded(instance, run, "bar")
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+    # simulate a manual retry of one of the backfill's runs
+    run_to_retry = instance.get_runs()[0]
+    selector = JobSubsetSelector(
+        location_name=code_location.name,
+        repository_name=remote_repo.name,
+        job_name=run_to_retry.job_name,
+        asset_selection=run_to_retry.asset_selection,
+        op_selection=None,
+    )
+    remote_job = code_location.get_job(selector)
+    retried_run = instance.create_reexecuted_run(
+        parent_run=run_to_retry,
+        code_location=code_location,
+        remote_job=remote_job,
+        strategy=ReexecutionStrategy.ALL_STEPS,
+        run_config=run_to_retry.run_config,
+        use_parent_run_tags=True,  # ensures that the logic for not copying over backfill tags is tested
+    )
+
+    for tag in BACKFILL_TAGS:
+        assert tag not in retried_run.tags.keys()
+
+    # Since the backfill is already complete, it should not be processed by the backfill daemon
+    # and should remain in a completed state
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+    assert retried_run.run_id not in [
+        r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
+    ]
