From 275712815edf5983c13f4c48cffbc3ccfed8dfde Mon Sep 17 00:00:00 2001 From: briantu Date: Mon, 14 Oct 2024 15:09:27 -0400 Subject: [PATCH] Gather since op results --- .../automation_context.py | 1 + .../operators/since_operator.py | 21 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py index 8033c9f50ca92..5feb99137240f 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py @@ -10,6 +10,7 @@ from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, + AutomationResult, ) from dagster._core.definitions.declarative_automation.legacy.legacy_context import ( LegacyRuleEvaluationContext, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py index 67d69065fc85e..9bde13745a2d2 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py @@ -1,3 +1,4 @@ +import asyncio from typing import Sequence from dagster._core.definitions.asset_key import T_EntityKey @@ -31,15 +32,17 @@ async def evaluate( # must evaluate child condition over the entire subset to avoid missing state transitions child_candidate_subset = context.asset_graph_view.get_full_subset(key=context.key) - # compute result for trigger condition - trigger_result = await context.for_child_condition( - self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset - ).evaluate_async() - - # compute result for reset condition - reset_result = await context.for_child_condition( - self.reset_condition, child_index=1, candidate_subset=child_candidate_subset - ).evaluate_async() + # compute result for trigger and reset conditions + trigger_result, reset_result = await asyncio.gather( + *[ + context.for_child_condition( + self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset + ).evaluate_async(), + context.for_child_condition( + self.reset_condition, child_index=1, candidate_subset=child_candidate_subset + ).evaluate_async(), + ] + ) # take the previous subset that this was true for true_subset = context.previous_true_subset or context.get_empty_subset()