Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 27, 2024
1 parent daae492 commit 08e7927
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions python_modules/dagster/dagster/_core/execution/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def auto_reexecution_should_retry_run(instance: "DagsterInstance", run: DagsterR
is called because a retry for run A launched by the auto-reexecution system failed, the manual retry will be
counted toward max_retries.
There is potential that one "extra" retry will be launched by the automatic reexecution system
It is unlikely, but possible, that one "extra" retry will be launched by the automatic reexecution system
since manual retries could be happening in parallel with automatic retries. Here is
an illustrative example:
- Max retries is 3
Expand All @@ -111,35 +111,39 @@ def auto_reexecution_should_retry_run(instance: "DagsterInstance", run: DagsterR
- Since the run group we've fetched is (A, A_1, A_2), this function will mark A_2 as `will_retry=true` and
run `A_3` will be launched. This is the "extra" retry, since usually manual retries are counted toward max_retries, but
in this case it was not.
We think this is an acceptable tradeoff to make since the automatic reexecution system won't launch more than max_retries
runs itself; at most, max_retries + 1 runs could be launched in total if a manual retry is timed to cause this condition (unlikely).
"""
from dagster._core.events import RunFailureReason

if run.status == DagsterRunStatus.FAILURE:
raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG)
if raw_max_retries_tag is None:
max_retries = instance.run_retries_max_retries
else:
try:
max_retries = int(raw_max_retries_tag)
except ValueError:
warnings.warn(f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.")
return False
if max_retries > 0:
run_group = instance.get_run_group(run.run_id)
if run_group is not None:
_, run_group_iter = run_group
# since the original run is in the run group, the number of retries launched
# so far is len(run_group_iter) - 1
if len(list(run_group_iter)) <= max_retries:
retry_on_asset_or_op_failure = get_boolean_tag_value(
run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
default_value=instance.run_retries_retry_on_asset_or_op_failure,
)
if (
run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value
and not retry_on_asset_or_op_failure
):
return False
else:
return True
return False
if run.status != DagsterRunStatus.FAILURE:
return False

raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG)
if raw_max_retries_tag is None:
max_retries = instance.run_retries_max_retries
else:
try:
max_retries = int(raw_max_retries_tag)
except ValueError:
warnings.warn(f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.")
return False
if max_retries > 0:
run_group = instance.get_run_group(run.run_id)
if run_group is not None:
_, run_group_iter = run_group
# since the original run is in the run group, the number of retries launched
# so far is len(run_group_iter) - 1
if len(list(run_group_iter)) <= max_retries:
retry_on_asset_or_op_failure = get_boolean_tag_value(
run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
default_value=instance.run_retries_retry_on_asset_or_op_failure,
)
if (
run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value
and not retry_on_asset_or_op_failure
):
return False
else:
return True

0 comments on commit 08e7927

Please sign in to comment.