-
Notifications
You must be signed in to change notification settings - Fork 30
/
nested_graphs.py
73 lines (60 loc) · 3.85 KB
/
nested_graphs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Nested graphs are supported in flowpipe."""
from flowpipe import Graph, Node
@Node(outputs=["file"])
def MyNode(file):
# Something is done in here ...
return {"file": file}
# A graph that fixes an incoming file, cleaning up messy names etc.
#
# +-----------------------+ +-------------------------+
# | Cleanup Filename | | Change Lineendings |
# |-----------------------| |-------------------------|
# o file<> | +--->o file<> |
# | file o-----+ | file o
# +-----------------------+ +-------------------------+
fix_file = Graph(name="fix_file")
cleanup_filename = MyNode(name="Cleanup Filename", graph=fix_file)
change_lineendings = MyNode(name="Change Lineendings", graph=fix_file)
cleanup_filename.outputs["file"].connect(change_lineendings.inputs["file"])
# A second graph reads finds files, and extracts their contents into a database
# +----------------+ +----------------------------+ +----------------+
# | Find File | | Read Values from File | | Update DB |
# |----------------| |----------------------------| |----------------|
# o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o
# +----------------+ +----------------------------+ +----------------+
udpate_db_from_file = Graph(name="udpate_db_from_file")
find_file = MyNode(name="Find File", graph=udpate_db_from_file)
values_from_file = MyNode(
name="Read Values from File", graph=udpate_db_from_file
)
update_db = MyNode(name="Update DB", graph=udpate_db_from_file)
find_file.outputs["file"].connect(values_from_file.inputs["file"])
values_from_file.outputs["file"].connect(update_db.inputs["file"])
# The second graph however relies on clean input files so the first graph can
# be used within the second "udpate db" graph.
# For this purpose, graphs can promote input and output plugs from their nodes
# to the graph level, making other graphs aware of them:
fix_file["Cleanup Filename"].inputs["file"].promote_to_graph(
name="file_to_clean"
)
fix_file["Change Lineendings"].outputs["file"].promote_to_graph(
name="clean_file"
)
# Now the update_db graph can connect nodes to the fix_file graph
find_file.outputs["file"].connect(fix_file.inputs["file_to_clean"])
fix_file.outputs["clean_file"].connect(
udpate_db_from_file["Read Values from File"].inputs["file"]
)
# The result now looks like this:
#
# +---udpate_db_from_file----+ +-------fix_file--------+ +--------fix_file---------+ +----udpate_db_from_file-----+ +---udpate_db_from_file----+
# | Find File | | Cleanup Filename | | Change Lineendings | | Read Values from File | | Update DB |
# |--------------------------| |-----------------------| |-------------------------| |----------------------------| |--------------------------|
# o file<> | +--->o file<> | +--->o file<> | +--->o file<> | +--->o file<> |
# | file o-----+ | file o-----+ | file o-----+ | file o-----+ | file o
# +--------------------------+ +-----------------------+ +-------------------------+ +----------------------------+ +--------------------------+
print(fix_file)
# Subgraphs can be accessed by their name from any participating graph
assert udpate_db_from_file.subgraphs["fix_file"] is fix_file
assert fix_file.subgraphs["udpate_db_from_file"] is udpate_db_from_file