# gt06Client.py
import codecs
import socket
import logging, sys
import crc_itu
class GT06Client:
    """TCP client that builds and sends GT06-style tracker packets.

    Every packet produced by this class has the layout::

        78 78 | length | protocol | payload ... | serial (2) | CRC (2) | 0d 0a

    * ``length`` is one byte counting protocol + payload + serial + CRC,
      i.e. ``len(payload) + 5``.
    * ``serial`` is a 16-bit big-endian counter (``infoSerial``) that is
      incremented for every message built by :meth:`makeBasicMessage`.
    * ``CRC`` is ``crc_itu.crc16`` computed over ``length`` .. ``serial``.

    The server is expected to acknowledge a message with a payload-less
    packet carrying the same protocol number and serial; see
    :meth:`makeBasicMessageResponse`.
    """

    # Class-level defaults; instances shadow them in __init__ / the setters.
    serverAddress = None
    serverPort = None
    timeOut = None
    infoSerial = 0  # last information serial number used (16-bit, wraps)
    socket = None  # connected socket object, or None while disconnected

    _START_BITS = b"\x78\x78"
    _STOP_BITS = b"\x0d\x0a"
    # Bytes counted by the length field besides the payload:
    # protocol (1) + serial (2) + CRC (2).
    _LENGTH_OVERHEAD = 5
    # The length field is a single byte, which caps the payload size.
    _MAX_PAYLOAD = 0xFF - _LENGTH_OVERHEAD
    # The login payload is the IMEI packed two digits per byte into 8 bytes.
    _IMEI_HEX_DIGITS = 16
    _DECIMAL_DIGITS = frozenset("0123456789")

    def __init__(self, serverAddress=None, serverPort=None, timeOut=60):
        """Store the connection parameters; no network I/O happens here.

        Args:
            serverAddress: Host name or IP of the server. Falsy values leave
                the class default (``None``) in place.
            serverPort: TCP port of the server. Falsy values leave the class
                default (``None``) in place.
            timeOut: Socket timeout in seconds, passed to ``settimeout``.
        """
        if serverAddress:
            self.serverAddress = serverAddress
        if serverPort:
            self.serverPort = serverPort
        self.timeOut = timeOut

    # The accessor methods below are kept for backward compatibility with
    # existing callers; the attributes themselves are public.

    def getServerPort(self):
        """Return the configured server port."""
        return self.serverPort

    def setServerPort(self, serverPort):
        """Set the server port used by the next :meth:`connect`."""
        self.serverPort = serverPort

    def getServerAddress(self):
        """Return the configured server address."""
        return self.serverAddress

    def setServerAddress(self, serverAddress):
        """Set the server address used by the next :meth:`connect`."""
        self.serverAddress = serverAddress

    def getTimeOut(self):
        """Return the configured socket timeout."""
        return self.timeOut

    def setTimeOut(self, timeOut):
        """Set the socket timeout applied by the next :meth:`connect`."""
        self.timeOut = timeOut

    def connect(self):
        """Open a TCP connection to ``serverAddress:serverPort``.

        The timeout is applied *before* connecting so the connection attempt
        itself is bounded by ``timeOut``. Any previously open socket is closed
        first.

        On any ``OSError`` the failure is logged and the process exits with
        status 255 (historical behaviour that existing callers rely on).
        """
        self.disconnect()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeOut)
            sock.connect((self.serverAddress, self.serverPort))
        except OSError:
            # Do not leak the half-open descriptor on failure.
            if sock is not None:
                sock.close()
            logging.exception(f"Cannot connect to {self.serverAddress}:{self.serverPort}")
            sys.exit(255)
        self.socket = sock

    def disconnect(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.socket is None:
            return
        try:
            self.socket.close()
        finally:
            self.socket = None

    def makeBasicMessage(self, protocolIn, messageIn):
        """Build an outgoing packet and advance the serial number.

        Args:
            protocolIn: Protocol number, an integer in ``0..255``.
            messageIn: Payload as a hexadecimal string (two digits per byte).

        Returns:
            The complete framed packet as ``bytes``.

        Raises:
            ValueError: If ``messageIn`` is not valid hexadecimal, the payload
                does not fit the one-byte length field, or ``protocolIn`` is
                out of range. The serial number is left untouched in that
                case.
        """
        try:
            payload = bytes.fromhex(messageIn)
        except ValueError as err:
            raise ValueError(
                f"Message content is not a valid hexadecimal string: {messageIn!r}"
            ) from err
        if len(payload) > self._MAX_PAYLOAD:
            raise ValueError(
                f"Message content is {len(payload)} bytes; "
                f"at most {self._MAX_PAYLOAD} fit in one packet."
            )
        protocol = self._checkProtocol(protocolIn)

        # Only consume a serial number once the message is known to be valid.
        self.infoSerial = (self.infoSerial + 1) % 0x10000
        return self._framePacket(protocol, payload, int(self.infoSerial))

    def makeBasicMessageResponse(self, protocolIn):
        """Build the acknowledgement expected for the last message sent.

        The acknowledgement has no payload (length byte 5) and carries the
        current ``infoSerial`` unchanged.

        Args:
            protocolIn: Protocol number, an integer in ``0..255``.

        Returns:
            The expected acknowledgement packet as ``bytes``.

        Raises:
            ValueError: If ``protocolIn`` is out of range.
        """
        protocol = self._checkProtocol(protocolIn)
        return self._framePacket(protocol, b"", int(self.infoSerial))

    def sendLoginMessage(self, imeiIn):
        """Send a login packet (protocol 1) and check the acknowledgement.

        Args:
            imeiIn: Device IMEI as decimal digits. Fewer than 16 digits are
                left-padded with zeros so the usual 15-digit IMEI packs into
                exactly 8 bytes.

        Returns:
            ``True`` if the server replied with the expected acknowledgement,
            ``False`` on a mismatch or a receive timeout.

        Raises:
            ValueError: If the IMEI contains anything other than decimal
                digits or is longer than 16 digits.
            OSError: If the client is not connected or the send fails.
        """
        return self._exchange(1, self._normalizeImei(imeiIn))

    def sendGPSMessage(self, gpsHash):
        """Send a GPS packet (protocol 16) and check the acknowledgement.

        Args:
            gpsHash: Pre-encoded GPS payload as a hexadecimal string.

        Returns:
            ``True`` if the server replied with the expected acknowledgement,
            ``False`` on a mismatch or a receive timeout.

        Raises:
            ValueError: If ``gpsHash`` is not valid hexadecimal.
            OSError: If the client is not connected or the send fails.
        """
        try:
            bytes.fromhex(gpsHash)
        except ValueError as err:
            raise ValueError("GPS hash string contains non-hexadecimal digits.") from err
        return self._exchange(16, gpsHash)

    @classmethod
    def _normalizeImei(cls, imeiIn):
        """Validate an IMEI and return it zero-padded to 16 decimal digits."""
        digits = str(imeiIn).strip()
        if not digits or not cls._DECIMAL_DIGITS.issuperset(digits):
            raise ValueError("IMEI string contains non-numeric digits.")
        if len(digits) > cls._IMEI_HEX_DIGITS:
            raise ValueError(
                f"IMEI string is longer than {cls._IMEI_HEX_DIGITS} digits."
            )
        return digits.zfill(cls._IMEI_HEX_DIGITS)

    @staticmethod
    def _checkProtocol(protocolIn):
        """Return ``protocolIn`` as an int, ensuring it fits in one byte."""
        protocol = int(protocolIn)
        if not 0 <= protocol <= 0xFF:
            raise ValueError(f"Protocol number {protocol} does not fit in one byte.")
        return protocol

    def _framePacket(self, protocol, payload, serial):
        """Wrap protocol, payload and serial with length, CRC and start/stop bytes."""
        body = (
            bytes((len(payload) + self._LENGTH_OVERHEAD, protocol))
            + payload
            + serial.to_bytes(2, "big")
        )
        # The checksum covers everything between the start bytes and itself.
        checksum = int(crc_itu.crc16(body))
        return self._START_BITS + body + checksum.to_bytes(2, "big") + self._STOP_BITS

    def _exchange(self, protocolIn, messageIn):
        """Send one message and report whether the expected reply came back."""
        if self.socket is None:
            raise OSError("Not connected: call connect() before sending messages.")
        self.socket.sendall(self.makeBasicMessage(protocolIn, messageIn))
        try:
            serverResp = self.socket.recv(1024)
        except socket.timeout:
            # No reply within the timeout counts as "not acknowledged".
            return False
        return serverResp == self.makeBasicMessageResponse(protocolIn)