"""
Utilities for parsing and visualizing IMU and GNSS data files generated by the GNSS/IMU Logger app
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from coordinate_utils import posGeodetic2Ecef, ecef2Ned, deg2rad
PLOT_LABELS = {1: "Acceleration [m/s^2]",
2: "Angular Rate [rad/s]",
3: "Magnetic Field [uT]",
4: "GNSS Position",
5: "GNSS Velocity [m/s]"
}
AXES_NAMES = {1: ['x', 'y', 'z'],
2: ['x', 'y', 'z'],
3: ['x', 'y', 'z'],
4: ['lat', 'lon', 'alt'],
5: ['Vn', 'Ve', 'Vd']
}
def extract_single_imu_sensor_data(in_file_path, sensor_id, zero_times=False, time_in_seconds=False):
"""
Extracts the data from a single sensor in the form
of a dictionary
-sensor_id: 1->accel, 2->gyro, 3->mag
-zero_times: subtract the initial time from all time values
"""
data_dict = {"time" : [], # [ms]
"x" : [],
"y" : [],
"z" : []}
start_time = None
with open(in_file_path, 'r') as f:
for line in f:
if not line[0].isnumeric():
continue # skip header
stamp, sensor, x, y, z = line.rstrip('\n').split(',')
if time_in_seconds:
stamp = float(stamp) / 1000.0
else:
stamp = float(stamp)
if int(sensor) == sensor_id:
if zero_times:
                    if start_time is None:
start_time = stamp
time = 0
else:
time = stamp - start_time
else:
time = stamp
data_dict["time"].append(time)
data_dict["x"].append(float(x))
data_dict["y"].append(float(y))
data_dict["z"].append(float(z))
print(f"Loaded IMU sensor<{sensor_id}> data")
return data_dict
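# Usage sketch (not part of the original module): extracts accelerometer samples
# (sensor_id=1) from a log file and reports how many were loaded. The default
# path is a hypothetical example; any CSV produced by the logger app should work.
def _example_extract_accel(log_path="logs/imu_log.csv"):
    accel = extract_single_imu_sensor_data(log_path, sensor_id=1,
                                            zero_times=True, time_in_seconds=True)
    print(f"{len(accel['time'])} accelerometer samples loaded")
    return accel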
def extract_gnss_sensor_data(in_file_path, zero_times=False, time_in_seconds=False):
"""
Extracts the data into a dictionary
Inputs:
-zero_times, subtract the initial time from all time values
"""
data_dict = {"time": [], # [ms] or [s]
"lat": [], # [rad]
"lon": [], # [rad]
"alt": [], # [m]
"speed": [], # [m/s]
"accuracy": []} # [m]
start_time = None
with open(in_file_path, 'r') as f:
for line in f:
            if not line.startswith("Fix"):
                continue # skip header
_, _, lat, lon, alt, speed, accuracy, stamp = line.rstrip('\n').split(',')
if time_in_seconds:
stamp = float(stamp) / 1000
else:
stamp = float(stamp)
if zero_times:
                if start_time is None:
start_time = stamp
time = 0
else:
time = stamp - start_time
else:
time = stamp
data_dict["time"].append(time)
data_dict["lat"].append(deg2rad(float(lat)))
data_dict["lon"].append(deg2rad(float(lon)))
data_dict["alt"].append(float(alt))
data_dict["speed"].append(float(speed))
data_dict["accuracy"].append(float(accuracy))
print(f"Loaded GNSS sensor data")
return data_dict
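# Usage sketch: a GNSS line is parsed as
#   Fix,<ignored>,lat,lon,alt,speed,accuracy,stamp
# (field order inferred from the unpacking above). The default path below is a
# hypothetical example; lat/lon are returned in radians.
def _example_extract_gnss(log_path="logs/gnss_log.csv"):
    gnss = extract_gnss_sensor_data(log_path, zero_times=True, time_in_seconds=True)
    print(f"{len(gnss['time'])} GNSS fixes loaded")
    return gnss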
def load_processed_data_file(in_file_path):
"""
Loads the processed data file into a dictionary
"""
data_dict = {'time': [], 'accel': [], 'gyro': [], 'mag': [], 'vel': [], 'pos': []}
with open(in_file_path, 'r') as f:
for line in f:
if not line[0].isnumeric():
continue # skip header
data = line.rstrip('\n').split(',')
data_dict['time'].append(float(data[0]))
data_dict['accel'].append(np.array([float(val) for val in data[1:4]]))
data_dict['gyro'].append(np.array([float(val) for val in data[4:7]]))
if data[7] == 'nan': # no mag measurement
data_dict['mag'].append(np.full(3, np.nan, dtype=float))
else:
data_dict['mag'].append(np.array([float(val) for val in data[7:10]]))
if data[10] == 'nan': # no gnss measurement
data_dict['vel'].append(np.full(3, np.nan, dtype=float))
data_dict['pos'].append(np.full(3, np.nan, dtype=float))
else:
data_dict['vel'].append(np.array([float(val) for val in data[10:13]]))
data_dict['pos'].append(np.array([float(val) for val in data[13:16]]))
print(f"Loaded <processed sensor data>: [{in_file_path.split('/')[-1]}]")
return data_dict
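# Usage sketch: the processed file is read as comma-separated rows of
#   time, accel(3), gyro(3), mag(3), vel(3), pos(3)
# with 'nan' marking missing mag/GNSS entries (layout inferred from the column
# slices above). The default path is a hypothetical example.
def _example_load_processed(file_path="logs/processed_data.csv"):
    data = load_processed_data_file(file_path)
    print(f"{len(data['time'])} processed samples loaded")
    return data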
def compute_gnss_velocity(pos_geo1, speed1, pos_geo2, speed2):
"""
Uses the speed and the change in geodetic location to estimate the velocity vector in NED coords
Inputs:
- pos_geo1, geodetic coords at initial position (3,1)
- speed1, speed at initial position
- pos_geo2, geodetic coords at new position (3,1)
- speed2, speed at new position
"""
shape = pos_geo1.shape
speed_avg = (speed1 + speed2) / 2
pos_ecef1 = posGeodetic2Ecef(pos_geo1.reshape(-1, 1))
pos_ecef2 = posGeodetic2Ecef(pos_geo2.reshape(-1, 1))
vel_dir = (pos_ecef2 - pos_ecef1)
norm = np.linalg.norm(vel_dir)
if norm < 1e-4:
vel_ecef = np.zeros((3, 1))
else:
vel_ecef = speed_avg * vel_dir / norm
vel_ned = ecef2Ned(vel_ecef, pos_geo2.reshape(-1, 1))
return vel_ned.reshape(shape)
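# Worked sketch: two fixes roughly 111 m apart in latitude, both at 5 m/s,
# should give an NED velocity of roughly [5, 0, 0] (due north). The coordinates
# below are made-up illustration values.
def _example_gnss_velocity():
    pos1 = np.array([deg2rad(52.0000), deg2rad(13.0000), 100.0]).reshape(3, 1)
    pos2 = np.array([deg2rad(52.0010), deg2rad(13.0000), 100.0]).reshape(3, 1)
    vel_ned = compute_gnss_velocity(pos1, 5.0, pos2, 5.0)
    print("NED velocity estimate:\n", vel_ned)
    return vel_ned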
def compute_logging_rates(sensor_data):
"""
Computes the time steps between samples for a sensor
"""
times = np.array(sensor_data["time"])
time_steps = times[1:] - times[0:-1]
rates = 1.0 / time_steps
return list(time_steps), list(rates)
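# Usage sketch: with timestamps in milliseconds the returned rates are per
# millisecond, so multiply by 1000 (or load with time_in_seconds=True) to get Hz.
def _example_logging_rate(sensor_data):
    time_steps, rates = compute_logging_rates(sensor_data)
    print(f"mean step: {np.mean(time_steps):.4f}, mean rate: {np.mean(rates):.4f}")
    return time_steps, rates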
def get_subsets_between_times(sensor_data, clip_points):
"""
Returns subsets of the input dataset between the requested
times
-sensor_data: (pandas DataFrame) the input dataset
-clip points: list(tuples) the start and end points to section
"""
sub_datasets = []
for time_slice in clip_points:
start_time = time_slice[0]
end_time = time_slice[1]
if start_time > end_time:
raise ValueError("Start time must be before end time")
start_condition = sensor_data.time >= start_time
end_condition = sensor_data.time < end_time
data_slice = sensor_data[start_condition & end_condition]
if len(clip_points) == 1:
return data_slice
else:
sub_datasets.append(data_slice)
return sub_datasets
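# Usage sketch, assuming sensor_data_frame is a pandas DataFrame with a 'time'
# column; the clip times below are made-up illustration values.
def _example_clip_subsets(sensor_data_frame):
    clip_points = [(1000.0, 2000.0), (3000.0, 4000.0)]
    subsets = get_subsets_between_times(sensor_data_frame, clip_points)
    for i, subset in enumerate(subsets):
        print(f"subset {i}: {len(subset)} samples")
    return subsets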
def single_measurement_correction(measurement, biases, scale_inv):
    """
    Applies a calibration correction (bias removal and inverse scale factor)
    to a single 3-axis measurement and returns it as a list
    """
    measurement = np.array(measurement).reshape(-1, 1)
    corrected_meas = scale_inv @ (measurement - biases)
    return list(corrected_meas.reshape(-1,))
def batch_measurement_correction(data, biases, scale_inv):
"""
    Performs a batch calibration correction on a pandas DataFrame
    or numpy array with columns [time, x, y, z].
    Returns an object with the same shape and type as the input
"""
try:
out_data = data.to_numpy()
is_data_frame = True
except AttributeError:
out_data = data.copy()
is_data_frame = False
out_data = out_data[:, 1:] # drop time col
out_data -= biases.reshape(-1,)[np.newaxis, :]
out_data = np.dot(scale_inv, out_data.T).T
if is_data_frame:
data.iloc[:, 1:] = out_data
else:
data[:, 1:] = out_data
return data
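# Usage sketch: applies a made-up bias and scale-factor correction to a small
# synthetic [time, x, y, z] array; the calibration values are illustrative only.
def _example_batch_correction():
    data = np.array([[0.0, 1.0, 2.0, 3.0],
                     [10.0, 1.1, 2.1, 3.1]])
    biases = np.array([0.1, 0.1, 0.1]).reshape(3, 1)
    scale_inv = np.eye(3)
    corrected = batch_measurement_correction(data, biases, scale_inv)
    print(corrected)
    return corrected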
def visualize_3axis_timeseries(sensor_data, sensor_id):
"""
    Takes a dictionary or pandas DataFrame for a single sensor
    and plots the three axes against time
"""
labels = AXES_NAMES[sensor_id]
fig, ax = plt.subplots(1, 1, figsize=(8, 8))
ax.plot(sensor_data["time"], sensor_data[labels[0]], label=labels[0])
ax.plot(sensor_data["time"], sensor_data[labels[1]], label=labels[1])
ax.plot(sensor_data["time"], sensor_data[labels[2]], label=labels[2])
ax.set_xlabel("time")
ax.set_ylabel(PLOT_LABELS[sensor_id])
ax.grid()
ax.legend()
plt.show()
def visualize_3D_scatter(sensor_data, sensor_id):
"""
    Takes a dictionary or pandas DataFrame for a single sensor
    and scatters each sample in 3D
"""
labels = AXES_NAMES[sensor_id]
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection='3d')
xs, ys, zs = [sensor_data[labels[0]], sensor_data[labels[1]], sensor_data[labels[2]]]
ax.scatter(xs, ys, zs)
ax.set_box_aspect((np.ptp(xs), np.ptp(ys), np.ptp(zs)))
ax.set(xlabel=labels[0], ylabel=labels[1], zlabel=labels[2], title=PLOT_LABELS[sensor_id])
ax.grid()
plt.show()
def visualize_2D_projection(sensor_data, sensor_id):
"""
    Takes a dictionary or pandas DataFrame for a single sensor
    and overlays the three planar projections
"""
labels = AXES_NAMES[sensor_id]
fig, ax = plt.subplots(1, 1, figsize=(8,8))
ax.axis('equal')
ax.scatter(sensor_data[labels[0]], sensor_data[labels[1]], label='x-y')
ax.scatter(sensor_data[labels[0]], sensor_data[labels[2]], label='x-z')
ax.scatter(sensor_data[labels[1]], sensor_data[labels[2]], label='y-z')
ax.set_title(PLOT_LABELS[sensor_id])
ax.grid()
ax.legend()
plt.show()
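# Usage sketch: the three plotting helpers can be called on the same dataset,
# e.g. to inspect magnetometer data (sensor_id=3); sensor_data here is whatever
# extract_single_imu_sensor_data returned.
def _example_visualize_mag(sensor_data):
    visualize_3axis_timeseries(sensor_data, sensor_id=3)
    visualize_3D_scatter(sensor_data, sensor_id=3)
    visualize_2D_projection(sensor_data, sensor_id=3)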
def visualize_vec_rotation(vec1, vec2, axis=None):
"""
Visualization rotation from vec1 to vec2 about axis
"""
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
v1_plt = np.hstack((np.zeros(3,1), vec1))
v2_plt = np.hstack((np.zeros(3,1), vec2))
if axis:
ax_plt = np.hstack((np.zeros(3,1), axis))
ax.plot(ax_plt[0], ax_plt[1], ax_plt[2], '-b')
ax.plot(v1_plt[0], v1_plt[1], v1_plt[2], '-g')
ax.plot(v2_plt[0], v2_plt[1], v2_plt[2], '-r')
ax.plot()
plt.show()
# def get_clip_conditions_from_user(sensor_data, message=None):
# """
# Asks for inputs of times to clip the data at.
# Takes input of pandas dataframe and string message to display
# """
# print('\n')
# if message:
# print(message)
# else:
# print("Data clip points")
# valid_times = False
# while not valid_times:
# start_time = float(input("Enter start time (ms): "))
# end_time = float(input("Enter end time (ms): "))
# if start_time > sensor_data.iloc[-5, 0]:
# print("start time too close to end of file")
# continue
# elif end_time > sensor_data.iloc[-1, 0]:
# print("Warning end time outside end of file")
# end_time = sensor_data.iloc[-1, 0]
# elif end_time < start_time:
# print("end time cannot be before start time")
# continue
# valid_times = True
# start_condition = sensor_data.time > start_time
# end_condition = sensor_data.time < end_time
# return start_condition, end_condition