-
Notifications
You must be signed in to change notification settings - Fork 1
/
rotating.py
76 lines (62 loc) · 2.03 KB
/
rotating.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import time
import driver
from PIL import Image, ImageDraw
from io import BytesIO
import queue
import colorsys
from threading import Thread, Event
from utils import FPS
from mss import mss
from utils import FPS, debug
from workers import FrameWriter
lcd = driver.KrakenLCD()
lcd.setupStream()
class FrameProducer(Thread):
def __init__(self, frameBuffer: queue.Queue):
Thread.__init__(self)
self.daemon = True
self.frameBuffer = frameBuffer
def hsv2rgb(self, h, s, v):
return tuple(round(i * 255) for i in colorsys.hsv_to_rgb(h, s, v))
def run(self):
print("Frame generator worker started")
frameCount = 0
while True:
if self.frameBuffer.full():
time.sleep(0.001)
continue
startTime = time.time()
color = self.hsv2rgb(((5 * frameCount) % 360) / 360, 1, 1)
img = Image.new("RGB", lcd.resolution)
draw = ImageDraw.Draw(img)
draw.rectangle(
[(0, 0), (lcd.resolution.width // 2, lcd.resolution.height // 2)],
fill=color,
)
draw.rectangle(
[
(lcd.resolution.width // 2, lcd.resolution.height // 2),
(lcd.resolution.width, lcd.resolution.height),
],
fill=color,
)
img = img.rotate(frameCount * 2)
self.frameBuffer.put(
(lcd.imageToFrame(img, adaptive=True), 0, time.time() - startTime)
)
frameCount += 1
frameBuffer = queue.Queue(maxsize=1)
frameProducer = FrameProducer(frameBuffer)
frameWriter = FrameWriter(frameBuffer, lcd)
frameProducer.start()
frameWriter.start()
print("SignalRGB rotating demo started")
try:
while True:
time.sleep(1)
if not (frameProducer.is_alive() and frameWriter.is_alive()):
raise KeyboardInterrupt("Some thread is dead")
except KeyboardInterrupt:
frameWriter.shouldStop = True
frameWriter.join()
exit()