forked from rsalveti/esp8266
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt-ds18s20.py
59 lines (51 loc) · 1.78 KB
/
mqtt-ds18s20.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#
# Copyright: Ricardo Salveti <[email protected]>
#
# SPDX-License-Identifier: MIT
import time
import machine
import onewire, ds18x20
import ubinascii
import utils
from umqtt.robust import MQTTClient
def main():
    """Publish the temperature of every DS18x20 sensor on the bus over MQTT.

    Settings are read through ``utils.Config``:

    * ``w1_gpio`` -- GPIO number of the 1-Wire data line (falls back to 14).
    * ``mqtt_broker``, ``mqtt_port``, ``mqtt_user``, ``mqtt_passwd`` --
      passed to ``MQTTClient`` in that order after the client id.
    * ``mqtt_topic`` -- first component of every published topic.
    * ``w1_<rom id>`` -- optional name used in the topic instead of the
      sensor's hex ROM id.

    Returns straight away when the bus scan finds no sensor. Otherwise it
    never returns: each round starts a conversion, publishes one message per
    sensor to ``<mqtt_topic>/<sensor name>/temperature`` and then sleeps for
    five seconds.
    """
    config = utils.Config()

    # On board LED
    led = machine.Pin(2, machine.Pin.OUT, machine.Pin.PULL_UP)

    # The 14 is the fallback GPIO number, so it belongs inside config.get()
    # (which takes a default as its second argument); handed to Pin() it
    # would be interpreted as the pin mode instead.
    ds_data = machine.Pin(config.get("w1_gpio", 14))
    ds = ds18x20.DS18X20(onewire.OneWire(ds_data))

    # Scan for devices on the bus
    roms = ds.scan()
    if not roms:
        print("No DS18S20 sensor found, nothing to be reported")
        return

    # hexlify() returns bytes; decode explicitly to get the text form. The
    # ids depend only on the scan result, so compute them once here rather
    # than on every pass of the publish loop.
    rom_ids = [ubinascii.hexlify(rom).decode() for rom in roms]
    print("Found DS18S20 sensors:",
          "".join("%s " % rom_id for rom_id in rom_ids))

    client_id = "esp8266_" + ubinascii.hexlify(machine.unique_id()).decode()
    client = MQTTClient(client_id, config.get("mqtt_broker"),
                        config.get("mqtt_port"), config.get("mqtt_user"),
                        config.get("mqtt_passwd"))
    try:
        client.connect()
    except OSError:
        # Just report and continue, since publish will try to reconnect
        print("Error when connecting to the MQTT broker")
    else:
        print("Connected to {}".format(config.get("mqtt_broker")))

    # Iterate and publish
    while True:
        # Start a conversion on all sensors, then give them 750 ms before
        # the results are read back.
        ds.convert_temp()
        time.sleep_ms(750)

        # The LED pin is held low for the duration of the publish burst.
        # value() is used because low()/high() are not provided by every
        # MicroPython port.
        led.value(0)
        for rom, ds_id in zip(roms, rom_ids):
            # A "w1_<rom id>" config entry renames the sensor in the topic;
            # without one the raw hex id is used.
            ds_topic = "w1_" + ds_id
            client.publish("{}/{}/temperature".format(config.get("mqtt_topic"),
                                                      config.get(ds_topic, ds_id)),
                           bytes(str(ds.read_temp(rom)), 'utf-8'))
        led.value(1)

        time.sleep(5)
# Start the publisher only when this file is run as the main script; a plain
# import of the module must not kick off the endless publish loop.
if __name__ == "__main__":
    main()