forked from Karenw1004/Deeppicar-v3
-
Notifications
You must be signed in to change notification settings - Fork 4
/
deeppicar.py
executable file
·327 lines (289 loc) · 10.9 KB
/
deeppicar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#!/usr/bin/python
import os
import time
import atexit
import cv2
import math
import numpy as np
import sys
import params
import argparse
from PIL import Image, ImageDraw
##########################################################
# import deeppicar's sensor/actuator modules
##########################################################
camera = __import__(params.camera)
actuator = __import__(params.actuator)
inputdev = __import__(params.inputdev)
##########################################################
# global variable initialization
##########################################################
use_thread = True
view_video = False
fpv_video = False
enable_record = False
cfg_cam_res = (160, 120)
cfg_cam_fps = 30
frame_id = 0
angle = 0.0
period = 0.05 # sec (=50ms)
##########################################################
# local functions
##########################################################
def deg2rad(deg):
return deg * math.pi / 180.0
def rad2deg(rad):
return 180.0 * rad / math.pi
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count*period - time.time(),0)
def turn_off():
actuator.stop()
camera.stop()
if frame_id > 0:
keyfile.close()
vidfile.release()
def crop_image(img):
scaled_img = cv2.resize(img, (max(int(params.img_height * 4 / 3), params.img_width), params.img_height))
fb_h, fb_w, fb_c = scaled_img.shape
# print(scaled_img.shape)
startx = int((fb_w - params.img_width) / 2);
starty = int((fb_h - params.img_height) / 2);
return scaled_img[starty:starty+params.img_height, startx:startx+params.img_width,:]
def preprocess(img):
if args.pre == "crop":
img = crop_image(img)
else:
img = cv2.resize(img, (params.img_width, params.img_height))
# Convert to grayscale and readd channel dimension
if params.img_channels == 1:
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = np.reshape(img, (params.img_height, params.img_width, params.img_channels))
img = img / 255.
return img
def overlay_image(l_img, s_img, x_offset, y_offset):
assert y_offset + s_img.shape[0] <= l_img.shape[0]
assert x_offset + s_img.shape[1] <= l_img.shape[1]
l_img = l_img.copy()
for c in range(0, 3):
l_img[y_offset:y_offset+s_img.shape[0],
x_offset:x_offset+s_img.shape[1], c] = (
s_img[:,:,c] * (s_img[:,:,3]/255.0) +
l_img[y_offset:y_offset+s_img.shape[0],
x_offset:x_offset+s_img.shape[1], c] *
(1.0 - s_img[:,:,3]/255.0))
return l_img
def print_stats(execution_times):
# Calculate statistics
avg = np.mean(execution_times)
min = np.min(execution_times)
max = np.max(execution_times)
p99 = np.percentile(execution_times, 99)
p90 = np.percentile(execution_times, 90)
p50 = np.percentile(execution_times, 50)
print(f"Average Execution Time: {avg:.6f} seconds")
print(f"Minimum Execution Time: {min:.6f} seconds")
print(f"Maximum Execution Time: {max:.6f} seconds")
print(f"99th Percentile Execution Time: {p99:.6f} seconds")
print(f"90th Percentile Execution Time: {p90:.6f} seconds")
print(f"50th Percentile Execution Time: {p50:.6f} seconds")
def measure_execution_time(func, num_trials):
execution_times = []
for _ in range(num_trials):
start_time = time.time()
func() # Call the function to measure its execution time
end_time = time.time()
execution_time = end_time - start_time
execution_times.append(execution_time)
# Calculate statistics
print_stats(execution_times)
##########################################################
# program begins
##########################################################
parser = argparse.ArgumentParser(description='DeepPicar main')
parser.add_argument("-d", "--dnn", help="Enable DNN", action="store_true")
parser.add_argument("-t", "--throttle", help="throttle percent. [0-100]%", type=int, default=100)
parser.add_argument("--turnthresh", help="throttle percent. [0-30]degree", type=int, default=10)
parser.add_argument("-n", "--ncpu", help="number of cores to use.", type=int, default=2)
parser.add_argument("-f", "--hz", help="control frequnecy", type=int)
parser.add_argument("--fpvvideo", help="Take FPV video of DNN driving", action="store_true")
parser.add_argument("--use", help="use [tflite|tf|openvino]", type=str, default="tflite")
parser.add_argument("--pre", help="preprocessing [resize|crop]", type=str, default="resize")
args = parser.parse_args()
if args.throttle:
print ("throttle = %d pct" % (args.throttle))
if args.turnthresh:
args.turnthresh = args.turnthresh
print ("turn angle threshold = %d degree\n" % (args.turnthresh))
if args.hz:
period = 1.0/args.hz
print("new period: ", period)
if args.fpvvideo:
fpv_video = True
print("FPV video of DNN driving is on")
print("period (sec):", period)
print("preprocessing:", args.pre)
##########################################################
# import deeppicar's DNN model
##########################################################
print ("Loading model: " + params.model_file)
print("use:", args.use)
if args.use == "tf":
from tensorflow import keras
model = keras.models.load_model(params.model_file+'.h5')
elif args.use == "openvino":
import openvino as ov
core = ov.Core()
ov_model = core.read_model(params.model_file+'.tflite')
model = core.compile_model(ov_model, 'AUTO')
else:
try:
# Import TFLite interpreter from tflite_runtime package if it's available.
from tflite_runtime.interpreter import Interpreter
interpreter = Interpreter(params.model_file+'.tflite', num_threads=args.ncpu)
except ImportError:
# Import TFLMicro interpreter
try:
from tflite_micro_runtime.interpreter import Interpreter
interpreter = Interpreter(params.model_file+'.tflite')
except:
# If all failed, fallback to use the TFLite interpreter from the full TF package.
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=params.model_file+'.tflite', num_threads=args.ncpu)
interpreter.allocate_tensors()
input_index = interpreter.get_input_details()[0]["index"]
output_index = interpreter.get_output_details()[0]["index"]
# initlaize deeppicar modules
actuator.init(args.throttle)
camera.init(res=cfg_cam_res, fps=cfg_cam_fps, threading=use_thread)
atexit.register(turn_off)
g = g_tick()
start_ts = time.time()
frame_arr = []
angle_arr = []
actuator_times = []
throttle_pct = 0
steering_deg = 0
prev_throttle_pct = -1
prev_steering_deg = -1
# enter main loop
while True:
if use_thread:
time.sleep(next(g))
frame = camera.read_frame()
if frame is None:
print("frame is None")
break
ts = time.time()
# receive input (must be non blocking)
ch = inputdev.read_single_event()
# process input
if ch == ord('j'): # left
steering_deg = -30
elif ch == ord('k'): # center
steering_deg = 0
elif ch == ord('l'): # right
steering_deg = 30
elif ch == ord('u'):
steering_deg += -10
elif ch == ord('o'):
steering_deg += 10
elif ch == ord('a'): # accel
throttle_pct += 5
start_ts = ts
elif ch == ord('z'): # reverse
throttle_pct += -5
elif ch == ord('s'): # stop
throttle_pct = 0
print ("stop")
print ("duration: %.2f" % (ts - start_ts))
enable_record = False # stop recording as well
args.dnn = False # manual mode
elif ch == ord('r'):
enable_record = not enable_record
print ("record mode: ", enable_record)
elif ch == ord('t'):
print ("toggle video mode")
view_video = not view_video
elif ch == ord('d'):
args.dnn = not args.dnn
print ("dnn mode:", args.dnn)
elif ch == ord('q'):
break
if args.dnn == True:
# 1. machine input
img = preprocess(frame)
img = np.expand_dims(img, axis=0).astype(np.float32)
if args.use == "tf":
angle = model.predict(img)[0]
elif args.use == "openvino":
angle = model(img)[0][0][0]
# print ('angle:', angle);
else: # tflite
interpreter.set_tensor(input_index, img)
interpreter.invoke()
angle = interpreter.get_tensor(output_index)[0][0]
steering_deg = rad2deg(angle)
# print(steering_deg, angle)
actuator.set_steering(steering_deg)
actuator.set_throttle(throttle_pct)
else:
# manual mode
if prev_steering_deg != steering_deg:
actuator.set_steering(steering_deg)
if prev_throttle_pct != throttle_pct:
actuator.set_throttle(throttle_pct)
dur = time.time() - ts
if dur > period:
print("%.3f: took %d ms - deadline miss."
% (ts - start_ts, int(dur * 1000)))
# else:
# print("%.3f: took %d ms" % (ts - start_ts, int(dur * 1000)))
if enable_record == True and frame_id == 0:
# create files for data recording
keyfile = open(params.rec_csv_file, 'w+')
keyfile.write("ts,frame,wheel\n") # ts (ms)
try:
fourcc = cv2.cv.CV_FOURCC(*'XVID')
except AttributeError as e:
fourcc = cv2.VideoWriter_fourcc(*'XVID')
vidfile = cv2.VideoWriter(params.rec_vid_file, fourcc,
cfg_cam_fps, cfg_cam_res)
if enable_record == True and frame is not None:
# increase frame_id
frame_id += 1
# write input (angle)
str = "{},{},{}\n".format(int(ts*1000), frame_id, angle)
keyfile.write(str)
if args.dnn and fpv_video:
textColor = (255,255,255)
bgColor = (0,0,0)
newImage = Image.new('RGBA', (100, 20), bgColor)
drawer = ImageDraw.Draw(newImage)
drawer.text((0, 0), "Frame #{}".format(frame_id), fill=textColor)
drawer.text((0, 10), "Angle:{}".format(angle), fill=textColor)
newImage = cv2.cvtColor(np.array(newImage), cv2.COLOR_BGR2RGBA)
frame = overlay_image(frame,
newImage,
x_offset = 0, y_offset = 0)
# write video stream
vidfile.write(frame)
#img_name = "cal_images/opencv_frame_{}.png".format(frame_id)
#cv2.imwrite(img_name, frame)
if frame_id >= 1000:
print ("recorded 1000 frames")
break
print ("%.3f %d %.3f %d(ms)" %
(ts, frame_id, angle, int((time.time() - ts)*1000)))
# update previous steering angle
if prev_steering_deg != steering_deg or prev_throttle_pct != throttle_pct:
prev_steering_deg = steering_deg
prev_throttle_pct = throttle_pct
print ("Finish..")
if len(actuator_times):
print ("Actuator latency measurements: {} trials".format(len(actuator_times)))
print_stats(actuator_times)
turn_off()