-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhs110exporter.py
executable file
·318 lines (272 loc) · 11.8 KB
/
hs110exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/local/bin/python3 -u
# -*- coding: utf-8 -*-
import time
import socket
import argparse
import json
import sys
import re
from typing import Union
from dpcontracts import require, ensure
from prometheus_client import start_http_server, Gauge
# import VERSION
# Exporter version; appended to the description shown in the command-line help.
VERSION = 0.99
# Number of consecutive socket errors tolerated before the exporter exits.
# HS110data keeps a per-instance counter that starts at this value, is
# decremented on every socket error and is restored after a successful exchange.
SOCKET_RETRY = 100
@require("ip must be a string", lambda args: isinstance(args.ip, str))
@require("ip must not be empty", lambda args: len(args.ip) > 0)
@ensure("result is part of input", lambda args, result: result in args.ip)
def valid_ip(ip: str) -> str:
    """Validate an IPv4 address given as text.

    Surrounding whitespace is stripped and the remainder is checked with
    ``socket.inet_pton(socket.AF_INET, ...)``.

    Returns:
        The stripped address.

    Raises:
        ValueError: if the stripped text is not accepted as an IPv4 address.
    """
    address = ip.strip()
    try:
        socket.inet_pton(socket.AF_INET, address)
    except socket.error:
        # Hide the low-level socket error; callers only need to know the
        # address was rejected.
        raise ValueError("Invalid IP Address %s" % address) from None
    else:
        return address
@require("label must be a string", lambda args: isinstance(args.label, str))
@require("label must not be empty", lambda args: len(args.label) > 0)
def valid_label(label: str) -> tuple:
    """Parse a ``name=value`` label into a ``(name, value)`` tuple.

    Surrounding whitespace is stripped first. Both sides of the single ``=``
    must consist of one or more ``\\w`` characters and together they must
    span the whole stripped text.

    Returns:
        A 2-tuple ``(name, value)`` of strings.

    Raises:
        ValueError: if the text does not have the ``name=value`` shape.
    """
    label = label.strip()
    # Raw string: '\w' in a plain literal is an invalid escape sequence.
    # fullmatch anchors the pattern at both ends, replacing the manual
    # start/end comparisons.
    match = re.fullmatch(r'(\w+)=(\w+)', label)
    if match is None:
        raise ValueError("Invalid Label %s" % label) from None
    return match.groups()
class HS110data:
    """Storage and management for HS110 data.

    An instance keeps the most recent ``emeter/get_realtime`` reading received
    from one plug, knows how to obfuscate/de-obfuscate the messages exchanged
    with it, and can query the plug over TCP.
    """

    @require("ip must be a valid IP", lambda args: valid_ip(args.ip))
    @require("hardware_version must be string",
             lambda args: isinstance(args.hardware_version, str))
    @require("hardware_version must be 'h1' or 'h2' ",
             lambda args: args.hardware_version in ['h1', 'h2'])
    @require("port must be intenger",
             lambda args: isinstance(args.port, int)
             and args.port >= 0 and args.port <= 65535)
    def __init__(self,
                 hardware_version: str = 'h2',
                 ip: str = '192.168.1.53',
                 port: int = 9999) -> None:
        """Create the storage for one HS110 plug.

        Args:
            hardware_version: 'h2' (default) or 'h1'; selects which key names
                are used for the readings until a reply is received.
            ip: IPv4 address of the plug.
            port: TCP port of the plug, 9999 by default.
        """
        self.__hardware = hardware_version
        # Maps the generic item names used by get_data() to the key names
        # each hardware revision uses in its reply.
        self.__keyname = {
            "h1": {  # Hardware version 1.x
                "current": "current",
                "voltage": "voltage",
                "power": "power",
                "total": "total"
            },
            "h2": {  # Hardware version 2.x
                "current": "current_ma",
                "voltage": "voltage_mv",
                "power": "power_mw",
                "total": "total_wh"
            }
        }
        self.__received_data = self.__empty_data()
        # Encryption and Decryption of TP-Link Smart Home Protocol
        # XOR Autokey Cipher with starting key = 171
        self.__hs110_key = 171
        # HS110 address and port
        self.__ip = ip
        self.__port = port
        # Remaining socket failures allowed before send() exits the process.
        self.__socket_counter = SOCKET_RETRY

    @require("The encrypt parameter must be str type",
             lambda args: isinstance(args.string, str))
    @require("String must not be empty", lambda args: len(args.string) > 0)
    @ensure("Result must be bytes", lambda args, result: isinstance(result, bytes))
    def __encrypt(self, string: str) -> bytes:
        """Encrypt a command string into the frame sent to the HS110.

        The frame is a 4-byte big-endian payload length followed by the
        latin-1 encoded text, where every byte is XOR-ed with the previous
        cipher byte (the first one with the starting key).
        """
        payload = string.encode('latin-1', 'replace')
        # to_bytes(4, 'big') yields the same header as before for payloads
        # shorter than 256 bytes and also works for longer ones.
        frame = bytearray(len(payload).to_bytes(4, 'big'))
        key = self.__hs110_key
        for plain in payload:
            key ^= plain  # the cipher byte is also the key for the next byte
            frame.append(key)
        return bytes(frame)

    @require("The decrypt parameter must be bytes type",
             lambda args: isinstance(args.data, bytes))
    @require("Parameter must have more than 3 bytes starting with 000",
             lambda args: len(args.data) > 3 and args.data[0:3] == b"\0\0\0")
    @ensure("Result must be str", lambda args, result: isinstance(result, str))
    def __decrypt(self, data: bytes) -> str:
        """Decrypt a frame received from the HS110 into text.

        The first 4 bytes (length header) are skipped; each remaining byte is
        XOR-ed with the previous cipher byte (the first one with the starting
        key) and the result is decoded as latin-1.

        Note: the precondition only admits frames whose first three header
        bytes are zero, i.e. a length header below 256.
        """
        key = self.__hs110_key
        plain = bytearray()
        for cipher in data[4:]:
            plain.append(key ^ cipher)
            key = cipher
        return plain.decode('latin-1', 'replace')

    @ensure("Result must be str", lambda args, result: isinstance(result, str))
    @ensure("Result str must not be empty", lambda args, result: len(result) > 0)
    def __str__(self) -> str:
        """Return the stored reading as ``key=value`` pairs joined by ', '."""
        realtime = self.__received_data['emeter']['get_realtime']
        return ', '.join(
            '{key}={value}'.format(key=key, value=value)
            for key, value in realtime.items()
        )

    @ensure("Result must be dict", lambda args, result: isinstance(result, dict))
    def __empty_data(self) -> dict:
        """Build a reading with all values 0 for the current hardware version."""
        names = self.__keyname[self.__hardware]
        return {
            "emeter": {
                "get_realtime": {
                    names['current']: 0,
                    names['voltage']: 0,
                    names['power']: 0,
                    names['total']: 0,
                    "err_code": 0
                }
            }
        }

    @require("Parameter data must be bytes type", lambda args: isinstance(args.data, bytes))
    @require("Parameter must have more than 3 bytes starting with 000",
             lambda args: len(args.data) > 3 and args.data[0:3] == b"\0\0\0")
    def receive(self, data: bytes) -> None:
        """Decrypt a frame, parse it as JSON and store it as the current reading.

        The hardware version is switched to 'h2' when the reading contains
        ``current_ma`` and to 'h1' otherwise.

        Raises:
            ValueError: if the decrypted text is not JSON, or the JSON has no
                ``emeter`` -> ``get_realtime`` object. The previously stored
                reading is left untouched in that case.
        """
        try:
            parsed = json.loads(self.__decrypt(data))
        except Exception:
            raise ValueError("json.loads decrypt data") from None
        # Validate the structure before storing it, so a reply without the
        # realtime section surfaces as the ValueError that send() handles
        # instead of an uncaught KeyError/TypeError.
        try:
            realtime = parsed['emeter']['get_realtime']
        except (KeyError, TypeError):
            raise ValueError("response has no emeter/get_realtime section") from None
        if not isinstance(realtime, dict):
            raise ValueError("emeter/get_realtime is not an object")
        self.__received_data = parsed
        self.__hardware = 'h2' if "current_ma" in realtime else 'h1'

    @ensure("Result must be a bytes", lambda args, result: isinstance(result, bytes))
    def get_cmd(self) -> bytes:
        """Get encrypted command to get realtime info from HS110."""
        cmd = '{"emeter":{"get_realtime":{}}}'
        return self.__encrypt(cmd)

    @require("Parameter data must be str type", lambda args: isinstance(args.item, str))
    @ensure("Result must be a float or int",
            lambda args, result: isinstance(result, (float, int)))
    def get_data(self, item: str) -> Union[float, int]:
        """Return one value of the stored reading as a float.

        Args:
            item: one of 'power', 'current', 'voltage' or 'total'.

        The value is returned as stored, without any unit conversion.
        NOTE(review): the h2 key names end in _ma/_mv/_mw/_wh, which suggests
        milli-units; confirm against the metric names used by the caller.

        Raises:
            KeyError: if ``item`` is not a known item name, or the stored
                reading has no value for it.
        """
        try:
            key = self.__keyname[self.__hardware][item]
            return float(self.__received_data["emeter"]["get_realtime"][key])
        except KeyError:
            # List the item names callers may pass, not the device's raw keys.
            raise KeyError(
                'get_data parameter must be one of: ['
                + ', '.join(self.__keyname[self.__hardware]) + ']'
            ) from None

    @ensure("Result must be a string", lambda args, result: isinstance(result, str))
    def get_connection_info(self) -> str:
        """Return a human-readable 'HS110 connection: ip:port' line."""
        return 'HS110 connection: %s:%s' % (self.__ip, str(self.__port))

    def reset_data(self) -> None:
        """ Reset self.__received_data values to 0 """
        self.__received_data = self.__empty_data()

    def connect(self) -> None:
        """ Connect to hss110 with get command to receive metrics """
        self.send(self.get_cmd())

    def __exchange(self, command: bytes) -> bytes:
        """Open a TCP connection, send ``command`` and return the first reply chunk.

        The socket is always closed, including when connecting, sending or
        receiving raises.
        """
        sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock_tcp.settimeout(2)
            sock_tcp.connect((self.__ip, self.__port))
            # sendall keeps writing until the whole command has been sent.
            sock_tcp.sendall(command)
            return sock_tcp.recv(2048)
        finally:
            sock_tcp.close()

    def send(self, command: bytes) -> None:
        """Send command to hs110 and store the reply.

        On a socket error the last stored values are kept and the retry
        counter is decremented; the process exits once the counter reaches 0.
        On an undecodable reply the stored values are reset to 0.
        """
        try:
            data = self.__exchange(command)
            self.__socket_counter = SOCKET_RETRY
            # Sample return value received:
            # HS110 Hardware 1:
            # {"emeter":{"get_realtime":
            #   {"voltage":229865,"current":1110,"power":231866,"total":228,"err_code":0}}}
            # HS110 Hardware 2:
            # {"emeter":{"get_realtime":
            #   {"voltage_mv":229865,"current_ma":1110,"power_mw":231866,"total_wh":228,"err_code":0}}}
            # An empty or too-short reply would violate receive()'s
            # precondition; route it to the ValueError branch below instead.
            if len(data) <= 3 or data[0:3] != b"\0\0\0":
                raise ValueError("truncated or malformed frame")
            self.receive(data)  # Receive and decrypts data
        except socket.error as error:
            print(
                "[error] Could not connect to the host %s:%s (%s). Keeping last values"
                % (self.__ip, self.__port, error)
            )
            self.__socket_counter -= 1
            if self.__socket_counter <= 0:
                sys.exit('Connection retry limit %s reached' % SOCKET_RETRY)
        except ValueError:
            self.reset_data()
            print("[warning] Could not decrypt data from hs110. Reseting values.")
@require("Parameter data must be argparse.Namespace type",
         lambda args: isinstance(args.args, argparse.Namespace))
@require("Parameter port must be int type",
         lambda args: isinstance(args.args.port, int))
@require("Parameter frequency must be int type",
         lambda args: isinstance(args.args.frequency, int))
@require("Parameter target must be str type",
         lambda args: isinstance(args.args.target, str))
def main(args: argparse.Namespace) -> None:
    """Run the exporter until the process is stopped.

    Creates the HS110 storage object for ``args.target``, registers one gauge
    per measurement (each carrying the extra label from ``args.label``),
    starts the metrics HTTP server on ``args.port`` and then polls the plug
    every ``args.frequency`` seconds, printing each reading.
    """
    plug = HS110data(hardware_version='h2', ip=args.target)

    label_name = args.label[0]
    label_value = args.label[1]
    print('[info] Extra label %s="%s"' % (label_name, label_value))

    # (item passed to get_data, metric name, metric description)
    # Gauge: it goes up and down, snapshot of state.
    metric_table = (
        ('power', 'hs110_power_watt', 'HS110 Watt measure'),
        ('current', 'hs110_current', 'HS110 Current measure'),
        ('voltage', 'hs110_voltage', 'HS110 Voltage measure'),
        ('total', 'hs110_total', 'HS110 Energy measure'),
    )
    gauges = [
        (item, Gauge(metric, description, [label_name]))
        for item, metric, description in metric_table
    ]
    for item, gauge in gauges:
        # Bind item as a default so each callback reads its own measurement.
        gauge.labels(label_value).set_function(
            lambda item=item: plug.get_data(item))  # pragma: no cover

    print('[info] %s' % plug.get_connection_info())

    # Expose the metrics over HTTP.
    start_http_server(args.port)
    print("[info] Exporter listening on TCP: " + str(args.port))

    # Polling loop
    while True:
        plug.connect()
        print('[info] %s' % plug)
        time.sleep(args.frequency)
@ensure("Result must be args.Namespace",  # pragma: no cover
        lambda args, result: isinstance(result, argparse.Namespace))
def command_line_args() -> argparse.Namespace:
    """Parse the exporter's command-line options.

    Options: -t/--target (required, validated by valid_ip), -f/--frequency
    (int, default 1), -p/--port (int, default 8110) and -l/--label
    (validated by valid_label, default ('location', 'home')).
    """
    parser = argparse.ArgumentParser(
        description="TP-Link HS110 Wi-Fi Smart Plug Prometheus exporter v" + str(VERSION)
    )
    # (flags, keyword settings), kept in the order they appear in --help.
    option_table = (
        (("-t", "--target"),
         dict(metavar="<ip>", required=True,
              help="Target IP Address", type=valid_ip)),
        (("-f", "--frequency"),
         dict(metavar="<seconds>", required=False,
              help="Interval in seconds between checking measures",
              default=1, type=int)),
        (("-p", "--port"),
         dict(metavar="<port>", required=False,
              help="Port for listenin", default=8110, type=int)),
        (("-l", "--label"),
         dict(metavar="<string>", required=False,
              help="Extra label on metrics",
              default=('location', 'home'), type=valid_label)),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
# Script entry point: parse the command line and hand it to the exporter loop.
if __name__ == "__main__":
    main(command_line_args())