Skip to content

Commit

Permalink
modify so it just writes the tag
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 20, 2024
1 parent d7508b6 commit 05e24b1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2415,7 +2415,7 @@ def should_retry_run(self, run: DagsterRun) -> bool:
if len(list(run_group_iter)) < max_retries:
retry_on_asset_or_op_failure = get_boolean_tag_value(
run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
default_value=self.retry_run_on_asset_or_op_failure,
default_value=self.run_retries_retry_on_asset_or_op_failure,
)
if (
run.tags.get(RUN_FAILURE_REASON_TAG)
Expand Down
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
RETRY_STRATEGY_TAG = f"{SYSTEM_TAG_PREFIX}retry_strategy"
RETRY_ON_ASSET_OR_OP_FAILURE_TAG = f"{SYSTEM_TAG_PREFIX}retry_on_asset_or_op_failure"
WILL_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}will_retry"
DID_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}did_retry"

MAX_RUNTIME_SECONDS_TAG = f"{SYSTEM_TAG_PREFIX}max_runtime"

Expand Down Expand Up @@ -104,7 +103,7 @@
RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"


TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG, DID_RETRY_TAG}
TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG}


class TagType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
from dagster._core.events import EngineEventData, RunFailureReason
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun, RunRecord
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunRecord
from dagster._core.storage.tags import (
DID_RETRY_TAG,
MAX_RETRIES_TAG,
RETRY_NUMBER_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
RETRY_STRATEGY_TAG,
RUN_FAILURE_REASON_TAG,
WILL_RETRY_TAG,
)
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._daemon.utils import DaemonErrorCapture
Expand All @@ -26,19 +25,53 @@ def filter_runs_to_should_retry(
runs: Sequence[DagsterRun], instance: DagsterInstance, default_max_retries: int
) -> Iterator[Tuple[DagsterRun, int]]:
"""Return only runs that should retry along with their retry number (1st retry, 2nd, etc.)."""
# TODO - Jamie - revert back to how this branch originally was
for run in runs:
if get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), default_value=False):
if run.tags.get(DID_RETRY_TAG) is None:
retry_number = int(run.tags.get(RETRY_NUMBER_TAG, "0")) + 1
yield (run, retry_number)

def get_retry_number(run: DagsterRun) -> Optional[int]:
if run.status != DagsterRunStatus.FAILURE:
return None

raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG)
if raw_max_retries_tag is None:
max_retries = default_max_retries
else:
# for runs that we will not retry, see if the reason is because retry_on_asset_or_op_failure
# is False and if so, report an engine event
retry_on_asset_or_op_failure = get_boolean_tag_value(
run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
default_value=instance.retry_run_on_asset_or_op_failure,
)
try:
max_retries = int(raw_max_retries_tag)
except ValueError:
instance.report_engine_event(
f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.", run
)
return None

if max_retries == 0:
return None

# TODO: group these to reduce db calls
run_group = instance.get_run_group(run.run_id)

if run_group:
_, run_group_iter = run_group
run_group_list = list(run_group_iter)

# Has the parent run already been retried the maximum number of times? (Group includes the parent)
if len(run_group_list) >= max_retries + 1:
return None

# Does this run already have a child run?
if any([run.run_id == run_.parent_run_id for run_ in run_group_list]):
return None
return len(run_group_list)
else:
return 1

default_retry_on_asset_or_op_failure: bool = instance.retry_run_on_asset_or_op_failure

for run in runs:
retry_number = get_retry_number(run)
retry_on_asset_or_op_failure = get_boolean_tag_value(
run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG),
default_value=default_retry_on_asset_or_op_failure,
)
if retry_number is not None:
if (
run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value
and not retry_on_asset_or_op_failure
Expand All @@ -48,6 +81,8 @@ def filter_runs_to_should_retry(
"are configured with retry_on_asset_or_op_failure set to false.",
run,
)
else:
yield (run, retry_number)


def get_reexecution_strategy(
Expand Down Expand Up @@ -141,7 +176,6 @@ def retry_run(
)

instance.submit_run(new_run.run_id, workspace)
instance.add_run_tags(failed_run.run_id, {DID_RETRY_TAG: "true"})


def consume_new_runs_for_automatic_reexecution(
Expand Down

0 comments on commit 05e24b1

Please sign in to comment.