-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
54 lines (46 loc) · 1.61 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
print("TwitchPixel!")

# Platform probe: the hardware libraries can only be imported/initialised on a
# supported board. Any ImportError or NotImplementedError raised while
# importing them or creating the NeoPixel object means we are on a regular PC.
try:
    import board
    import neopixel

    # NeoPixel object on pin D18, created with 10 as its second argument.
    pixels = neopixel.NeoPixel(board.D18, 10)
except (ImportError, NotImplementedError):
    IsRaspberryPi = False
    print('Application running on PC')
else:
    IsRaspberryPi = True
    print('Application running on Raspberry Pi')
import asyncio
import json
import re
from LedStrip import LedStrip
from PubSubClient import PubSubClient
from PubSubParser import PubSubParser
# Twitch PubSub websocket endpoint.
# NOTE(review): not referenced anywhere else in the visible part of this file;
# presumably PubSubClient carries its own copy of the URL - confirm.
twitchWebsocketsUrl = 'wss://pubsub-edge.twitch.tv'
# LED strip wrapper that setLed() writes colours to.
# NOTE(review): the argument 10 is presumably the pixel count (the NeoPixel
# object is also created with 10) - confirm against LedStrip.
ledStrip = LedStrip(10)
def setLed(r=0, g=0, b=0):
    """Log an RGB colour and forward it to the module-level ``ledStrip``.

    Args:
        r: Red component (defaults to 0).
        g: Green component (defaults to 0).
        b: Blue component (defaults to 0).

    The values are passed through unchanged as an ``(r, g, b)`` tuple; no
    range checking happens here.
    """
    rgb = (r, g, b)
    print("Setting RGB: {} {} {}".format(*rgb))
    ledStrip.setColor(rgb)
# Load the access token from the JSON document stored in '.env' (resolved
# against the current working directory). The file must contain an object with
# an 'AccessToken' key; a missing file or key raises at import time.
# The encoding is pinned to UTF-8 so the read does not depend on the platform's
# default locale encoding.
with open('.env', 'r', encoding='utf-8') as secretsFile:
    secretsJson = json.load(secretsFile)
accessToken = secretsJson['AccessToken']
async def main():
    """Consume PubSub messages forever and apply each redemption's colour.

    Every message obtained from ``pubsub.getNextMessage()`` is printed and
    then decoded in two steps: the outer JSON document carries a JSON string
    under ``data.message``, and that inner document carries the viewer's text
    under ``data.redemption.user_input``. The text is split on whitespace and
    its first three tokens are converted to integers and handed to
    ``setLed`` as red, green and blue.

    Any exception raised while handling a message (missing keys, too few
    tokens, non-numeric tokens, ...) is printed and the loop moves on to the
    next message. This coroutine never returns on its own.
    """
    while True:
        rawMessage = await pubsub.getNextMessage()
        print("main(): " + rawMessage)
        try:
            # The result is not used here; the call is kept so that an error
            # raised by the parser still causes this message to be skipped.
            PubSubParser.parse(rawMessage)

            envelope = json.loads(rawMessage)
            payload = json.loads(envelope["data"]["message"])
            userInput = payload["data"]["redemption"]["user_input"]

            colors = userInput.split()
            print("main(): " + str(colors))

            red = int(colors[0])
            green = int(colors[1])
            blue = int(colors[2])
            setLed(red, green, blue)
        except Exception as err:
            print("Exception:", err)
if __name__ == "__main__":
    # Create and install the event loop explicitly. Relying on
    # asyncio.get_event_loop() / asyncio.ensure_future() to create one
    # implicitly is deprecated and raises RuntimeError on recent Python
    # releases when no loop has been set. The loop is installed before the
    # client is constructed so anything that looks up the current loop sees
    # this one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Subscribe to the channel-points topic and connect before starting the
    # long-running tasks.
    pubsub = PubSubClient(accessToken, ["channel-points-channel-v1.56618017"])
    loop.run_until_complete(pubsub.connect())

    # Receiver, heartbeat and the message-handling loop run concurrently on
    # the same loop; block until all of them have finished.
    tasks = [
        loop.create_task(pubsub.receive()),
        loop.create_task(pubsub.heartbeat()),
        loop.create_task(main()),
    ]
    loop.run_until_complete(asyncio.wait(tasks))