Skip to content

Commit

Permalink
Make automation evaluate async
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 14, 2024
1 parent 27decd9 commit b2e60d7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,12 @@ def toposorted_asset_keys(self) -> Sequence[AssetKey]:
]

@cached_property
def toposorted_entity_keys(self) -> Sequence[EntityKey]:
"""Return topologically sorted entity keys in graph. Keys with the same topological level are
def toposorted_entity_keys(self) -> Sequence[Sequence[EntityKey]]:
"""Return topologically sorted levels for entity keys in graph. Keys with the same topological level are
sorted alphabetically to provide stability.
"""
sort_key = lambda e: (e, None) if isinstance(e, AssetKey) else (e.asset_key, e.name)
return [
item
for items_in_level in toposort(self.entity_dep_graph["upstream"], sort_key=sort_key)
for item in sorted(items_in_level, key=sort_key)
]
return toposort(self.entity_dep_graph["upstream"], sort_key=sort_key)

@cached_property
def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
import logging
from collections import defaultdict
Expand Down Expand Up @@ -114,12 +115,17 @@ def prefetch(self) -> None:
self.logger.info("Done prefetching asset records.")

def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]:
return asyncio.run(self.async_evaluate())

async def async_evaluate(
self,
) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]:
self.prefetch()
num_conditions = len(self.entity_keys)
num_evaluated = 0
for entity_key in self.asset_graph.toposorted_entity_keys:
if entity_key not in self.entity_keys:
continue

async def _evaluate_entity_async(entity_key: EntityKey) -> None:
nonlocal num_evaluated

self.logger.debug(
f"Evaluating {entity_key.to_user_string()} ({num_evaluated+1}/{num_conditions})"
Expand All @@ -145,6 +151,15 @@ def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[En
f"({format(result.end_timestamp - result.start_timestamp, '.3f')} seconds)"
)
num_evaluated += 1

for topo_level in self.asset_graph.toposorted_entity_keys:
coroutines = [
_evaluate_entity_async(entity_key)
for entity_key in topo_level
if entity_key in self.entity_keys
]
await asyncio.gather(*coroutines)

return list(self.current_results_by_key.values()), list(self._get_entity_subsets())

def evaluate_entity(self, key: EntityKey) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
HourlyPartitionsDefinition,
evaluate_automation_conditions,
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetMaterialization
from dagster._core.instance import DagsterInstance
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets

Expand All @@ -26,7 +28,17 @@ def test_eager_perf() -> None:
auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
)

defs = Definitions(assets=assets)

instance = DagsterInstance.ephemeral()

for asset_key in defs.get_asset_graph().all_asset_keys:
instance.report_runless_asset_event(
AssetMaterialization(
asset_key=asset_key, partition=hourly_partitions_def.get_last_partition_key()
)
)

cursor = None
start = time.time()
for _ in range(2):
Expand Down

0 comments on commit b2e60d7

Please sign in to comment.