-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathafproto.py
70 lines (57 loc) · 1.69 KB
/
afproto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from crc8 import Crc8
import struct
# Framing bytes shared by serialize_payload() and extract_payload().
start_byte = 0xA3  # first byte of every frame
escape_byte = 0x85  # prefix for payload bytes that collide with a framing byte
end_byte = 0x59  # last byte of every frame
# Shared Crc8 instance; its digest() supplies the checksum byte in the header.
crcer = Crc8()
def serialize_payload(payload):
    """Wrap *payload* in a frame ready for transmission.

    Frame layout::

        start_byte | length | crc | escaped payload | end_byte

    Every payload byte equal to ``start_byte``, ``end_byte`` or
    ``escape_byte`` is preceded by ``escape_byte``.  ``length`` and ``crc``
    describe the escaped payload as it appears on the wire, because
    ``extract_payload`` uses the length field to slice the still-escaped
    bytes out of the stream and checksums them before unescaping.

    payload -- byte string to send.

    Returns the complete frame as a byte string.  ``struct.pack`` raises
    ``struct.error`` when the escaped payload is longer than 255 bytes,
    since the length field is a single unsigned byte.
    """
    reserved = (start_byte, end_byte, escape_byte)
    escaped = bytearray()
    # Iterating a bytearray yields integers, so each byte can be compared
    # directly with the integer protocol constants.
    for byte in bytearray(payload):
        if byte in reserved:
            escaped.append(escape_byte)
        escaped.append(byte)
    body = bytes(escaped)
    header = struct.pack('BBB', start_byte, len(body), crcer.digest(body))
    return header + body + struct.pack('B', end_byte)
def extract_payload(data):
    """Pull the first frame out of the received byte string *data*.

    data -- byte string of received bytes; it may hold leading garbage, a
            partial frame, or more than one frame.

    Returns a ``(payload, remainder)`` pair.  ``payload`` is the unescaped
    payload, or ``None`` when no valid frame was decoded.  ``remainder`` is
    the part of *data* left over for a later call:

    * no start byte found        -> empty
    * header or frame incomplete -> everything from the start byte on
    * end byte missing/escaped   -> everything after the start byte
    * CRC mismatch or success    -> everything after the end byte
    """
    # The markers are built with struct.pack rather than chr() so they are
    # byte strings on Python 3 as well as Python 2.
    start = struct.pack('B', start_byte)
    escape = struct.pack('B', escape_byte)
    end = struct.pack('B', end_byte)

    # Find the first start byte that is not preceded by an escape byte.
    start_ndx = data.find(start)
    while start_ndx > 0 and data[start_ndx - 1:start_ndx] == escape:
        start_ndx = data.find(start, start_ndx + 1)

    # If no start found trash data
    if start_ndx == -1:
        return None, data[:0]

    # Not a full message: start + length + crc + end is the minimum frame.
    if len(data) < start_ndx + 4:
        return None, data[start_ndx:]

    length, crc = struct.unpack('BB', data[start_ndx + 1:start_ndx + 3])
    body_ndx = start_ndx + 3
    end_ndx = body_ndx + length

    # The end byte has not arrived yet; keep the partial frame.
    if end_ndx >= len(data):
        return None, data[start_ndx:]

    payload = data[body_ndx:end_ndx]

    # Check end byte.  Only the payload region is inspected, so a header
    # byte that happens to equal escape_byte is not mistaken for an escape.
    # An even run of trailing escape bytes is made of escaped escape bytes;
    # only an odd run leaves one escape byte attached to the end byte.
    trailing_escapes = len(payload) - len(payload.rstrip(escape))
    if data[end_ndx:end_ndx + 1] != end or trailing_escapes % 2:
        print('No end byte')
        return None, data[start_ndx + 1:]

    # The checksum covers the payload as transmitted, i.e. still escaped.
    if crc != crcer.digest(payload):
        print('No payload')
        return None, data[end_ndx + 1:]

    # unescape
    for marker in (start, end, escape):
        payload = payload.replace(escape + marker, marker)
    return payload, data[end_ndx + 1:]
if __name__ == '__main__':
    # Decode a hand-written sample frame whose payload bytes spell 'Hello',
    # then encode the same payload and dump the resulting frame as hex.
    sample_frame = "\xa3\x05\xeb\x48\x65\x6c\x6c\x6f\x59"
    print(extract_payload(sample_frame))
    frame = serialize_payload('Hello')
    print(' '.join('%x' % ord(octet) for octet in frame))