-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoap_client_get_post.py
71 lines (56 loc) · 2.04 KB
/
coap_client_get_post.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import logging
import asyncio
import time
import numpy as np
import sys
from aiocoap import *
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)

# CoAP server to talk to (host part of the URI; IPv6 literals keep their
# brackets). Alternatives noted by the author: '134.102.218.18', coap.me,
# 'localhost'.
addr = "[fd00::212:4b00:1204:cb03]"
port = "5683"

# Resource paths, relative to the server root.
geter = "sen/opt/light"   # resource read with GET
poster = "lt/g"           # resource path intended for POST

# Author's note: "min 5 sec".
# NOTE(review): presumably a polling interval; confirm the unit and intent.
clock = 1
async def get(uri):
    """Send a CoAP GET to *uri* and return the response payload.

    A fresh client context is created for the request and shut down again
    before returning, so repeated calls do not accumulate open sockets.

    Args:
        uri: Full CoAP URI of the resource, e.g. ``coap://host:5683/path``.

    Returns:
        The raw payload of the response as ``bytes``.
    """
    protocol = await Context.create_client_context()
    try:
        request = Message(code=GET, uri=uri)
        response = await protocol.request(request).response
        return response.payload
    finally:
        # Always release the context, also when the request fails or the
        # caller is interrupted; otherwise every call leaks a context.
        await protocol.shutdown()
async def post(uri, payload):
    """Send a CoAP POST with *payload* to *uri* and wait for the response.

    The URI and payload are echoed to stdout before the request is sent.
    The response is awaited but not inspected, and nothing is returned.
    A fresh client context is created for the request and shut down again
    before returning, so repeated calls do not accumulate open sockets.

    Args:
        uri: Full CoAP URI of the resource, e.g. ``coap://host:5683/path``.
        payload: Request body as ``bytes``.
    """
    protocol = await Context.create_client_context()
    try:
        print(uri, payload)
        request = Message(code=POST, payload=payload, uri=uri)
        # Wait for the server's answer so the exchange has completed by the
        # time this coroutine returns; the response itself is not used.
        await protocol.request(request).response
    finally:
        # Always release the context, also when the request fails or the
        # caller is interrupted; otherwise every call leaks a context.
        await protocol.shutdown()
# Light-threshold controller: repeatedly read the light sensor over CoAP and
# switch the green and red lights when the reading crosses their limits.
# Runs until interrupted with Ctrl-C.
end = False
green_limit = 200  # green light is on while the reading is above this value
red_limit = 100    # red light is on while the reading is above this value
green_on = False   # last state this script commanded for the green light
red_on = False     # last state this script commanded for the red light

# The endpoint settings do not change while running, so build the URIs once.
base_uri = 'coap://' + addr + ':' + port
uri_get = base_uri + '/' + geter
uri_green = base_uri + '/lt/g'
uri_red = base_uri + '/lt/r'

# Use one explicitly created event loop for every request. Calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
loop = asyncio.new_event_loop()
try:
    # NOTE(review): the loop polls back-to-back with no delay, while the
    # module-level `clock` setting and the `time` import are unused.
    # Presumably a sleep between polls was intended; confirm before adding.
    while not end:
        try:
            raw_res = loop.run_until_complete(get(uri_get)).decode("utf-8")
            res = float(raw_res)

            # Only send a POST when the desired state differs from the last
            # commanded state, so the lights are not re-sent every poll.
            if res > green_limit and not green_on:
                loop.run_until_complete(post(uri_green, b"on"))
                green_on = True
            if res <= green_limit and green_on:
                loop.run_until_complete(post(uri_green, b"off"))
                green_on = False

            if res > red_limit and not red_on:
                loop.run_until_complete(post(uri_red, b"on"))
                red_on = True
            if res <= red_limit and red_on:
                loop.run_until_complete(post(uri_red, b"off"))
                red_on = False
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the controller.
            end = True
finally:
    # Switch off whatever this script turned on, whether the loop ended by
    # Ctrl-C or by an unexpected error (e.g. a non-numeric sensor payload),
    # so the lights are not left on after the script exits.
    try:
        if green_on:
            loop.run_until_complete(post(uri_green, b"off"))
        if red_on:
            loop.run_until_complete(post(uri_red, b"off"))
    finally:
        loop.close()