-
Notifications
You must be signed in to change notification settings - Fork 0
/
SatFlow_GUI.py
277 lines (229 loc) · 10.9 KB
/
SatFlow_GUI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
try:
# for Python2
from Tkinter import Tk, messagebox, Label, Button, Entry, FileDialog, RIGHT, YES, X, DISABLED, StringVar, Frame, TOP, LEFT
except ImportError:
# for Python3
from tkinter import Tk, messagebox, Label, Button, Entry, filedialog, RIGHT, YES, X, DISABLED, StringVar, Frame, TOP, LEFT
import numpy as np
import os
from itertools import groupby
from operator import itemgetter
#Input Form
fields = ('Vehicles Ignored', 'Saturation Flow Headway', 'Min Vehs per Measurement', 'Minimum Valid Cycles')
class SatFlowsCalc:
def check_params(self, *args):
for p in self.params:
try:
t = int(p.get())
if t>0:
self.run_button.config(state='normal')
else:
self.run_button.config(state='disabled')
break
except ValueError:
break
def __init__(self, master):
self.master = master
master.title("Saturation Flows Calculator")
vcmd = master.register(self.validate) # we have to wrap the command
vcmd_fp = master.register(self.validate_inputFolder) # we have to wrap the command
self.fp_dis = StringVar()
e = Entry(master, textvariable=self.fp_dis,validate="key", validatecommand=(vcmd_fp,'%P'))
b = Button(master, text="Browse",
command=lambda:self.fp_dis.set(filedialog.askdirectory()))
e.pack()
b.pack()
self.vehsIgnored = StringVar()
self.satFlowHead = StringVar()
self.minVehsPerMeas = StringVar()
self.minValidCycles = StringVar()
self.params = [self.vehsIgnored, self.satFlowHead, self.minVehsPerMeas, self.minValidCycles]
i=0
for p in self.params:
row = Frame(master)
lab = Label(row, width=22, text=fields[i]+": ", anchor='w')
self.ent = Entry(row, textvariable=p, validate="key", validatecommand=(vcmd, '%P'))
self.ent.insert(0,'0')
row.pack(side=TOP, fill=X, padx=5, pady=5)
lab.pack(side=LEFT)
self.ent.pack(side=RIGHT, expand=YES, fill=X)
p.trace("w", self.check_params)
i+=1
self.run_button = Button(master, text='Calculate Saturation Flow',
state=DISABLED, command=lambda: self.Main(self.params))
self.run_button.pack(padx=5, pady=5,expand=YES, fill=X)
def validate_inputFolder(self, fp):
print('in')
if os.listdir(fp):
return False
else:
return True
def validate(self, new_text):
if not new_text: # the field is being cleared
self.entered_number = 0
return True
try:
self.entered_number = int(new_text)
return True
except ValueError:
return False
def Main(self,params):
#Close the window
self.master.withdraw()
vehsIgnored = self.vehsIgnored.get()
satFlowHead = self.satFlowHead.get()
minValidCycles = self.minValidCycles.get()
minVehsPerMeas = self.minVehsPerMeas.get()
csv_output_fp = r'Output_SatFlows.csv'
dis_input_fp = self.fp_dis.get()
#--------------------------------------------------------------------------------------------------------------#
satFlow_data = self.GetDisData(dis_input_fp)
#Keep the original Data in memory
wt = satFlow_data
#Remove the rows of the data where crossVehIdx <= VehsIgnored
wt = self.RemoveIgnoredVehs(wt,vehsIgnored)
#Return a dictionary with the total valid crossing time of vehicles for each Signal Head and Simulation
satFlows = self.CalculateSatFlows(wt,satFlowHead,minValidCycles,minVehsPerMeas)
#For the valid Signal Heads we can calculate the average of Saturation Flows
#For the rest we print the list
avgSatFlows = self.CalculateAvgSatFlows(satFlows)
self.Output(csv_output_fp,avgSatFlows)
completion_msg = 'End of execution - Results in {}'.format(csv_output_fp)
print(completion_msg )
messagebox.showinfo('Result', completion_msg )
self.master.destroy()
def GetDisData(self,dis_input_fp):
#Function to read the dis files generated by Vissim
#The output is a table with columns['sigHead','SimRun','cycleStart','timeGap']
import glob
sigHeads = []
Simulation_Runs = []
cycleStarts = []
crossVehIdxs = []
timeGaps = []
path = dis_input_fp + r'\*.dis'
for dis_file in glob.glob(path):
with open(dis_file) as f:
simRun = int(dis_file.rsplit('_',1)[1].replace('.dis',''))
lines = f.readlines()
for line in lines:
line = line.strip()
if line.startswith('Discharge at'):
sigHead = int(line.rsplit(' ', 1)[1])
else:
first_val = line.split(' ',1)[0]
#Try to convert value in float
try:
cycleStart = float(first_val)
#remove the consecutive spaces from the string
line_tmp = " ".join(line.split())
#First remove everything after the parenthesis
timeGaps_inLine = line_tmp.split('(')[0].strip()
timeGaps_inLine = timeGaps_inLine.split(' ')
del timeGaps_inLine[0]
crossVehIdx = 1
for timeGap in timeGaps_inLine:
sigHeads.append(sigHead)
Simulation_Runs.append(simRun)
cycleStarts.append(cycleStart)
crossVehIdxs.append(crossVehIdx)
timeGaps.append(timeGap)
crossVehIdx += 1
except:
continue
#Create the numpy array
#np_array = np.array(sigHeads,Simulation_Runs,cycleStarts,crossVehIdxs,timeGaps)
wtype=np.dtype([('sigHead','i4'),('SimRun','i4'),
('cycleStart','f4'),('crossVehIdx','f4'),
('timeGap','f4')])
wt=np.empty(len(sigHeads),dtype=wtype)
wt['sigHead'] = sigHeads
wt['SimRun'] = Simulation_Runs
wt['cycleStart'] = cycleStarts
wt['crossVehIdx'] = crossVehIdxs
wt['timeGap'] = timeGaps
wt.sort(order='sigHead')
return wt
def RemoveIgnoredVehs(self,inp_tbl,vehsIgnored):
inds_to_del = np.where(inp_tbl['crossVehIdx'] <= vehsIgnored)[0]
out_tbl = np.delete(inp_tbl,inds_to_del,axis=0)
return out_tbl
def CalculateSatFlows(self,wt,satFlowHead,minValidCycles,minVehsPerMeas):
""" Return a dictionary with keys [Signal_Head_Simulation_Run]
and values the total time of vehicles crossing (number of subsequent vehicles with
acceptable timeGap, greater than minVehsPerMeas) and their count
"""
satFlows = {}
for signalHead in np.unique(wt['sigHead']):
#dictionary to store the sat flows of the signal head for each iteration
d = {}
for simRun in np.unique(wt['SimRun']):
#Find the unique values of cycleStarts for this signalHead and simRun
sliced_tbl = wt[(wt['sigHead'] == signalHead) & (wt['SimRun'] == simRun)]
unique_cycleStarts = np.unique(sliced_tbl['cycleStart'])
if len(unique_cycleStarts) >= minValidCycles:
for cycleStart in unique_cycleStarts:
#Indices of the vehicles with low (acceptable) relative gap
idx_validGaps = np.where((wt['sigHead'] == signalHead) &
(wt['SimRun'] == simRun) &
(wt['cycleStart'] == cycleStart) &
(wt['timeGap'] <= satFlowHead))[0]
#Check if there is a sequence of acceptable indices with more element than the limit (minVehsPerMeas)
#Group the array in groups of consecutive integers
for k, g in groupby(enumerate(idx_validGaps), lambda ix: ix[0]-ix[1]):
idxs = map(itemgetter(1), g)
if len(idxs) >= minVehsPerMeas:
#We have found a sequence of vehicles with more elements than minVehsPerMeas so we can move on
crossTime = 0
for idx in idxs:
#Add the sum to a dictionary with the number of vehicles that were counted
crossTime += wt['timeGap'][idx]
sf = round((3600 * len(idxs)/crossTime),2)
d.setdefault(simRun,[]).append(sf)
break
satFlows[signalHead] = d
return satFlows
def CalculateAvgSatFlows(self,satFlows):
#Create a dictionary with the average sat flows for each Signal Head and each iteration
avgSatFlows = {}
for signalHead, simRun_SatFlows in satFlows.iteritems():
#dictionary with the avg sat flows for each iteration
d = {}
for simRun, satFlows in simRun_SatFlows.iteritems():
if len(satFlows) > 0:
avg = sum(satFlows)/len(satFlows)
else:
avg=-1
d[simRun] = round(avg,2)
avgSatFlows[signalHead] = d
#Calculate also the avg of all simRuns
tot = 0
for k, v in d.iteritems():
tot += v
if len(d) > 0:
d['AllSimRuns'] = round(tot/len(d),2)
else:
d['AllSimRuns'] = -1
return avgSatFlows
def Output(self,filepath_out,results):
import csv
import os
import collections as cl
directory = os.path.dirname(filepath_out)
if not os.path.exists(directory):
os.makedirs(directory)
with open(filepath_out,'wb') as f:
w = csv.writer(f)
w.writerow(['Signal_Head','Avg_Saturation_Flow'])
results_ordered = cl.OrderedDict(sorted(results.items()))
for k, v in results_ordered.iteritems():
dout = v
avgAllSimRuns = dout.pop('AllSimRuns',-1)
l = [[k],[avgAllSimRuns],dout.values()]
#flatten the list
row = [i for slst in l for i in slst]
w.writerow(row)
if __name__ == "__main__":
root = Tk()
my_gui = SatFlowsCalc(root)
root.mainloop()