-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathContactNetwork_NetworkX.py
executable file
·205 lines (173 loc) · 7.64 KB
/
ContactNetwork_NetworkX.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#! /usr/bin/env python3
'''
Niema Moshiri 2016
"ContactNetwork" module, implemented with NetworkX
'''
from ContactNetwork import ContactNetwork
from ContactNetworkNode_NetworkX import ContactNetworkNode_NetworkX as Node
from ContactNetworkEdge_NetworkX import ContactNetworkEdge_NetworkX as Edge
import FAVITES_GlobalContext as GC
from copy import copy
class ContactNetwork_NetworkX(ContactNetwork):
    '''
    Implement the ``ContactNetwork`` abstract class using NetworkX

    Attributes
    ----------
    contact_network : DiGraph
        The NetworkX ``DiGraph`` object to represent this ``ContactNetwork``
        (nodes are stored by number; undirected edges are stored as two
        opposite directed edges)
    directed : bool
        ``True`` if at least one edge was declared directed (``d``)
    infected_nodes : set of ContactNetworkNode
        Set containing the nodes that have been infected
    name_to_num : dict
        Dictionary mapping original node names to numbers
    nodes : set of ContactNetworkNodes
        Set containing all of the nodes in this ``ContactNetwork``
    num_to_name : list
        List mapping numbers (as indices) to original node names
    transmissions : list
        List of transmission events as (u,v,time) tuples
    uninfected_nodes : set of ContactNetworkNodes
        Set containing the nodes that have not yet been infected
    '''

    @staticmethod
    def init():
        '''
        Load NetworkX's ``DiGraph`` into the module-level namespace.

        Raises
        ------
        AssertionError
            If NetworkX cannot be loaded. The working directory is changed
            to ``GC.START_DIR`` before the error is raised.
        '''
        global DiGraph
        try:
            from networkx import DiGraph
        except Exception as e:
            from os import chdir
            chdir(GC.START_DIR)
            # raised explicitly (instead of ``assert False``) so that the
            # failure is still reported when Python runs with -O
            raise AssertionError("Error loading NetworkX. Install with: pip3 install networkx") from e

    @staticmethod
    def cite():
        '''
        Return the NetworkX citation stored in the global context.
        '''
        return GC.CITATION_NETWORKX

    def __init__(self, edge_list=None):
        '''
        Build the contact network from a node+edge list.

        Parameters
        ----------
        edge_list : iterable of str, optional
            Tab-delimited lines. Blank lines and lines starting with ``#``
            are skipped. Every other line must be either
            ``NODE<TAB>name<TAB>attributes`` or
            ``EDGE<TAB>u<TAB>v<TAB>attributes<TAB>d|u``, where ``attributes``
            is a comma-separated list or ``.`` for none. If ``None``, the
            object is left uninitialized.

        Raises
        ------
        ValueError
            On a duplicate node, an edge referencing an unknown node, an
            invalid directionality, an unknown line type, or a line with
            too few columns.
        '''
        # if ModuleFactory is just testing, do nothing
        if edge_list is None:
            return

        # set up NetworkX and graph
        self.contact_network = DiGraph()
        self.name_to_num = {}          # map original node names to numbers
        self.num_to_name = []          # map numbers to original node names
        self.nodes = set()             # store all nodes
        self.uninfected_nodes = set()  # store uninfected nodes
        self.infected_nodes = set()    # store infected nodes
        self.transmissions = []        # store u->v transmission as (u,v,time)
        self.directed = False          # am I directed (True) or undirected (False)?

        # read in Contact Network as node+edge list
        for line in edge_list:
            line = line.strip()
            if len(line) == 0 or line[0] == '#':
                continue
            parts = line.split('\t')
            if parts[0] == 'NODE':
                self._add_node(parts)
            elif parts[0] == 'EDGE':
                self._add_edge(parts)
            else:
                raise ValueError("Invalid type in list: %r" % parts[0])

        # create sets of nodes (every node starts out uninfected)
        for num in self.contact_network.nodes():
            self.nodes.add(Node(self, self.num_to_name[num], num))
        self.uninfected_nodes.update(self.nodes)

    def _add_node(self, parts):
        '''
        Add the node described by one split ``NODE`` line to the graph.
        '''
        if len(parts) < 3:
            raise ValueError("Invalid NODE line (expected 3 tab-delimited columns): %r" % '\t'.join(parts))
        name = parts[1]
        if name in self.name_to_num:
            raise ValueError("Node %r already exists!" % name)
        num = len(self.num_to_name)
        self.name_to_num[name] = num
        self.num_to_name.append(name)
        self.contact_network.add_node(num)
        node_data = self.contact_network.nodes[num]
        if parts[2] == '.':
            node_data['attribute'] = set()
        else:
            # node attributes are normalized (stripped and upper-cased)
            node_data['attribute'] = {e.strip().upper() for e in parts[2].split(',')}
        node_data['infections_from'] = []
        node_data['infections_to'] = []
        node_data['infection_trees'] = []

    def _add_edge(self, parts):
        '''
        Add the edge described by one split ``EDGE`` line to the graph.
        '''
        if len(parts) < 5:
            raise ValueError("Invalid EDGE line (expected 5 tab-delimited columns): %r" % '\t'.join(parts))
        uName = parts[1]
        if uName not in self.name_to_num:
            raise ValueError("Node %r does not exist!" % uName)
        vName = parts[2]
        if vName not in self.name_to_num:
            raise ValueError("Node %r does not exist!" % vName)
        if parts[4] not in {'d', 'u'}:
            raise ValueError("Invalid directionality: %r" % parts[4])
        uNum = self.name_to_num[uName]
        vNum = self.name_to_num[vName]
        # edge attributes are stored exactly as written (not normalized)
        if parts[3] == '.':
            attribute = set()
        else:
            attribute = set(parts[3].split(','))
        self.contact_network.add_edge(uNum, vNum)
        self.contact_network[uNum][vNum]['attribute'] = attribute
        if parts[4] == 'u':
            # undirected edge, so add v to u too; both directions share the
            # same attribute set object
            self.contact_network.add_edge(vNum, uNum)
            self.contact_network[vNum][uNum]['attribute'] = attribute
        else:
            self.directed = True

    def is_directed(self):
        '''
        Return ``True`` if any edge in the input was declared directed.
        '''
        return self.directed

    def num_transmissions(self):
        '''
        Return the number of recorded transmission events.
        '''
        return len(self.transmissions)

    def num_nodes(self):
        '''
        Return the number of nodes in the network.
        '''
        return len(self.nodes)

    def get_nodes(self):
        '''
        Return a shallow copy of the set of all nodes.
        '''
        return copy(self.nodes)

    def get_node(self, name):
        '''
        Return a node object for ``name``, or ``None`` if ``name`` is ``None``.

        Raises ``KeyError`` if ``name`` is not a known node name.
        '''
        if name is None:
            return None
        return Node(self, name, self.name_to_num[name])

    def num_infected_nodes(self):
        '''
        Return the number of nodes in the infected set.
        '''
        return len(self.infected_nodes)

    def get_infected_nodes(self):
        '''
        Return a shallow copy of the set of infected nodes.
        '''
        return copy(self.infected_nodes)

    def num_uninfected_nodes(self):
        '''
        Return the number of nodes in the uninfected set.
        '''
        return len(self.uninfected_nodes)

    def get_uninfected_nodes(self):
        '''
        Return a shallow copy of the set of uninfected nodes.
        '''
        return copy(self.uninfected_nodes)

    def num_edges(self):
        '''
        Return the number of directed edges stored in the graph (an
        undirected input edge is stored as two directed edges).
        '''
        return self.contact_network.number_of_edges()

    def nodes_iter(self):
        '''
        Iterate over all nodes in the network.
        '''
        yield from self.nodes

    def edges_iter(self):
        '''
        Iterate over all stored directed edges as edge objects.
        '''
        for uNum, vNum in self.contact_network.edges():
            attr = self.contact_network[uNum][vNum]['attribute']
            u = Node(self, self.num_to_name[uNum], uNum)
            v = Node(self, self.num_to_name[vNum], vNum)
            yield Edge(u, v, attr)

    def get_edges_from(self, node):
        '''
        Return a list of the edges leaving ``node``.
        '''
        nx_edges = self.contact_network.out_edges(self.name_to_num[node.get_name()])
        return [
            Edge(node, Node(self, self.num_to_name[vNum], vNum), self.contact_network[uNum][vNum]['attribute'])
            for uNum, vNum in nx_edges
        ]

    def get_edges_to(self, node):
        '''
        Return a list of the edges entering ``node``.
        '''
        nx_edges = self.contact_network.in_edges(self.name_to_num[node.get_name()])
        return [
            Edge(Node(self, self.num_to_name[uNum], uNum), node, self.contact_network[uNum][vNum]['attribute'])
            for uNum, vNum in nx_edges
        ]

    def get_transmissions(self):
        '''
        Return a shallow copy of the list of (u,v,time) transmission events.
        '''
        return copy(self.transmissions)

    def add_transmission(self, u, v, time):
        '''
        Record a transmission event from ``u`` to ``v`` at ``time``.

        ``u`` may be ``None``; ``v`` must be a node object. Unless ``u``
        equals ``v``, ``v`` is also moved to the infected set.
        '''
        assert u is None or isinstance(u, Node), "u is not a ContactNetworkNode_NetworkX"
        assert isinstance(v, Node), "v is not a ContactNetworkNode_NetworkX"
        self.transmissions.append((u, v, time))
        if u != v:
            self.add_to_infected(v)

    def add_to_infected(self, node):
        '''
        Move ``node`` from the uninfected set to the infected set.

        The node must already report itself as infected.
        '''
        assert isinstance(node, Node), "node is not a ContactNetworkNode_NetworkX"
        assert node.is_infected(), "node is not infected! Infect before moving"
        self.uninfected_nodes.discard(node)
        self.infected_nodes.add(node)

    def remove_from_infected(self, node):
        '''
        Move ``node`` from the infected set to the uninfected set.

        The node must already report itself as not infected.
        '''
        assert isinstance(node, Node), "node is not a ContactNetworkNode_NetworkX"
        assert not node.is_infected(), "node is infected! Uninfect before moving"
        self.infected_nodes.discard(node)
        self.uninfected_nodes.add(node)
def check():
    '''
    Validity check for ``ContactNetwork_NetworkX`` (currently a placeholder
    that performs no checks)
    '''
    return None
if __name__ == '__main__':
    # This entry point is just used for testing purposes. It has no actual
    # function in the simulator tool.
    check()