Skip to content

Commit

Permalink
small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 12, 2024
1 parent 88f4ebb commit 9c5e17f
Showing 1 changed file with 41 additions and 18 deletions.
59 changes: 41 additions & 18 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
PartitionsSelector,
)
from dagster._core.errors import DagsterUserCodeUnreachableError
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.asset_backfill import (
AssetBackfillData,
get_asset_backfill_run_chunk_size,
Expand All @@ -68,21 +70,21 @@
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
MAX_RETRIES_TAG,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
ROOT_RUN_ID_TAG,
)
from dagster._core.test_utils import (
create_run_for_test,
environ,
mark_run_successful,
step_did_not_run,
step_failed,
step_succeeded,
)
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
consume_new_runs_for_automatic_reexecution,
)
from dagster._daemon.backfill import execute_backfill_iteration
from dagster._seven import IS_WINDOWS, get_system_temp_directory
from dagster._time import get_current_timestamp
Expand Down Expand Up @@ -232,6 +234,12 @@ def bar(a1):
return a1


@asset(partitions_def=static_partitions)
def pass_on_retry(context):
if context.run.parent_run_id is None:
raise Exception("I failed!")


@asset(partitions_def=static_partitions)
def always_fails():
raise Exception("I always fail")
Expand Down Expand Up @@ -435,6 +443,7 @@ def the_repo():
ab2,
define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions),
always_fails,
pass_on_retry,
# baz is a configurable asset which has no dependencies
baz,
asset_a,
Expand Down Expand Up @@ -2825,7 +2834,7 @@ def test_asset_backfill_not_complete_until_retries_complete(
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill("run_retries_backfill")
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

Expand Down Expand Up @@ -2859,9 +2868,24 @@ def test_asset_backfill_not_complete_until_retries_complete(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

mark_run_successful(instance, retried_run)
# manually mark the run as successful to show that the backfill will be marked as complete
# since there are no in prgress runs
instance.handle_new_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run.run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
event_type_value=DagsterEventType.RUN_SUCCESS.value,
job_name=run.job_name,
),
)
)

retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
assert retried_run.status == DagsterRunStatus.SUCCESS
Expand All @@ -2880,7 +2904,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
del remote_repo
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("always_fails")]
asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
Expand All @@ -2896,7 +2920,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill("run_retries_backfill")
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

Expand All @@ -2919,19 +2943,18 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
assert backfill.status == BulkActionStatus.REQUESTED

# automatic retries wont get kicked off in test environment, so we manually create them and mark them completed
runs = instance.get_runs()
for run in runs:
retried_run = create_run_for_test(
instance=instance,
job_name=run.job_name,
tags={**run.tags, ROOT_RUN_ID_TAG: run.run_id, PARENT_RUN_ID_TAG: run.run_id},
root_run_id=run.run_id,
parent_run_id=run.run_id,
)
mark_run_successful(instance, retried_run)
# automatic retries wont get automatically run in test environment, so we run the function manually
runs = instance.get_run_records()
list(
consume_new_runs_for_automatic_reexecution(
workspace_process_context=workspace_context, run_records=runs
)
)
wait_for_all_runs_to_finish(instance, timeout=30)
assert instance.get_runs_count() == 6

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
Expand Down

0 comments on commit 9c5e17f

Please sign in to comment.