-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinker.py
338 lines (300 loc) · 10.4 KB
/
inker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import sys
import numpy as np
import imageio
from matplotlib import pyplot as plt
from tqdm import tqdm
# check if a pixel is ``black''
# takes uint8
def is_black(pixel, threshold):
# do we need to convert to int so that it works properly?
return pixel < threshold
# convert a frame to grayscale
def grayscale(frame):
frame = np.average(frame, axis=2)
return frame.astype(np.uint8)
# read in a block of frames from a VideoCapture
# size = number of frames
# stride = n means only use every nth frame
def read_block(reader, block_size, stride, crop):
block = []
for i in range(block_size*stride):
try:
frame, _ = reader._read_frame()
except StopIteration:
break
if i % stride == 0:
# convert to grayscale and crop
frame = grayscale(frame)
frame = frame[
crop['top']:crop['bottom'],
crop['left']:crop['right']
]
block.append(frame)
return np.asarray(block, dtype=np.uint8)
# find the frame on which a certain pixel gets inked
# returns the index of the frame in the buffer, or None if no ink frame found
def find_ink_frame(frame_buffer, i, j, block_size, bw_cutoff):
# x works forwards
# y works backwards
# start from the middle
x = block_size - 1
y = 0
while x < len(frame_buffer):
# step back until we find a white pixel or the beginning of the buffer
while is_black(frame_buffer[x-y,i,j], bw_cutoff) and x-y >= 0:
y += 1
# once we find a white pixel
# if we've found a long enough run, return
if y >= block_size:
return x-y+1 # +1 because the current pixel is white
# if not, skip forward to the next possible end of a long enough run
else:
x = x-y + block_size
y = 0
# at this point we've reached the end of the buffer without finding a long enough run
return None
# ask for parameters
# returns list of parameters
def get_parameters():
# defaults
stride = 10 # stride = n means only use every nth frame
block_size = 60 # number of frames before a pixel is 'inked'
bw_cutoff = 120 # threshold below which a pixel is 'black'
only_ink_frames = False # skip frames where no pixel changes
outro_length = 3 # number of seconds to hold final frame at end
print("\nplease enter parameters (or leave blank for default value)")
try:
stride = int(input(
f"stride (int n -> keep every nth frame, default {stride}): "
))
except ValueError:
pass
try:
block_size = int(
input(f"block_size (int frames, default {block_size}): ")
)
except ValueError:
pass
try:
bw_cutoff = int(
input(f"bw_cutoff (0-255 pixel intensity, default {bw_cutoff}): ")
)
except ValueError:
pass
try:
only_ink_frames = bool(int(input(
f"only_ink_frames (0-1 bool, default {int(only_ink_frames)}): "
)))
except ValueError:
pass
try:
outro_length = int(
input(f"outro_length(int seconds, default {outro_length}): ")
)
except ValueError:
pass
return stride, block_size, bw_cutoff, only_ink_frames, outro_length
# get crop
# returns dict crop = {'top' : int, 'bottom' : int, 'left' : etc...
def get_crop(final_frame, width, height):
# show the final frame and ask for cropping
print("please enter cropping", flush=True)
plt.imshow(final_frame, cmap='gray', vmin=0, vmax=255)
plt.show(block=True)
yn = False
while not yn:
crop = {
'top' : 0,
'bottom' : height,
'left' : 0,
'right' : width
}
try:
crop['top'] = int(
input(f"first row to include (int pixels, default {crop['top']}): ")
)
except ValueError:
pass
try:
crop['bottom'] = int(
input(f"last row to include (int pixels, default {crop['bottom']}): ")
)
except ValueError:
pass
try:
crop['left'] = int(input(
f"first column to include (int pixels, default {crop['left']}): "
))
except ValueError:
pass
try:
crop['right'] = int(input(
f"last column to include (int pixels, default {crop['right']}): "
))
except ValueError:
pass
print("Is this crop correct?", flush=True)
plt.imshow(final_frame[
crop['top']:crop['bottom'],
crop['left']:crop['right']
], cmap='gray', vmin=0, vmax=255)
plt.show(block=True)
yn = input("Y/y to accept, N/n to redo: ")[0] in ['Y','y']
return crop
# main
def inker(reader, writer, verbose=False):
if verbose: print('reading metadata ... ', flush=True)
# read metadata
metadata = reader.get_meta_data()
width, height = metadata['size']
fps = metadata['fps']
total_frames = reader.count_frames()
if verbose:
print(f"width={width}", flush=True)
print(f"height={height}", flush=True)
print(f"fps={fps}", flush=True)
print(f"total_frames={total_frames}", flush=True)
print('done', flush=True)
[
stride,
block_size,
bw_cutoff,
only_ink_frames,
outro_length
] = get_parameters()
# define effective frames, i.e. number of frames we'll actually use
eff_frames = total_frames // stride
# grab final frame
if verbose: print('processing final frame ... ', end='', flush=True)
final_index = total_frames - 1
final_index = final_index - (final_index % stride) # final effective index
while True: # hack because imageio really struggles here
try:
final_frame = reader.get_data(final_index)
break
except IndexError:
if verbose: print(
f"could not get frame {final_index}, trying {final_index-stride}"
)
final_index -= stride
# convert -> numpy array -> grayscale -> black & white
final_frame = np.array(final_frame, dtype=np.uint8)
final_frame = grayscale(final_frame)
final_frame = np.where(is_black(final_frame, bw_cutoff), 0, 255)
# expect entries to be np.int64 at this point
# ask for cropping
crop = get_crop(final_frame, width, height)
# crop final frame
final_frame = final_frame[
crop['top']:crop['bottom'],
crop['left']:crop['right']
]
# re-initialize reader to get back to first frame
reader._initialize()
# make list of pixels which are black in the final, i.e. need to be inked
uninked = np.where(final_frame == 0)
uninked = list(zip(uninked[0],uninked[1]))
if verbose: print('done', flush=True)
# initialize matrix of ink frames
ink_frame = np.full((
crop['bottom']-crop['top'],
crop['right']-crop['left']
), eff_frames)
# initialize frame buffer
if verbose: print('initializing frame buffer ... ', end='', flush=True)
assert eff_frames > 2*block_size, "not enough frames, reduce block size or stride"
frame_buffer = read_block(reader, block_size, stride, crop)
frame_buffer = np.concatenate((
frame_buffer,
read_block(reader, block_size, stride, crop)
))
if verbose: print('done', flush=True)
# main processing loop
for block_number in tqdm(range(eff_frames//block_size - 1)):
# search for new ink frames
if verbose: print('searching for ink frames ... ', end='', flush=True)
new_inked_pixels = [] # pixels to remove from uninked list
new_inked_frames = [] # list of frames in which a new pixel is inked
for [i,j] in uninked:
f = find_ink_frame(frame_buffer, i, j, block_size, bw_cutoff)
if f != None:
ink_frame[i][j] = block_number*block_size + f
new_inked_pixels.append([i,j])
new_inked_frames.append(f)
# remove pixels we just inked from uninked list
if verbose: count_new_inked = 0
for [i,j] in new_inked_pixels:
if verbose: count_new_inked += 1
uninked.remove((i,j))
if verbose:
print(
f'{count_new_inked} pixels inked in block {block_number}\ndone',
flush=True
)
# rewrite frames in front block
if verbose: print('computing frames ... ', end='', flush=True)
for f in range(block_size):
current_frame = block_number*block_size + f
frame_buffer[f] = np.where(ink_frame <= current_frame, 0, 255)
if verbose: print('done', flush=True)
# write front block to output
if verbose: print('writing frames to file ... ', end='', flush=True)
for f in range(block_size):
if only_ink_frames and f not in new_inked_frames:
pass
else:
writer.append_data(frame_buffer[f])
if verbose: print('done', flush=True)
# delete front block and read in new block
if verbose: print('reading new block ... ', end='', flush=True)
frame_buffer = frame_buffer[block_size:]
try:
frame_buffer = np.concatenate((
frame_buffer,
read_block(reader, block_size, stride, crop)
))
except Exception as error:
print(f"block number: {block_number}/{eff_frames//block_size-2}")
print(f"total frames: {total_frames}")
print(f"effective frames: {eff_frames}")
print(f"block size: {block_size}")
raise error
if verbose: print('done', flush=True)
# Final processing when we've run out of blocks
# at this point frame_buffer should contain 1 full block
# plus the rest of the frames
if verbose: print('final processing loop ... ', end='', flush=True)
block_number = eff_frames//block_size - 1
# search for new ink frames
new_inked_pixels = []
new_inked_frames = []
for [i,j] in uninked:
# step back until we reach a white frame or run out of buffer
x = len(frame_buffer) - 1
y = 0
if not is_black(frame_buffer[x-y][i][j], bw_cutoff):
y = 1
# shouldn't happen, uninked list only has pixels black in final frame
print("WARNING: uninked pixel not black in final frame")
while x-y >= 0 and is_black(frame_buffer[x-y][i][j], bw_cutoff):
y += 1
f = x-y+1
ink_frame[i][j] = block_number*block_size + f
new_inked_pixels.append([i,j])
new_inked_frames.append(f)
for [i,j] in new_inked_pixels:
uninked.remove((i,j))
# set the rest of pixel values
for f in range(len(frame_buffer)):
current_frame = block_number*block_size + f
frame_buffer[f] = np.where(ink_frame > current_frame, 255, 0)
# write everything to output
for f in range(len(frame_buffer)):
if only_ink_frames and f not in new_inked_frames:
pass
else:
writer.append_data(frame_buffer[f])
# write outro
for _ in range(int(outro_length * fps)):
writer.append_data(frame_buffer[-1])
if verbose: print('done', flush=True)