diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py index d138cc2678f3f..b7e711cc47d8f 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py @@ -130,7 +130,7 @@ async def _evaluate_entity_async(entity_key: EntityKey) -> int: ) try: - self.evaluate_entity(entity_key) + await self.evaluate_entity(entity_key) except Exception as e: raise Exception( f"Error while evaluating conditions for {entity_key.to_user_string()}" @@ -161,10 +161,9 @@ async def _evaluate_entity_async(entity_key: EntityKey) -> int: return list(self.current_results_by_key.values()), list(self._get_entity_subsets()) - def evaluate_entity(self, key: EntityKey) -> None: + async def evaluate_entity(self, key: EntityKey) -> None: # evaluate the condition of this asset - context = AutomationContext.create(key=key, evaluator=self) - result = context.condition.evaluate(context) + result = await AutomationContext.create(key=key, evaluator=self).evaluate_async() # update dictionaries to keep track of this result self.current_results_by_key[key] = result diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py index 3d4d73c6aeb94..817b10e040986 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py @@ -1,4 +1,5 @@ import datetime +import inspect import logging from dataclasses import dataclass from typing import TYPE_CHECKING, Generic, Mapping, Optional, Type, TypeVar @@ -116,6 +117,11 @@ def for_child_condition( _root_log=self._root_log, ) + async def evaluate_async(self) -> AutomationResult[T_EntityKey]: + if inspect.iscoroutinefunction(self.condition.evaluate): + return await self.condition.evaluate(self) + return self.condition.evaluate(self) + @property def log(self) -> logging.Logger: """The logger for the current condition evaluation.""" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py index 23cf97fe84e20..5cd5eec0a1330 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py @@ -35,14 +35,16 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: def requires_cursor(self) -> bool: return False - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: child_results: List[AutomationResult] = [] true_subset = context.candidate_subset for i, child in enumerate(self.children): child_context = context.for_child_condition( child_condition=child, child_index=i, candidate_subset=true_subset ) - child_result = child.evaluate(child_context) + child_result = await child_context.evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_intersection(child_result.true_subset) return AutomationResult(context, true_subset, child_results=child_results) @@ -83,14 +85,16 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: def requires_cursor(self) -> bool: return False - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: child_results: List[AutomationResult] = [] true_subset = context.get_empty_subset() for i, child in enumerate(self.children): child_context = context.for_child_condition( child_condition=child, child_index=i, candidate_subset=context.candidate_subset ) - child_result = child.evaluate(child_context) + child_result = await child_context.evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_union(child_result.true_subset) @@ -116,11 +120,13 @@ def name(self) -> str: def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: return [self.operand] - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: child_context = context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=context.candidate_subset ) - child_result = self.operand.evaluate(child_context) + child_result = await child_context.evaluate_async() true_subset = context.candidate_subset.compute_difference(child_result.true_subset) return AutomationResult(context, true_subset, child_results=[child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py index 4f07078ff058c..8068bb3c79fc6 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py @@ -61,7 +61,7 @@ def base_description(self) -> str: def name(self) -> str: return "ANY_CHECKS_MATCH" - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: check_results = [] true_subset = context.get_empty_subset() @@ -69,7 +69,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass sorted(self._get_check_keys(context.key, context.asset_graph)) ): check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = check_condition.evaluate( + check_result = await check_condition.evaluate( context.for_child_condition( child_condition=check_condition, child_index=i, @@ -94,7 +94,7 @@ def base_description(self) -> str: def name(self) -> str: return "ALL_CHECKS_MATCH" - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: check_results = [] true_subset = context.candidate_subset @@ -102,7 +102,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass sorted(self._get_check_keys(context.key, context.asset_graph)) ): check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = check_condition.evaluate( + check_result = await check_condition.evaluate( context.for_child_condition( child_condition=check_condition, child_index=i, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index e035986b5bb25..cea10efc175be 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -26,13 +26,15 @@ class EntityMatchesCondition( key: U_EntityKey operand: AutomationCondition[U_EntityKey] - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: to_candidate_subset = context.candidate_subset.compute_mapped_subset(self.key) to_context = context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=to_candidate_subset ) - to_result = self.operand.evaluate(to_context) + to_result = await to_context.evaluate_async() true_subset = to_result.true_subset.compute_mapped_subset(context.key) return AutomationResult(context=context, true_subset=true_subset, child_results=[to_result]) @@ -108,13 +110,15 @@ def base_description(self) -> str: def name(self) -> str: return "ANY_DEPS_MATCH" - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: dep_results = [] true_subset = context.get_empty_subset() for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = dep_condition.evaluate( + dep_result = await dep_condition.evaluate( context.for_child_condition( child_condition=dep_condition, child_index=i, @@ -138,13 +142,15 @@ def base_description(self) -> str: def name(self) -> str: return "ALL_DEPS_MATCH" - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: dep_results = [] true_subset = context.candidate_subset for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = dep_condition.evaluate( + dep_result = await dep_condition.evaluate( context.for_child_condition( child_condition=dep_condition, child_index=i, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py index 81ba978b214a8..53d9cc811c851 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py @@ -41,7 +41,7 @@ def _get_previous_child_true_subset( return None return context.asset_graph_view.get_subset_from_serializable_subset(true_subset) - def evaluate(self, context: AutomationContext) -> AutomationResult: + async def evaluate(self, context: AutomationContext) -> AutomationResult: # evaluate child condition child_context = context.for_child_condition( self.operand, @@ -49,7 +49,7 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: # must evaluate child condition over the entire subset to avoid missing state transitions candidate_subset=context.asset_graph_view.get_full_subset(key=context.key), ) - child_result = self.operand.evaluate(child_context) + child_result = await child_context.evaluate_async() # get the set of asset partitions of the child which newly became true newly_true_child_subset = child_result.true_subset.compute_difference( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py index d732aecae063c..05ca7259942be 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py @@ -31,7 +31,9 @@ def name(self) -> str: def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: return [self.trigger_condition, self.reset_condition] - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: # must evaluate child condition over the entire subset to avoid missing state transitions child_candidate_subset = context.asset_graph_view.get_full_subset(key=context.key) @@ -39,13 +41,13 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ trigger_context = context.for_child_condition( self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset ) - trigger_result = self.trigger_condition.evaluate(trigger_context) + trigger_result = await trigger_context.evaluate_async() # compute result for reset condition reset_context = context.for_child_condition( self.reset_condition, child_index=1, candidate_subset=child_candidate_subset ) - reset_result = self.reset_condition.evaluate(reset_context) + reset_result = await reset_context.evaluate_async() # take the previous subset that this was true for true_subset = context.previous_true_subset or context.get_empty_subset()