Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 14, 2024
1 parent 9490d9c commit ebe8594
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResu
):
if downstream_condition in ignored_conditions:
continue
child_condition = DownstreamConditionWrapperCondition(
downstream_keys=list(sorted(asset_keys)), operand=downstream_condition
)
child_context = context.for_child_condition(
child_condition=child_condition,
child_result = await context.for_child_condition(
child_condition=DownstreamConditionWrapperCondition(
downstream_keys=list(sorted(asset_keys)), operand=downstream_condition
),
child_index=i,
candidate_subset=context.candidate_subset,
)
child_result = await child_condition.evaluate(child_context)
).evaluate_async()

child_results.append(child_result)
true_subset = true_subset.compute_union(child_result.true_subset)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import List, Sequence

import dagster._check as check
Expand Down Expand Up @@ -41,10 +42,9 @@ async def evaluate(
child_results: List[AutomationResult] = []
true_subset = context.candidate_subset
for i, child in enumerate(self.children):
child_context = context.for_child_condition(
child_result = await context.for_child_condition(
child_condition=child, child_index=i, candidate_subset=true_subset
)
child_result = await child_context.evaluate_async()
).evaluate_async()
child_results.append(child_result)
true_subset = true_subset.compute_intersection(child_result.true_subset)
return AutomationResult(context, true_subset, child_results=child_results)
Expand Down Expand Up @@ -88,14 +88,17 @@ def requires_cursor(self) -> bool:
async def evaluate(
self, context: AutomationContext[T_EntityKey]
) -> AutomationResult[T_EntityKey]:
child_results: List[AutomationResult] = []
true_subset = context.get_empty_subset()
for i, child in enumerate(self.children):
child_context = context.for_child_condition(

coroutines = [
context.for_child_condition(
child_condition=child, child_index=i, candidate_subset=context.candidate_subset
)
child_result = await child_context.evaluate_async()
child_results.append(child_result)
).evaluate_async()
for i, child in enumerate(self.children)
]

child_results = await asyncio.gather(*coroutines)
for child_result in child_results:
true_subset = true_subset.compute_union(child_result.true_subset)

return AutomationResult(context, true_subset, child_results=child_results)
Expand Down Expand Up @@ -123,10 +126,9 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]:
async def evaluate(
self, context: AutomationContext[T_EntityKey]
) -> AutomationResult[T_EntityKey]:
child_context = context.for_child_condition(
child_result = await context.for_child_condition(
child_condition=self.operand, child_index=0, candidate_subset=context.candidate_subset
)
child_result = await child_context.evaluate_async()
).evaluate_async()
true_subset = context.candidate_subset.compute_difference(child_result.true_subset)

return AutomationResult(context, true_subset, child_results=[child_result])
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from abc import abstractmethod
from typing import AbstractSet

Expand Down Expand Up @@ -62,21 +63,21 @@ def name(self) -> str:
return "ANY_CHECKS_MATCH"

async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]:
check_results = []
true_subset = context.get_empty_subset()

for i, check_key in enumerate(
sorted(self._get_check_keys(context.key, context.asset_graph))
):
check_condition = EntityMatchesCondition(key=check_key, operand=self.operand)
check_result = await check_condition.evaluate(
context.for_child_condition(
child_condition=check_condition,
child_index=i,
candidate_subset=context.candidate_subset,
)
coroutines = [
context.for_child_condition(
child_condition=EntityMatchesCondition(key=check_key, operand=self.operand),
child_index=i,
candidate_subset=context.candidate_subset,
).evaluate_async()
for i, check_key in enumerate(
sorted(self._get_check_keys(context.key, context.asset_graph))
)
check_results.append(check_result)
]

check_results = await asyncio.gather(*coroutines)
for check_result in check_results:
true_subset = true_subset.compute_union(check_result.true_subset)

true_subset = context.candidate_subset.compute_intersection(true_subset)
Expand All @@ -101,14 +102,11 @@ async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResu
for i, check_key in enumerate(
sorted(self._get_check_keys(context.key, context.asset_graph))
):
check_condition = EntityMatchesCondition(key=check_key, operand=self.operand)
check_result = await check_condition.evaluate(
context.for_child_condition(
child_condition=check_condition,
child_index=i,
candidate_subset=context.candidate_subset,
)
)
check_result = await context.for_child_condition(
child_condition=EntityMatchesCondition(key=check_key, operand=self.operand),
child_index=i,
candidate_subset=context.candidate_subset,
).evaluate_async()
check_results.append(check_result)
true_subset = true_subset.compute_intersection(check_result.true_subset)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,11 @@ async def evaluate(
true_subset = context.get_empty_subset()

for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))):
dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand)
dep_result = await dep_condition.evaluate(
context.for_child_condition(
child_condition=dep_condition,
child_index=i,
candidate_subset=context.candidate_subset,
)
)
dep_result = await context.for_child_condition(
child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand),
child_index=i,
candidate_subset=context.candidate_subset,
).evaluate_async()
dep_results.append(dep_result)
true_subset = true_subset.compute_union(dep_result.true_subset)

Expand All @@ -153,14 +150,11 @@ async def evaluate(
true_subset = context.candidate_subset

for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))):
dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand)
dep_result = await dep_condition.evaluate(
context.for_child_condition(
child_condition=dep_condition,
child_index=i,
candidate_subset=context.candidate_subset,
)
)
dep_result = await context.for_child_condition(
child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand),
child_index=i,
candidate_subset=context.candidate_subset,
).evaluate_async()
dep_results.append(dep_result)
true_subset = true_subset.compute_intersection(dep_result.true_subset)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ def _get_previous_child_true_subset(

async def evaluate(self, context: AutomationContext) -> AutomationResult:
# evaluate child condition
child_context = context.for_child_condition(
child_result = await context.for_child_condition(
self.operand,
child_index=0,
# must evaluate child condition over the entire subset to avoid missing state transitions
candidate_subset=context.asset_graph_view.get_full_subset(key=context.key),
)
child_result = await child_context.evaluate_async()
).evaluate_async()

# get the set of asset partitions of the child which newly became true
newly_true_child_subset = child_result.true_subset.compute_difference(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@ async def evaluate(
child_candidate_subset = context.asset_graph_view.get_full_subset(key=context.key)

# compute result for trigger condition
trigger_context = context.for_child_condition(
trigger_result = await context.for_child_condition(
self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset
)
trigger_result = await trigger_context.evaluate_async()
).evaluate_async()

# compute result for reset condition
reset_context = context.for_child_condition(
reset_result = await context.for_child_condition(
self.reset_condition, child_index=1, candidate_subset=child_candidate_subset
)
reset_result = await reset_context.evaluate_async()
).evaluate_async()

# take the previous subset that this was true for
true_subset = context.previous_true_subset or context.get_empty_subset()
Expand Down

0 comments on commit ebe8594

Please sign in to comment.