diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py index 9b9a81b45c73c..978a170ff21e0 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py @@ -94,15 +94,13 @@ async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResu ): if downstream_condition in ignored_conditions: continue - child_condition = DownstreamConditionWrapperCondition( - downstream_keys=list(sorted(asset_keys)), operand=downstream_condition - ) - child_context = context.for_child_condition( - child_condition=child_condition, + child_result = await context.for_child_condition( + child_condition=DownstreamConditionWrapperCondition( + downstream_keys=list(sorted(asset_keys)), operand=downstream_condition + ), child_index=i, candidate_subset=context.candidate_subset, - ) - child_result = await child_condition.evaluate(child_context) + ).evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_union(child_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py index 5cd5eec0a1330..564767f611a2b 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py @@ -1,3 +1,4 @@ +import asyncio from typing import List, Sequence import dagster._check as check @@ -41,10 +42,9 @@ async def evaluate( child_results: List[AutomationResult] = [] true_subset = context.candidate_subset for i, child in enumerate(self.children): - child_context = context.for_child_condition( + child_result = await context.for_child_condition( child_condition=child, child_index=i, candidate_subset=true_subset - ) - child_result = await child_context.evaluate_async() + ).evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_intersection(child_result.true_subset) return AutomationResult(context, true_subset, child_results=child_results) @@ -88,14 +88,17 @@ def requires_cursor(self) -> bool: async def evaluate( self, context: AutomationContext[T_EntityKey] ) -> AutomationResult[T_EntityKey]: - child_results: List[AutomationResult] = [] true_subset = context.get_empty_subset() - for i, child in enumerate(self.children): - child_context = context.for_child_condition( + + coroutines = [ + context.for_child_condition( child_condition=child, child_index=i, candidate_subset=context.candidate_subset - ) - child_result = await child_context.evaluate_async() - child_results.append(child_result) + ).evaluate_async() + for i, child in enumerate(self.children) + ] + + child_results = await asyncio.gather(*coroutines) + for child_result in child_results: true_subset = true_subset.compute_union(child_result.true_subset) return AutomationResult(context, true_subset, child_results=child_results) @@ -123,10 +126,9 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: async def evaluate( self, context: AutomationContext[T_EntityKey] ) -> AutomationResult[T_EntityKey]: - child_context = context.for_child_condition( + child_result = await context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=context.candidate_subset - ) - child_result = await child_context.evaluate_async() + ).evaluate_async() true_subset = context.candidate_subset.compute_difference(child_result.true_subset) return AutomationResult(context, true_subset, child_results=[child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py index b39cd00abcc23..fbf5cc5bc5784 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py @@ -1,3 +1,4 @@ +import asyncio from abc import abstractmethod from typing import AbstractSet @@ -55,21 +56,21 @@ def base_name(self) -> str: return "ANY_CHECKS_MATCH" async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: - check_results = [] true_subset = context.get_empty_subset() - for i, check_key in enumerate( - sorted(self._get_check_keys(context.key, context.asset_graph)) - ): - check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = await check_condition.evaluate( - context.for_child_condition( - child_condition=check_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) + coroutines = [ + context.for_child_condition( + child_condition=EntityMatchesCondition(key=check_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() + for i, check_key in enumerate( + sorted(self._get_check_keys(context.key, context.asset_graph)) ) - check_results.append(check_result) + ] + + check_results = await asyncio.gather(*coroutines) + for check_result in check_results: true_subset = true_subset.compute_union(check_result.true_subset) true_subset = context.candidate_subset.compute_intersection(true_subset) @@ -90,14 +91,11 @@ async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResu for i, check_key in enumerate( sorted(self._get_check_keys(context.key, context.asset_graph)) ): - check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = await check_condition.evaluate( - context.for_child_condition( - child_condition=check_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + check_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=check_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() check_results.append(check_result) true_subset = true_subset.compute_intersection(check_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index fec38682cf20a..d217f6e765ef3 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -135,14 +135,11 @@ async def evaluate( true_subset = context.get_empty_subset() for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): - dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = await dep_condition.evaluate( - context.for_child_condition( - child_condition=dep_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + dep_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() dep_results.append(dep_result) true_subset = true_subset.compute_union(dep_result.true_subset) @@ -163,14 +160,11 @@ async def evaluate( true_subset = context.candidate_subset for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): - dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = await dep_condition.evaluate( - context.for_child_condition( - child_condition=dep_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + dep_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() dep_results.append(dep_result) true_subset = true_subset.compute_intersection(dep_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py index 6309c15b50fdb..1f359cf5f5a32 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py @@ -39,13 +39,12 @@ def _get_previous_child_true_subset( async def evaluate(self, context: AutomationContext) -> AutomationResult: # evaluate child condition - child_context = context.for_child_condition( + child_result = await context.for_child_condition( self.operand, child_index=0, # must evaluate child condition over the entire subset to avoid missing state transitions candidate_subset=context.asset_graph_view.get_full_subset(key=context.key), - ) - child_result = await child_context.evaluate_async() + ).evaluate_async() # get the set of asset partitions of the child which newly became true newly_true_child_subset = child_result.true_subset.compute_difference( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py index 92a9d8952881b..67d69065fc85e 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py @@ -32,16 +32,14 @@ async def evaluate( child_candidate_subset = context.asset_graph_view.get_full_subset(key=context.key) # compute result for trigger condition - trigger_context = context.for_child_condition( + trigger_result = await context.for_child_condition( self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset - ) - trigger_result = await trigger_context.evaluate_async() + ).evaluate_async() # compute result for reset condition - reset_context = context.for_child_condition( + reset_result = await context.for_child_condition( self.reset_condition, child_index=1, candidate_subset=child_candidate_subset - ) - reset_result = await reset_context.evaluate_async() + ).evaluate_async() # take the previous subset that this was true for true_subset = context.previous_true_subset or context.get_empty_subset()