diff --git a/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py b/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py index 16521b8976fbc..122a2d86acb10 100644 --- a/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py +++ b/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py @@ -1,6 +1,7 @@ import logging import time from typing import cast +from unittest.mock import PropertyMock, patch from dagster import DagsterEvent, DagsterEventType, DagsterInstance, EventLogEntry from dagster._core.events import JobFailureData, RunFailureReason @@ -12,6 +13,7 @@ MAX_RETRIES_TAG, RETRY_ON_ASSET_OR_OP_FAILURE_TAG, RETRY_STRATEGY_TAG, + WILL_RETRY_TAG, ) from dagster._core.test_utils import MockedRunCoordinator, create_run_for_test, instance_for_test from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( @@ -43,212 +45,270 @@ def create_run(instance, **kwargs): def test_filter_runs_to_should_retry(instance): + max_retries_setting = 2 instance.wipe() + with patch( + instance.__class__.__module__ + + "." + + instance.__class__.__name__ + + ".run_retries_max_retries", + new_callable=PropertyMock, + ) as mock_max_run_retries: + mock_max_run_retries.return_value = max_retries_setting + + run = create_run(instance, status=DagsterRunStatus.STARTED) + + assert list(filter_runs_to_should_retry([run], instance, max_retries_setting)) == [] + + dagster_event = DagsterEvent( + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name="foo", + message="", + ) + event_record = EventLogEntry( + user_message="", + level=logging.ERROR, + job_name="foo", + run_id=run.run_id, + error_info=None, + timestamp=time.time(), + dagster_event=dagster_event, + ) + instance.handle_new_event(event_record) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "true" - run = create_run(instance, status=DagsterRunStatus.STARTED) - - assert list(filter_runs_to_should_retry([run], instance, 2)) == [] - - dagster_event = DagsterEvent( - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name="foo", - message="", - ) - event_record = EventLogEntry( - user_message="", - level=logging.ERROR, - job_name="foo", - run_id=run.run_id, - error_info=None, - timestamp=time.time(), - dagster_event=dagster_event, - ) - instance.handle_new_event(event_record) - - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + assert ( + len( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) ) + == 1 ) - == 1 - ) def test_filter_runs_no_retry_on_asset_or_op_failure(instance_no_retry_on_asset_or_op_failure): + max_retries_setting = 2 instance = instance_no_retry_on_asset_or_op_failure + with patch( + instance.__class__.__module__ + + "." + + instance.__class__.__name__ + + ".run_retries_max_retries", + new_callable=PropertyMock, + ) as mock_max_run_retries: + mock_max_run_retries.return_value = max_retries_setting + + run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "2"}) + + assert list(filter_runs_to_should_retry([run], instance, max_retries_setting)) == [] + + dagster_event = DagsterEvent( + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name=run.job_name, + message="oops step failure", + event_specific_data=JobFailureData( + error=None, failure_reason=RunFailureReason.STEP_FAILURE + ), + ) + instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "false" - run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "2"}) - - assert list(filter_runs_to_should_retry([run], instance, 2)) == [] - - dagster_event = DagsterEvent( - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name=run.job_name, - message="oops step failure", - event_specific_data=JobFailureData( - error=None, failure_reason=RunFailureReason.STEP_FAILURE - ), - ) - instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) - - # doesn't retry because its a step failure - - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + # doesn't retry because its a step failure + assert ( + len( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) ) + == 0 ) - == 0 - ) - assert any( - "Not retrying run since it failed due to an asset or op failure and run retries are configured with retry_on_asset_or_op_failure set to false." - in str(event) - for event in instance.all_logs(run.run_id) - ) - - run = create_run( - instance, - status=DagsterRunStatus.STARTED, - tags={MAX_RETRIES_TAG: "2", RETRY_ON_ASSET_OR_OP_FAILURE_TAG: False}, - ) + assert any( + "Not retrying run since it failed due to an asset or op failure and run retries are configured with retry_on_asset_or_op_failure set to false." + in str(event) + for event in instance.all_logs(run.run_id) + ) - dagster_event = DagsterEvent( - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name=run.job_name, - message="oops step failure", - event_specific_data=JobFailureData( - error=None, failure_reason=RunFailureReason.STEP_FAILURE - ), - ) - instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) + run = create_run( + instance, + status=DagsterRunStatus.STARTED, + tags={MAX_RETRIES_TAG: "2", RETRY_ON_ASSET_OR_OP_FAILURE_TAG: False}, + ) - # does not retry due to the RETRY_ON_ASSET_OR_OP_FAILURE_TAG tag being false + dagster_event = DagsterEvent( + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name=run.job_name, + message="oops step failure", + event_specific_data=JobFailureData( + error=None, failure_reason=RunFailureReason.STEP_FAILURE + ), + ) + instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "false" - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + # does not retry due to the RETRY_ON_ASSET_OR_OP_FAILURE_TAG tag being false + assert ( + len( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) ) + == 0 ) - == 0 - ) - - run = create_run( - instance, - status=DagsterRunStatus.STARTED, - tags={MAX_RETRIES_TAG: "2", RETRY_ON_ASSET_OR_OP_FAILURE_TAG: True}, - ) - dagster_event = DagsterEvent( - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name=run.job_name, - message="oops step failure", - event_specific_data=JobFailureData( - error=None, failure_reason=RunFailureReason.STEP_FAILURE - ), - ) - instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) + run = create_run( + instance, + status=DagsterRunStatus.STARTED, + tags={MAX_RETRIES_TAG: "2", RETRY_ON_ASSET_OR_OP_FAILURE_TAG: True}, + ) - # does retry due to the RETRY_ON_ASSET_OR_OP_FAILURE_TAG tag being true + dagster_event = DagsterEvent( + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name=run.job_name, + message="oops step failure", + event_specific_data=JobFailureData( + error=None, failure_reason=RunFailureReason.STEP_FAILURE + ), + ) + instance.report_dagster_event(dagster_event, run_id=run.run_id, log_level=logging.ERROR) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "true" - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, - ) + # does retry due to the RETRY_ON_ASSET_OR_OP_FAILURE_TAG tag being true + runs_to_retry = list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, ) ) - == 1 - ) + assert len(runs_to_retry) == 1 def test_filter_runs_to_should_retry_tags(instance): instance.wipe() + max_retries_setting = 2 + with patch( + instance.__class__.__module__ + + "." + + instance.__class__.__name__ + + ".run_retries_max_retries", + new_callable=PropertyMock, + ) as mock_max_run_retries: + mock_max_run_retries.return_value = max_retries_setting - run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "0"}) + run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "0"}) - assert list(filter_runs_to_should_retry([run], instance, 2)) == [] + assert list(filter_runs_to_should_retry([run], instance, max_retries_setting)) == [] - instance.report_run_failed(run) + instance.report_run_failed(run) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "false" - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + assert ( + len( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) ) + == 0 ) - == 0 - ) instance.wipe() + max_retries_setting = 0 + with patch( + instance.__class__.__module__ + + "." + + instance.__class__.__name__ + + ".run_retries_max_retries", + new_callable=PropertyMock, + ) as mock_max_run_retries: + mock_max_run_retries.return_value = max_retries_setting - run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "10"}) + run = create_run(instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "10"}) - assert list(filter_runs_to_should_retry([run], instance, 0)) == [] + assert list(filter_runs_to_should_retry([run], instance, max_retries_setting)) == [] - instance.report_run_failed(run) + instance.report_run_failed(run) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "true" - assert ( - len( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + assert ( + len( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) ) + == 1 ) - == 1 - ) instance.wipe() + max_retries_setting = 2 + with patch( + instance.__class__.__module__ + + "." + + instance.__class__.__name__ + + ".run_retries_max_retries", + new_callable=PropertyMock, + ) as mock_max_run_retries: + mock_max_run_retries.return_value = max_retries_setting + + run = create_run( + instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "not-an-int"} + ) - run = create_run( - instance, status=DagsterRunStatus.STARTED, tags={MAX_RETRIES_TAG: "not-an-int"} - ) - - assert list(filter_runs_to_should_retry([run], instance, 0)) == [] + assert list(filter_runs_to_should_retry([run], instance, max_retries_setting)) == [] - instance.report_run_failed(run) + instance.report_run_failed(run) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "false" - assert ( - list( - filter_runs_to_should_retry( - instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), - instance, - 2, + assert ( + list( + filter_runs_to_should_retry( + instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])), + instance, + max_retries_setting, + ) ) + == [] ) - == [] - ) def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context): instance.wipe() instance.run_coordinator.queue().clear() - list( consume_new_runs_for_automatic_reexecution( workspace_context, @@ -275,6 +335,9 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) dagster_event=dagster_event, ) instance.handle_new_event(event_record) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "true" list( consume_new_runs_for_automatic_reexecution( @@ -294,6 +357,7 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) assert len(instance.run_coordinator.queue()) == 1 # retries once the new run failed + first_retry = instance.run_coordinator.queue()[0] dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, job_name="foo", @@ -303,12 +367,16 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) user_message="", level=logging.ERROR, job_name="foo", - run_id=instance.run_coordinator.queue()[0].run_id, + run_id=first_retry.run_id, error_info=None, timestamp=time.time(), dagster_event=dagster_event, ) instance.handle_new_event(event_record) + first_retry = instance.get_run_by_id(first_retry.run_id) + assert first_retry + assert first_retry.tags.get(WILL_RETRY_TAG) == "true" + list( consume_new_runs_for_automatic_reexecution( workspace_context, @@ -318,6 +386,7 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) assert len(instance.run_coordinator.queue()) == 2 # doesn't retry a third time + second_retry = instance.run_coordinator.queue()[1] dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, job_name="foo", @@ -327,12 +396,16 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) user_message="", level=logging.ERROR, job_name="foo", - run_id=instance.run_coordinator.queue()[1].run_id, + run_id=second_retry.run_id, error_info=None, timestamp=time.time(), dagster_event=dagster_event, ) instance.handle_new_event(event_record) + second_retry = instance.get_run_by_id(second_retry.run_id) + assert second_retry + assert second_retry.tags.get(WILL_RETRY_TAG) == "false" + list( consume_new_runs_for_automatic_reexecution( workspace_context, @@ -410,6 +483,9 @@ def test_subset_run(instance: DagsterInstance, workspace_context): dagster_event=dagster_event, ) instance.handle_new_event(event_record) + run = instance.get_run_by_id(run.run_id) + assert run + assert run.tags.get(WILL_RETRY_TAG) == "true" list( consume_new_runs_for_automatic_reexecution( diff --git a/python_modules/dagster/dagster/_core/execution/retries.py b/python_modules/dagster/dagster/_core/execution/retries.py index 6dbdc0ecd4989..39dcf1d65d60e 100644 --- a/python_modules/dagster/dagster/_core/execution/retries.py +++ b/python_modules/dagster/dagster/_core/execution/retries.py @@ -1,13 +1,24 @@ +import warnings from collections import defaultdict from enum import Enum -from typing import Mapping, Optional +from typing import TYPE_CHECKING, Mapping, Optional from dagster import ( Field, Selector, _check as check, ) +from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus +from dagster._core.storage.tags import ( + MAX_RETRIES_TAG, + RETRY_ON_ASSET_OR_OP_FAILURE_TAG, + RUN_FAILURE_REASON_TAG, +) from dagster._serdes.serdes import whitelist_for_serdes +from dagster._utils.tags import get_boolean_tag_value + +if TYPE_CHECKING: + from dagster._core.instance import DagsterInstance def get_retries_config(): @@ -69,3 +80,71 @@ def mark_attempt(self, key: str) -> None: def snapshot_attempts(self) -> Mapping[str, int]: return dict(self._attempts) + + +def auto_reexecution_should_retry_run(instance: "DagsterInstance", run: DagsterRun): + """Determines if a run will be retried by the automatic reexcution system. + A run will retry if: + - it is failed. + - the number of max allowed retries is > 0 (max retries can be set via system setting or run tag). + - there have not already been >= max_retries retries for the run. + + If the run failure reason was a step failure and the retry_on_asset_or_op_failure tag/system setting is set to false, + a warning message will be logged and the run will not be retried. + + We determine how many retries have been launched for the run by looking at the size of the run group + (the set of runs that have the same root_run_id and the run with root_run_id). Since manually launched retries are + part of the run group, this means that if a user launches a manual retry of run A and then this function + is called because a retry for run A launched by the auto-reexecution system failed, the manual retry will be + counted toward max_retries. + + It is unlikely, but possible, that one "extra" retry will be launched by the automatic reexecution system + since manual retries could be happening in parallel with automatic retries. Here is + an illustrative example: + - Max retries is 3 + - Run A fails + - The automatic reexecution system launches a retry of run A (A_1), which fails + - The automatic reexecution system launches a retry run A_1 (A_2), which fails + - This function is executing and has fetched the run_group for run A_2: (A, A_1, A_2) + - A user launches a manual retry of run A (A_m). The run group is now (A, A_1, A_2, A_m), but this function does + not have the updated run group + - Since the run group we've fetched is (A, A_1, A_2), this function will mark A_2 as `will_retry=true` and + run `A_3` will be launched. This is the "extra" retry, since usually manual retries are counted toward max_retries, but + in this case it was not. + + We think this is an acceptable tradeoff to make since the automatic reexecution system won't launch more than max_retries + run itself, just that max_retries + 1 runs could be launched in total if a manual retry is timed to cause this condition (unlikely). + """ + from dagster._core.events import RunFailureReason + + if run.status != DagsterRunStatus.FAILURE: + return False + + raw_max_retries_tag = run.tags.get(MAX_RETRIES_TAG) + if raw_max_retries_tag is None: + max_retries = instance.run_retries_max_retries + else: + try: + max_retries = int(raw_max_retries_tag) + except ValueError: + warnings.warn(f"Error parsing int from tag {MAX_RETRIES_TAG}, won't retry the run.") + return False + if max_retries > 0: + run_group = instance.get_run_group(run.run_id) + if run_group is not None: + _, run_group_iter = run_group + # since the original run is in the run group, the number of retries launched + # so far is len(run_group_iter) - 1 + if len(list(run_group_iter)) <= max_retries: + retry_on_asset_or_op_failure = get_boolean_tag_value( + run.tags.get(RETRY_ON_ASSET_OR_OP_FAILURE_TAG), + default_value=instance.run_retries_retry_on_asset_or_op_failure, + ) + if ( + run.tags.get(RUN_FAILURE_REASON_TAG) == RunFailureReason.STEP_FAILURE.value + and not retry_on_asset_or_op_failure + ): + return False + else: + return True + return False diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 976818c611516..faadb38006a6a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -47,6 +47,7 @@ DagsterRunAlreadyExists, DagsterRunConflict, ) +from dagster._core.execution.retries import auto_reexecution_should_retry_run from dagster._core.instance.config import ( DAGSTER_CONFIG_YAML_FILENAME, DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT, @@ -75,6 +76,7 @@ RESUME_RETRY_TAG, ROOT_RUN_ID_TAG, TAGS_TO_OMIT_ON_RETRY, + WILL_RETRY_TAG, ) from dagster._serdes import ConfigurableClass from dagster._time import get_current_datetime, get_current_timestamp @@ -2463,7 +2465,14 @@ def handle_new_event( and event.get_dagster_event().is_job_event ): self._run_storage.handle_run_event(run_id, event.get_dagster_event()) - + run = self.get_run_by_id(run_id) + if run and event.get_dagster_event().is_run_failure and self.run_retries_enabled: + # Note that this tag is only applied to runs that fail. Successful runs will not + # have a WILL_RETRY_TAG tag. + self.add_run_tags( + run_id, + {WILL_RETRY_TAG: str(auto_reexecution_should_retry_run(self, run)).lower()}, + ) for sub in self._subscribers[run_id]: sub(event) diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index a980da3a49bcc..978e334268314 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -58,6 +58,10 @@ RETRY_STRATEGY_TAG = f"{SYSTEM_TAG_PREFIX}retry_strategy" RETRY_ON_ASSET_OR_OP_FAILURE_TAG = f"{SYSTEM_TAG_PREFIX}retry_on_asset_or_op_failure" +# This tag is used to indicate that the automatic retry daemon will launch a retry for this run +# If this tag is not on a run, it means the run did not fail or automatic retries is disabled. +WILL_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}will_retry" + MAX_RUNTIME_SECONDS_TAG = f"{SYSTEM_TAG_PREFIX}max_runtime" AUTO_MATERIALIZE_TAG = f"{SYSTEM_TAG_PREFIX}auto_materialize" @@ -102,7 +106,7 @@ RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics" -TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG} +TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG} class TagType(Enum): diff --git a/python_modules/dagster/dagster_tests/execution_tests/versioning_tests/test_data_versions.py b/python_modules/dagster/dagster_tests/execution_tests/versioning_tests/test_data_versions.py index df8cf0d29fc2c..b8d87fa66c7fb 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/versioning_tests/test_data_versions.py +++ b/python_modules/dagster/dagster_tests/execution_tests/versioning_tests/test_data_versions.py @@ -1049,7 +1049,10 @@ def downstream_asset(**kwargs): counter = Counter() traced_counter.set(counter) materialize_assets(all_assets, instance)[downstream_asset.key] - assert traced_counter.get().counts() == { - "DagsterInstance.get_asset_records": 1, - "DagsterInstance.get_run_record_by_id": 1, - } + assert ( + traced_counter.get().counts() + == { + "DagsterInstance.get_asset_records": 1, + "DagsterInstance.get_run_record_by_id": 3, # get_run_record_by_id called when handling events for the run + } + )