Skip to content

Commit

Permalink
re-org
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 25, 2024
1 parent 0a34ecf commit 900b1eb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 71 deletions.
76 changes: 75 additions & 1 deletion python_modules/dagster/dagster/_core/execution/retries.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import warnings
from collections import defaultdict
from enum import Enum
from typing import Mapping, Optional
from typing import TYPE_CHECKING, Mapping, Optional

from dagster import (
Field,
Selector,
_check as check,
)
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._core.storage.tags import (
MAX_RETRIES_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
RUN_FAILURE_REASON_TAG,
)
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.instance import DagsterInstance


def get_retries_config():
Expand Down Expand Up @@ -69,3 +80,66 @@ def mark_attempt(self, key: str) -> None:

def snapshot_attempts(self) -> Mapping[str, int]:
    """Return a new ``dict`` built from the current contents of ``self._attempts``.

    The returned mapping is a separate object, so adding or removing keys on it
    does not affect ``self._attempts``.
    """
    attempts_copy = dict(self._attempts)
    return attempts_copy


def auto_reexecution_should_retry_run(instance: "DagsterInstance", run: DagsterRun):
    """Decide whether the automatic reexecution system should retry ``run``.

    Returns ``True`` only when all of the following hold:

    - the run's status is ``FAILURE``;
    - the maximum number of retries is greater than zero. The value comes from the
      ``MAX_RETRIES_TAG`` run tag when present, otherwise from
      ``instance.run_retries_max_retries``;
    - the run group fetched for the run contains at most ``max_retries`` runs, i.e.
      fewer than ``max_retries`` retries have been launched so far;
    - the run did not fail because of a step failure, or retrying on asset/op failure
      is enabled (``RETRY_ON_ASSET_OR_OP_FAILURE_TAG`` run tag, falling back to
      ``instance.run_retries_retry_on_asset_or_op_failure``).

    In every other case ``False`` is returned. If the ``MAX_RETRIES_TAG`` value cannot
    be parsed as an integer, a warning is emitted and the run is not retried.

    The number of retries already launched is derived from the size of the run group
    (the set of runs that have the same root_run_id and the run with root_run_id).
    Manually launched retries are part of the run group, so they count toward
    ``max_retries``.

    Because manual retries can be launched in parallel with automatic ones, one "extra"
    automatic retry is possible. Illustrative example:

    - Max retries is 3
    - Run A fails
    - The automatic reexecution system launches a retry of run A (A_1), which fails
    - The automatic reexecution system launches a retry of run A_1 (A_2), which fails
    - This function is executing and has fetched the run group for A_2: (A, A_1, A_2)
    - A user launches a manual retry of run A (A_m). The run group is now
      (A, A_1, A_2, A_m), but this function does not have the updated run group
    - Since the fetched run group is (A, A_1, A_2), this function reports that A_2 should
      be retried and run A_3 will be launched. This is the "extra" retry: the manual
      retry would normally have counted toward max_retries, but in this case it did not.
    """
    from dagster._core.events import RunFailureReason

    # Only failed runs are candidates for a retry.
    if run.status != DagsterRunStatus.FAILURE:
        return False

    # A per-run tag takes precedence over the instance-wide setting.
    max_retries_tag_value = run.tags.get(MAX_RETRIES_TAG)
    if max_retries_tag_value is not None:
        try:
            max_retries = int(max_retries_tag_value)
        except ValueError:
            warnings.warn(f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.")
            return False
    else:
        max_retries = instance.run_retries_max_retries

    if max_retries <= 0:
        return False

    run_group = instance.get_run_group(run.run_id)
    if run_group is None:
        return False

    _, runs_in_group = run_group
    # The original run is part of the run group, so the number of retries launched so
    # far is the group size minus one; a group larger than max_retries is exhausted.
    if len(list(runs_in_group)) > max_retries:
        return False

    retry_on_asset_or_op_failure = get_boolean_tag_value(
        run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
        default_value=instance.run_retries_retry_on_asset_or_op_failure,
    )
    failed_on_step = run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value

    # Step failures are retried only when retry-on-asset-or-op-failure is enabled.
    return not (failed_on_step and not retry_on_asset_or_op_failure)
73 changes: 3 additions & 70 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
DagsterRunAlreadyExists,
DagsterRunConflict,
)
from dagster._core.execution.retries import auto_reexecution_should_retry_run
from dagster._core.instance.config import (
DAGSTER_CONFIG_YAML_FILENAME,
DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT,
Expand All @@ -70,13 +71,10 @@
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
MAX_RETRIES_TAG,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
RESUME_RETRY_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
ROOT_RUN_ID_TAG,
RUN_FAILURE_REASON_TAG,
TAGS_TO_OMIT_ON_RETRY,
WILL_RETRY_TAG,
)
Expand All @@ -85,7 +83,6 @@
from dagster._utils import PrintFn, is_uuid, traced
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.merger import merge_dicts
from dagster._utils.tags import get_boolean_tag_value
from dagster._utils.warnings import disable_dagster_warnings, experimental_warning

# 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
Expand Down Expand Up @@ -2399,71 +2396,6 @@ def get_handlers(self) -> Sequence[logging.Handler]:
def store_event(self, event: "EventLogEntry") -> None:
    """Forward ``event`` to ``self._event_storage.store_event``."""
    self._event_storage.store_event(event)

def should_retry_run(self, run: DagsterRun) -> bool:
    """Decide whether the automatic reexecution system should retry ``run``.

    Returns ``True`` only when all of the following hold:

    - the run's status is ``FAILURE``;
    - the maximum number of retries is greater than zero. The value comes from the
      ``MAX_RETRIES_TAG`` run tag when present, otherwise from
      ``self.run_retries_max_retries``;
    - the run group fetched for the run contains at most ``max_retries`` runs, i.e.
      fewer than ``max_retries`` retries have been launched so far;
    - the run did not fail because of a step failure, or retrying on asset/op failure
      is enabled (``RETRY_ON_ASSET_OR_OP_FAILURE_TAG`` run tag, falling back to
      ``self.run_retries_retry_on_asset_or_op_failure``).

    In every other case ``False`` is returned. If the ``MAX_RETRIES_TAG`` value cannot
    be parsed as an integer, a warning is emitted and the run is not retried.

    The number of retries already launched is derived from the size of the run group
    (the set of runs that have the same root_run_id and the run with root_run_id).
    Manually launched retries are given a root_run_id, so they count toward
    ``max_retries``.

    Because manual retries can be launched in parallel with automatic ones, one "extra"
    automatic retry is possible. Illustrative example:

    - Max retries is 3
    - Run A fails
    - The automatic reexecution system launches a retry of run A (A_1), which fails
    - The automatic reexecution system launches a retry of run A_1 (A_2), which fails
    - This function is executing and has fetched the run group for A_2: (A, A_1, A_2)
    - A user launches a manual retry of run A (A_m). The run group is now
      (A, A_1, A_2, A_m), but this function does not have the updated run group
    - Since the fetched run group is (A, A_1, A_2), this function reports that A_2 should
      be retried and run A_3 will be launched. This is the "extra" retry: the manual
      retry would normally have counted toward max_retries, but in this case it did not.
    """
    from dagster._core.events import RunFailureReason

    # Runs that did not fail are never retried.
    if run.status != DagsterRunStatus.FAILURE:
        return False

    # Resolve the retry budget: the run tag wins over the instance setting.
    tag_value = run.tags.get(MAX_RETRIES_TAG)
    if tag_value is not None:
        try:
            max_retries = int(tag_value)
        except ValueError:
            warnings.warn(
                f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run."
            )
            return False
    else:
        max_retries = self.run_retries_max_retries

    if max_retries <= 0:
        return False

    run_group = self.get_run_group(run.run_id)
    if run_group is None:
        return False

    _, group_members = run_group
    # The original run is in the run group, so the number of retries launched so far
    # is the group size minus one.
    if len(list(group_members)) > max_retries:
        return False

    retry_on_asset_or_op_failure = get_boolean_tag_value(
        run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
        default_value=self.run_retries_retry_on_asset_or_op_failure,
    )
    is_step_failure = (
        run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value
    )

    # A step failure is retried only when retry-on-asset-or-op-failure is enabled.
    return not (is_step_failure and not retry_on_asset_or_op_failure)

def handle_new_event(
self,
event: "EventLogEntry",
Expand Down Expand Up @@ -2526,7 +2458,8 @@ def handle_new_event(
# Note that this tag is only applied to runs that fail. Successful runs will not
# have a WILL_RETRY_TAG tag.
self.add_run_tags(
run_id, {WILL_RETRY_TAG: str(self.should_retry_run(run)).lower()}
run_id,
{WILL_RETRY_TAG: str(auto_reexecution_should_retry_run(self, run)).lower()},
)
for sub in self._subscribers[run_id]:
sub(event)
Expand Down

0 comments on commit 900b1eb

Please sign in to comment.