Skip to content

Commit

Permalink
handle graph snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 8, 2025
1 parent 3e3a642 commit 818b6ed
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2571,6 +2571,9 @@
"mapped_solid_name": "never_runs_op"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -33745,6 +33748,9 @@
"name": "simple_graph",
"output_def_snaps": [],
"output_mapping_snaps": [],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -33784,7 +33790,7 @@
'''
# ---
# name: test_all_snapshot_ids[17]
'2f4a7e8dac16272a9e008a61e766b43bd473eb7e'
'cfb767a61be6a485474582ec75fc6834cd314c08'
# ---
# name: test_all_snapshot_ids[18]
'''
Expand Down Expand Up @@ -34920,6 +34926,9 @@
"mapped_solid_name": "adder_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
},
{
Expand Down Expand Up @@ -35002,6 +35011,9 @@
"mapped_solid_name": "adder_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
},
{
Expand Down Expand Up @@ -35084,6 +35096,9 @@
"mapped_solid_name": "div_2"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -35165,10 +35180,10 @@
'''
# ---
# name: test_all_snapshot_ids[19]
'f726acb538de96f43dfa0335f7bc79d1b52394c1'
'b336066bf8dc411b162f9869dd08da0b18057896'
# ---
# name: test_all_snapshot_ids[1]
'6c2e7891d74bdc0ff647f100a13f4a630081c2ba'
'fb54440831226acbda6ad87d0d7599c4f3424168'
# ---
# name: test_all_snapshot_ids[20]
'''
Expand Down Expand Up @@ -54872,6 +54887,9 @@
"mapped_solid_name": "never_runs_op"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -55018,7 +55036,7 @@
'''
# ---
# name: test_all_snapshot_ids[51]
'709d40d76a7309b11219cb331842fb349e161eb0'
'200c1bc946100875b19e0c6b7f26fbb9edab070b'
# ---
# name: test_all_snapshot_ids[52]
'''
Expand Down Expand Up @@ -80111,6 +80129,9 @@
"mapped_solid_name": "plus_one"
}
],
"pools": {
"__set__": []
},
"tags": {}
}
],
Expand Down Expand Up @@ -80254,7 +80275,7 @@
'''
# ---
# name: test_all_snapshot_ids[99]
'cf140114fda778cc12c11ba13f7e26cd0fbccb3c'
'1959e3612cf7d53844928fff25df04128d1b15ff'
# ---
# name: test_all_snapshot_ids[9]
'cf67ad5622d23845499daf2adba4a11f6b23f9eb'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,13 @@ def tags(self) -> Mapping[str, str]:
"""The tags associated with the graph."""
return super(GraphDefinition, self).tags

@property
def pools(self) -> Set[str]:
pools = set()
for node_def in self.node_defs:
pools.update(node_def.pools)
return pools

@public
def alias(self, name: str) -> "PendingNodeInvocation":
"""Aliases the graph with a new name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
)

Expand Down Expand Up @@ -240,3 +241,7 @@ def get_op_handles(self, parent: "NodeHandle") -> AbstractSet["NodeHandle"]: ...
def get_op_output_handles(
self, parent: Optional["NodeHandle"]
) -> AbstractSet["NodeOutputHandle"]: ...

@property
@abstractmethod
def pools(self) -> Set[str]: ...
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -309,6 +310,11 @@ def pool(self) -> Optional[str]:
"""Optional[str]: The concurrency group for this op."""
return self._pool

@property
def pools(self) -> Set[str]:
"""Optional[str]: The concurrency group for this op."""
return {self._pool} if self._pool else set()

def is_from_decorator(self) -> bool:
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction

Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/snap/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import cached_property
from typing import Mapping, Optional, Sequence, Union
from typing import Mapping, Optional, Sequence, Set, Union

import dagster._check as check
from dagster._config import ConfigFieldSnap, snap_from_field
Expand Down Expand Up @@ -158,6 +158,7 @@ class GraphDefSnap:
dep_structure_snapshot: DependencyStructureSnapshot
input_mapping_snaps: Sequence[InputMappingSnap]
output_mapping_snaps: Sequence[OutputMappingSnap]
pools: Set[str]

@cached_property
def input_def_map(self) -> Mapping[str, InputDefSnap]:
Expand Down Expand Up @@ -280,6 +281,7 @@ def build_graph_def_snap(graph_def: GraphDefinition) -> GraphDefSnap:
dep_structure_snapshot=build_dep_structure_snapshot_from_graph_def(graph_def),
input_mapping_snaps=list(map(build_input_mapping_snap, graph_def.input_mappings)),
output_mapping_snaps=list(map(build_output_mapping_snap, graph_def.output_mappings)),
pools=graph_def.pools,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
},
"step_output_versions": []
},
"pipeline_snapshot_id": "3057df527127ce93bc08075d3dc3149b5054065a",
"pipeline_snapshot_id": "b57cc7e00dc45fd4927a082265100e665a21f23d",
"snapshot_version": 1,
"step_keys_to_execute": [
"comp_1.return_one",
Expand Down

0 comments on commit 818b6ed

Please sign in to comment.