forked from declension/squeeze-alexa
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt_squeeze.py
executable file
·92 lines (73 loc) · 2.66 KB
/
mqtt_squeeze.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2018 Nick Boultbee
# This file is part of squeeze-alexa.
#
# squeeze-alexa is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# See LICENSE for full license
import socket
import sys
import telnetlib
from os.path import dirname, abspath
import paho.mqtt.client as mqtt
# Sort out running directly
path = dirname(dirname(abspath(__file__)))
sys.path.append(path)
from squeezealexa.settings import MQTT_SETTINGS, LMS_SETTINGS
from squeezealexa.transport.mqtt import CustomClient
from squeezealexa.utils import print_d, print_w
# Module-wide handle to the Squeezeserver CLI telnet connection.
# Starts as None; connect_cli() assigns it and on_message() writes to / reads
# from it.
telnet = None
def on_connect(client, data, flags, rc):
    """Log the outcome of a connection attempt and subscribe for requests.

    Installed as the MQTT client's ``on_connect`` callback.

    :param client: the MQTT client the callback was fired for
    :param data: user data attached to the client (unused)
    :param flags: broker response flags (unused)
    :param rc: connection result code; 0 means the broker accepted us
    """
    # `rc` in this callback is a CONNACK result code, so it has to be decoded
    # with connack_string(). error_string() describes the unrelated
    # MQTT_ERR_* codes and reports a misleading reason for refusals.
    print_d("Connect: {msg}", msg=mqtt.connack_string(rc))
    if rc != 0:
        # Connection refused: there is no session to subscribe on.
        return
    client.subscribe(MQTT_SETTINGS.topic_req, qos=1)
def on_subscribe(client, data, mid, granted_qos):
    """Log that a subscription to the request topic was acknowledged.

    Installed as the MQTT client's ``on_subscribe`` callback.

    :param client: the MQTT client the callback was fired for (unused)
    :param data: user data attached to the client (unused)
    :param mid: message id of the subscribe request (unused)
    :param granted_qos: the QoS value(s) reported back for the subscription
    """
    # Pass the one value the template needs explicitly instead of spraying
    # every local into the formatter via **locals().
    print_d("Subscribed to {topic} @ QOS {granted_qos}",
            topic=MQTT_SETTINGS.topic_req, granted_qos=granted_qos)
def on_message(client, userdata, message):
    """Relay an MQTT request to the Squeezeserver CLI and publish the reply.

    The payload is stripped of surrounding whitespace, sent to the CLI over
    the module-level ``telnet`` connection, and one response line is read
    back per command line sent. A non-empty response is published on the
    response topic at QoS 1.

    :param client: the MQTT client, used to publish the response
    :param userdata: user data attached to the client (unused)
    :param message: the received message; ``payload`` (bytes) and ``qos``
                    are read
    """
    command = message.payload.strip()
    if MQTT_SETTINGS.debug:
        # Decoding is only needed for logging, so do it lazily and never let
        # a malformed payload abort the callback.
        print_d(">>> {msg} (@QoS {qos})",
                msg=command.decode('utf-8', errors='replace'),
                qos=message.qos)
    if not command:
        # Nothing to send; writing a blank line would only risk
        # desynchronising requests and responses.
        print_d("Ignoring empty request")
        return

    telnet.write(command + b'\n')
    # Count lines in what was actually sent, not in the raw payload: a
    # payload lacking a trailing newline would otherwise expect no reply
    # (leaving it in the buffer for the next request), and one with extra
    # newlines would block waiting for replies that never come.
    # NOTE(review): assumes the CLI answers each command line with exactly
    # one line - confirm against the server.
    expected = command.count(b'\n') + 1
    resp_lines = [telnet.read_until(b'\n').strip() for _ in range(expected)]

    rsp = b'\n'.join(resp_lines)
    if rsp:
        if MQTT_SETTINGS.debug:
            print_d("<<< {msg}", msg=rsp.decode('utf-8', errors='replace'))
        client.publish(MQTT_SETTINGS.topic_resp, rsp, qos=1)
    else:
        print_d("No reply")
def connect_cli():
    """Open the telnet connection to the Squeezeserver CLI.

    The new connection is stored in the module-level ``telnet`` variable
    and also returned to the caller.

    :return: the connected ``telnetlib.Telnet`` instance
    """
    global telnet
    connection = telnetlib.Telnet(
        host=MQTT_SETTINGS.internal_server_hostname,
        port=LMS_SETTINGS.cli_port,
        timeout=5,
    )
    telnet = connection
    print_d("Connected to Squeezeserver CLI")
    return connection
if __name__ == "__main__":
    # Script entry point: bridge MQTT requests to the Squeezeserver CLI.
    if not MQTT_SETTINGS.configured:
        print("MQTT transport not configured. Check your settings")
        # sys.exit() rather than exit(): the latter is a convenience added
        # by the `site` module and is not guaranteed to exist.
        sys.exit(1)

    try:
        telnet = connect_cli()
    except OSError as e:
        # socket.timeout is a subclass of OSError, so timeouts are still
        # handled here, along with refused connections, unreachable hosts
        # and name-resolution failures, which previously escaped as a
        # traceback.
        print_w("Couldn't connect to Squeeze CLI using {settings} ({err})",
                settings=MQTT_SETTINGS, err=e)
        sys.exit(3)
    else:
        client = CustomClient(MQTT_SETTINGS)
        client.on_connect = on_connect
        client.on_subscribe = on_subscribe
        client.on_message = on_message
        client.connect()
        # Continue the network loop
        client.loop_forever(retry_first_connection=True)
    finally:
        # Runs on every exit path (including sys.exit above), so only close
        # the connection if one was actually established.
        if telnet:
            telnet.close()