# posenet_util.py (forked from axinc-ai/ailia-models)
import cv2
import numpy as np
from numpy.lib.stride_tricks import as_strided

# Radius (in heatmap cells) within which a score must be the local maximum to
# count as a keypoint candidate.
LOCAL_MAXIMUM_RADIUS = 1

PART_NAMES = [
"nose", "leftEye", "rightEye", "leftEar", "rightEar", "leftShoulder",
"rightShoulder", "leftElbow", "rightElbow", "leftWrist", "rightWrist",
"leftHip", "rightHip", "leftKnee", "rightKnee", "leftAnkle", "rightAnkle"
]
NUM_KEYPOINTS = len(PART_NAMES)
PART_IDS = {pn: pid for pid, pn in enumerate(PART_NAMES)}

# Directed parent -> child edges; poses are grown along these via the
# displacement fields.
POSE_CHAIN = [
("nose", "leftEye"), ("leftEye", "leftEar"), ("nose", "rightEye"),
("rightEye", "rightEar"), ("nose", "leftShoulder"),
("leftShoulder", "leftElbow"), ("leftElbow", "leftWrist"),
("leftShoulder", "leftHip"), ("leftHip", "leftKnee"),
("leftKnee", "leftAnkle"), ("nose", "rightShoulder"),
("rightShoulder", "rightElbow"), ("rightElbow", "rightWrist"),
("rightShoulder", "rightHip"), ("rightHip", "rightKnee"),
("rightKnee", "rightAnkle")
]
PARENT_CHILD_TUPLES = [(PART_IDS[parent], PART_IDS[child]) for parent, child in POSE_CHAIN]

# Limb segments drawn when rendering a skeleton.
CONNECTED_PART_NAMES = [
("leftHip", "leftShoulder"), ("leftElbow", "leftShoulder"),
("leftElbow", "leftWrist"), ("leftHip", "leftKnee"),
("leftKnee", "leftAnkle"), ("rightHip", "rightShoulder"),
("rightElbow", "rightShoulder"), ("rightElbow", "rightWrist"),
("rightHip", "rightKnee"), ("rightKnee", "rightAnkle"),
("leftShoulder", "rightShoulder"), ("leftHip", "rightHip")
]
CONNECTED_PART_INDICES = [(PART_IDS[a], PART_IDS[b]) for a, b in CONNECTED_PART_NAMES]


def traverse_to_targ_keypoint(
        edge_id, source_keypoint, target_keypoint_id, scores, offsets,
        output_stride, displacements):
    """Follow one displacement edge from a located keypoint to its neighbour.

    Returns the neighbour's heatmap score and its image-space coordinates.
    """
height = scores.shape[1]
width = scores.shape[2]
source_keypoint_indices = np.clip(
np.round(source_keypoint / output_stride), a_min=0, a_max=[height - 1, width - 1]).astype(np.int32)
displaced_point = source_keypoint + displacements[
edge_id, source_keypoint_indices[0], source_keypoint_indices[1]]
displaced_point_indices = np.clip(
np.round(displaced_point / output_stride), a_min=0, a_max=[height - 1, width - 1]).astype(np.int32)
score = scores[target_keypoint_id, displaced_point_indices[0], displaced_point_indices[1]]
image_coord = displaced_point_indices * output_stride + offsets[
target_keypoint_id, displaced_point_indices[0], displaced_point_indices[1]]
return score, image_coord


def decode_pose(
        root_score, root_id, root_image_coord,
        scores,
        offsets,
        output_stride,
        displacements_fwd,
        displacements_bwd):
    """Grow a full pose outward from a single root keypoint.

    Walks the pose tree backward, then forward, along the displacement
    fields, filling in every keypoint that has not been scored yet.
    """
num_parts = scores.shape[0]
num_edges = len(PARENT_CHILD_TUPLES)
instance_keypoint_scores = np.zeros(num_parts)
instance_keypoint_coords = np.zeros((num_parts, 2))
instance_keypoint_scores[root_id] = root_score
instance_keypoint_coords[root_id] = root_image_coord
    # Backward pass: locate parents from already-found children using the
    # backward displacements.
    for edge in reversed(range(num_edges)):
target_keypoint_id, source_keypoint_id = PARENT_CHILD_TUPLES[edge]
if (instance_keypoint_scores[source_keypoint_id] > 0.0 and
instance_keypoint_scores[target_keypoint_id] == 0.0):
score, coords = traverse_to_targ_keypoint(
edge,
instance_keypoint_coords[source_keypoint_id],
target_keypoint_id,
scores, offsets, output_stride, displacements_bwd)
instance_keypoint_scores[target_keypoint_id] = score
instance_keypoint_coords[target_keypoint_id] = coords
    # Forward pass: locate children from already-found parents using the
    # forward displacements.
    for edge in range(num_edges):
source_keypoint_id, target_keypoint_id = PARENT_CHILD_TUPLES[edge]
if (instance_keypoint_scores[source_keypoint_id] > 0.0 and
instance_keypoint_scores[target_keypoint_id] == 0.0):
score, coords = traverse_to_targ_keypoint(
edge,
instance_keypoint_coords[source_keypoint_id],
target_keypoint_id,
scores, offsets, output_stride, displacements_fwd)
instance_keypoint_scores[target_keypoint_id] = score
instance_keypoint_coords[target_keypoint_id] = coords
return instance_keypoint_scores, instance_keypoint_coords


def within_nms_radius_fast(pose_coords, squared_nms_radius, point):
    """Return True if `point` falls within the NMS radius of any coord in `pose_coords`."""
if not pose_coords.shape[0]:
return False
return np.any(np.sum((pose_coords - point) ** 2, axis=1) <= squared_nms_radius)


def get_instance_score_fast(
        exist_pose_coords,
        squared_nms_radius,
        keypoint_scores, keypoint_coords):
    """Mean keypoint score, counting only keypoints not already claimed by an earlier pose."""
if exist_pose_coords.shape[0]:
s = np.sum((exist_pose_coords - keypoint_coords) ** 2, axis=2) > squared_nms_radius
not_overlapped_scores = np.sum(keypoint_scores[np.all(s, axis=0)])
else:
not_overlapped_scores = np.sum(keypoint_scores)
return not_overlapped_scores / len(keypoint_scores)


def pool2d(A, kernel_size, stride, padding=0, pool_mode='max'):
    """2D max/average pooling via a strided window view of `A` (no copies)."""
    # Zero-pad the input on every side
    A = np.pad(A, padding, mode='constant')
# Window view of A
output_shape = ((A.shape[0] - kernel_size) // stride + 1,
(A.shape[1] - kernel_size) // stride + 1)
shape_w = (output_shape[0], output_shape[1], kernel_size, kernel_size)
strides_w = (stride*A.strides[0], stride*A.strides[1], A.strides[0], A.strides[1])
A_w = as_strided(A, shape_w, strides_w)
# Return the result of pooling
if pool_mode == 'max':
return A_w.max(axis=(2, 3))
    elif pool_mode == 'avg':
        return A_w.mean(axis=(2, 3))
    raise ValueError("pool_mode must be 'max' or 'avg'")
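
# Quick sanity check for pool2d (illustrative values, not from the repo):
#
#   A = np.arange(16, dtype=np.float32).reshape(4, 4)
#   pool2d(A, kernel_size=2, stride=2)                   # [[ 5.,  7.], [13., 15.]]
#   pool2d(A, kernel_size=2, stride=2, pool_mode='avg')  # [[ 2.5,  4.5], [10.5, 12.5]]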


def build_part_with_score(score_threshold, local_max_radius, scores):
    """Find candidate keypoints: heatmap cells that are local maxima above the threshold.

    Returns the candidate scores in descending order together with their
    (keypoint_id, y, x) heatmap indices.
    """
    lmd = 2 * local_max_radius + 1
    max_vals = np.array(
        [pool2d(channel, lmd, stride=1, padding=local_max_radius) for channel in scores])
max_loc = (scores == max_vals) & (scores >= score_threshold)
max_loc_idx = np.argwhere(max_loc)
    scores_vec = scores[max_loc]
sort_idx = np.argsort(-scores_vec)
return scores_vec[sort_idx], max_loc_idx[sort_idx]
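
# Minimal sanity check (synthetic heatmap, not from the repo): one clear peak
# should yield exactly one candidate.
#
#   demo = np.zeros((1, 5, 5), dtype=np.float32)
#   demo[0, 2, 3] = 0.9
#   vals, idx = build_part_with_score(0.5, LOCAL_MAXIMUM_RADIUS, demo)
#   # vals -> [0.9], idx -> [[0, 2, 3]]  (keypoint_id, y, x)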


def decode_multiple_poses(
        scores, offsets, displacements_fwd, displacements_bwd, output_stride,
        max_pose_detections=10, score_threshold=0.5, nms_radius=20, min_pose_score=0.5):
    """Decode up to `max_pose_detections` poses from raw PoseNet outputs."""
    # Part scoring (the local-maximum search) is the expensive step; this
    # numpy port runs it on the CPU.
part_scores, part_idx = build_part_with_score(score_threshold, LOCAL_MAXIMUM_RADIUS, scores)
height = scores.shape[1]
width = scores.shape[2]
    # reshape from (2*K, h, w) to (K, h, w, 2) so each cell holds a (y, x) vector
offsets = offsets.reshape(2, -1, height, width).transpose((1, 2, 3, 0))
displacements_fwd = displacements_fwd.reshape(2, -1, height, width).transpose((1, 2, 3, 0))
displacements_bwd = displacements_bwd.reshape(2, -1, height, width).transpose((1, 2, 3, 0))
squared_nms_radius = nms_radius ** 2
pose_count = 0
pose_scores = np.zeros(max_pose_detections)
pose_keypoint_scores = np.zeros((max_pose_detections, NUM_KEYPOINTS))
pose_keypoint_coords = np.zeros((max_pose_detections, NUM_KEYPOINTS, 2))
for root_score, (root_id, root_coord_y, root_coord_x) in zip(part_scores, part_idx):
root_coord = np.array([root_coord_y, root_coord_x])
root_image_coords = root_coord * output_stride + offsets[root_id, root_coord_y, root_coord_x]
if within_nms_radius_fast(
pose_keypoint_coords[:pose_count, root_id, :], squared_nms_radius, root_image_coords):
continue
keypoint_scores, keypoint_coords = decode_pose(
root_score, root_id, root_image_coords,
scores, offsets, output_stride,
displacements_fwd, displacements_bwd)
pose_score = get_instance_score_fast(
pose_keypoint_coords[:pose_count, :, :], squared_nms_radius, keypoint_scores, keypoint_coords)
# NOTE this isn't in the original implementation, but it appears that by initially ordering by
# part scores, and having a max # of detections, we can end up populating the returned poses with
# lower scored poses than if we discard 'bad' ones and continue (higher pose scores can still come later).
# Set min_pose_score to 0. to revert to original behaviour
if min_pose_score == 0. or pose_score >= min_pose_score:
pose_scores[pose_count] = pose_score
pose_keypoint_scores[pose_count, :] = keypoint_scores
pose_keypoint_coords[pose_count, :, :] = keypoint_coords
pose_count += 1
if pose_count >= max_pose_detections:
break
return pose_scores, pose_keypoint_scores, pose_keypoint_coords
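
# Usage sketch. Shapes assume the usual 17-keypoint PoseNet head with 16
# edges: scores (17, H, W), offsets (34, H, W), and each displacement map
# (32, H, W), where H and W are the heatmap dimensions:
#
#   pose_scores, keypoint_scores, keypoint_coords = decode_multiple_poses(
#       scores, offsets, displacements_fwd, displacements_bwd,
#       output_stride=16, max_pose_detections=10, min_pose_score=0.25)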


def valid_resolution(width, height, output_stride=16):
    """Snap a resolution to the nearest valid PoseNet input size (n * output_stride + 1)."""
target_width = (int(width) // output_stride) * output_stride + 1
target_height = (int(height) // output_stride) * output_stride + 1
return target_width, target_height
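
# e.g. valid_resolution(640, 480) -> (641, 481): each side becomes n * 16 + 1.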


def process_input(source_img, scale_factor=1.0, output_stride=16):
    """Resize and normalize a BGR image into a (1, 3, H, W) float tensor in [-1, 1]."""
target_width, target_height = valid_resolution(
source_img.shape[1] * scale_factor, source_img.shape[0] * scale_factor, output_stride=output_stride)
scale = np.array([source_img.shape[0] / target_height, source_img.shape[1] / target_width])
input_img = cv2.resize(source_img, (target_width, target_height), interpolation=cv2.INTER_LINEAR)
input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB).astype(np.float32)
input_img = input_img * (2.0 / 255.0) - 1.0
input_img = input_img.transpose((2, 0, 1)).reshape(1, 3, target_height, target_width)
return input_img, source_img, scale
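
# Usage sketch ("input.jpg" is an illustrative path; any BGR image read with
# cv2.imread works):
#
#   frame = cv2.imread("input.jpg")
#   input_img, src_img, scale = process_input(frame)
#   # input_img: network-ready NCHW tensor; scale: per-axis (y, x) factors
#   # mapping decoded keypoint coords back onto src_img.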


def draw_keypoints(
        img, instance_scores, keypoint_scores, keypoint_coords,
        min_pose_confidence=0.5, min_part_confidence=0.5):
    """Draw every confident keypoint (no skeleton) onto `img`."""
cv_keypoints = []
for ii, score in enumerate(instance_scores):
if score < min_pose_confidence:
continue
for ks, kc in zip(keypoint_scores[ii, :], keypoint_coords[ii, :, :]):
if ks < min_part_confidence:
continue
cv_keypoints.append(cv2.KeyPoint(kc[1], kc[0], 10. * ks))
out_img = cv2.drawKeypoints(img, cv_keypoints, outImage=np.array([]))
return out_img


def get_adjacent_keypoints(keypoint_scores, keypoint_coords, min_confidence=0.1):
    """Collect (x, y) endpoint pairs for every limb whose two keypoints are confident."""
results = []
for left, right in CONNECTED_PART_INDICES:
if keypoint_scores[left] < min_confidence or keypoint_scores[right] < min_confidence:
continue
results.append(
np.array([keypoint_coords[left][::-1], keypoint_coords[right][::-1]]).astype(np.int32),
)
return results


def draw_skel_and_kp(
        img, instance_scores, keypoint_scores, keypoint_coords,
        min_pose_score=0.5, min_part_score=0.5):
    """Draw skeleton limbs and keypoints for every pose above `min_pose_score`."""
out_img = img
adjacent_keypoints = []
cv_keypoints = []
for ii, score in enumerate(instance_scores):
if score < min_pose_score:
continue
new_keypoints = get_adjacent_keypoints(
keypoint_scores[ii, :], keypoint_coords[ii, :, :], min_part_score)
adjacent_keypoints.extend(new_keypoints)
for ks, kc in zip(keypoint_scores[ii, :], keypoint_coords[ii, :, :]):
if ks < min_part_score:
continue
cv_keypoints.append(cv2.KeyPoint(kc[1], kc[0], 10. * ks))
if cv_keypoints:
out_img = cv2.drawKeypoints(
out_img, cv_keypoints, outImage=np.array([]), color=(255, 255, 0),
flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
out_img = cv2.polylines(out_img, adjacent_keypoints, isClosed=False, color=(255, 255, 0))
return out_img
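

if __name__ == "__main__":
    # Smoke test with random arrays standing in for real network output.
    # This is a sketch only: the shapes assume 17 keypoints, 16 edges, and a
    # 33x33 heatmap (e.g. a 513x513 input with output_stride=16); real use
    # feeds the outputs of a PoseNet model instead.
    rng = np.random.default_rng(0)
    h = w = 33
    scores = rng.random((NUM_KEYPOINTS, h, w)).astype(np.float32)
    offsets = rng.random((2 * NUM_KEYPOINTS, h, w)).astype(np.float32)
    disp_fwd = rng.random((2 * len(PARENT_CHILD_TUPLES), h, w)).astype(np.float32)
    disp_bwd = rng.random((2 * len(PARENT_CHILD_TUPLES), h, w)).astype(np.float32)
    pose_scores, kp_scores, kp_coords = decode_multiple_poses(
        scores, offsets, disp_fwd, disp_bwd, output_stride=16,
        score_threshold=0.9, min_pose_score=0.0)
    print("poses decoded:", int(np.sum(pose_scores > 0)))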