-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscreencap.py
95 lines (78 loc) · 2.38 KB
/
screencap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import time
import driver
import time
from PIL import Image
from mss import mss
import queue
from threading import Thread
from utils import debug
import driver
from workers import FrameWriter
# Module-level LCD device handle. The workers below read `lcd.resolution`
# and call `lcd.imageToFrame`, and the same object is handed to FrameWriter.
lcd = driver.KrakenLCD()
# NOTE(review): presumably switches the device into streaming mode before any
# worker starts — confirm against driver.KrakenLCD.setupStream.
lcd.setupStream()
class RawProducer(Thread):
    """Daemon thread that grabs a screen region and queues the raw screenshots.

    Every item placed on ``rawBuffer`` is a ``(screenshot, capture_seconds)``
    tuple: the object returned by ``mss``'s ``grab`` and the time that grab
    took, in seconds.
    """

    def __init__(self, rawBuffer: queue.Queue):
        """Remember the output queue and mark the thread as a daemon.

        Args:
            rawBuffer: Queue receiving ``(screenshot, capture_seconds)`` tuples.
        """
        super().__init__()
        self.daemon = True
        self.rawBuffer = rawBuffer

    def run(self):
        """Capture frames forever, queueing one whenever the queue has room."""
        debug("Screencap worker started")
        # The context manager releases the capture backend's resources if the
        # loop is ever left through an exception.
        with mss() as sct:
            while True:
                # Wait for room before grabbing instead of capturing a frame
                # that would then sit blocked in put().
                if self.rawBuffer.full():
                    time.sleep(0.005)
                    continue
                # Region is anchored at a fixed (500, 500) screen offset and
                # sized to the LCD resolution.
                region = {
                    "top": 500,
                    "left": 500,
                    "width": lcd.resolution.width,
                    "height": lcd.resolution.height,
                }
                # perf_counter is monotonic and high-resolution, unlike the
                # wall clock, so short grab durations are measured reliably.
                started = time.perf_counter()
                screenshot = sct.grab(region)
                self.rawBuffer.put((screenshot, time.perf_counter() - started))
class FrameProducer(Thread):
    """Daemon thread that converts raw screenshots into LCD frames.

    Consumes ``(screenshot, capture_seconds)`` tuples from ``rawBuffer`` and
    puts ``(frame, capture_seconds, convert_seconds)`` tuples on
    ``frameBuffer``, where ``frame`` is the result of ``lcd.imageToFrame``.
    """

    def __init__(self, rawBuffer: queue.Queue, frameBuffer: queue.Queue):
        """Remember both queues and mark the thread as a daemon.

        Args:
            rawBuffer: Queue supplying ``(screenshot, capture_seconds)`` tuples.
            frameBuffer: Queue receiving the converted frame tuples.
        """
        super().__init__()
        self.daemon = True
        self.rawBuffer = rawBuffer
        self.frameBuffer = frameBuffer

    def run(self):
        """Convert screenshots forever, one at a time, as the output queue drains."""
        # Uses the same debug() helper as the screencap worker's start-up message.
        debug("Image converter worker started")
        while True:
            # Do not pull a new screenshot until the previous frame was taken.
            if self.frameBuffer.full():
                time.sleep(0.001)
                continue
            screenshot, rawTime = self.rawBuffer.get()
            # perf_counter is monotonic and high-resolution, so the conversion
            # time is not distorted by wall-clock granularity or adjustments.
            started = time.perf_counter()
            img = Image.frombytes(
                "RGB",
                (screenshot.width, screenshot.height),
                screenshot.rgb,
            )
            frame = lcd.imageToFrame(img, adaptive=True)
            # Elapsed time covers both the PIL conversion and imageToFrame.
            self.frameBuffer.put((frame, rawTime, time.perf_counter() - started))
# Pipeline: RawProducer -> rawBuffer -> FrameProducer -> frameBuffer -> FrameWriter.
# Both queues hold at most one item.
rawBuffer = queue.Queue(maxsize=1)
frameBuffer = queue.Queue(maxsize=1)

rawProducer = RawProducer(rawBuffer)
frameProducer = FrameProducer(rawBuffer, frameBuffer)
frameWriter = FrameWriter(frameBuffer, lcd)

workers = (rawProducer, frameProducer, frameWriter)
for worker in workers:
    worker.start()

print("SignalRGB screencap demo started")

exitCode = 0
try:
    # Watchdog: wake once a second and stop as soon as any worker has died.
    while True:
        time.sleep(1)
        if not all(worker.is_alive() for worker in workers):
            # Report the failure instead of silently exiting with success.
            print("Some thread is dead")
            exitCode = 1
            break
except KeyboardInterrupt:
    # Ctrl-C is the normal way to end the demo; fall through to cleanup.
    pass
finally:
    # Ask the writer to stop through its shouldStop flag and wait for it.
    # The two producers are daemon threads, so they are not joined here.
    frameWriter.shouldStop = True
    frameWriter.join()

# SystemExit directly: the exit() builtin is an interactive helper injected by
# the site module and is not meant for use in programs.
raise SystemExit(exitCode)