-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathjvccommands.py
381 lines (312 loc) · 11.6 KB
/
jvccommands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
"""Commands for the jvc_projector library."""
import socket
import logging
from dataclasses import dataclass, field
from typing import Dict, Tuple
_LOGGER = logging.getLogger(__name__)
# Headers
OPR = b"!\x89\x01" # operation (set)
REF = b"?\x89\x01" # reference (get)
RES = b"@\x89\x01" # response
ACKH = b"\x06\x89\x01" # projector ack
COM_ACK_LENGTH = 6 # length of ACKs sent by the projector
# use a dataclass so that we have a nice repr
@dataclass(init=False)
class Command:
"""Base class for defining a set of read/write commands.
Args:
cmd (bytes): the base command bytes, for examplee b"PW" for power, b"PMPM" for picture mode.
*args (Dict[str, bytes]): dictionaries of str:bytes pairs defining the read and write values.
If two dicts are provided as args, then the first is used for the write commands and
the second is used for read values. If only a single dict is provided,
we use it for both the write commands and the read values.
write_only(bool, optional): If true, this command group does not read from the projector
Defaults to False.
read_only(bool, optional): If True, this command group cannot write to the projector
Defaults to False.
verify_write (bool, optional): Whether we should wait for an ACK once we send a write command.
Defaults to True.
Examples:
See jvccommands.Commands, jvcprojector.JVCProjector
"""
cmd: bytes
name: str
readwritevals: Tuple[Dict[str, bytes]] = field(repr=False)
write_only: bool
read_only: bool
verify_write: bool
write_vals: Dict[str, bytes]
read_vals: Dict[str, bytes]
def __init__(
self,
cmd: bytes,
*readwritevals: Dict[str, bytes],
write_only: bool = False,
read_only: bool = False,
verify_write: bool = True,
):
self.cmd = cmd
self.verify_write = verify_write
self.ack = ACKH + self.cmd[0:2] + b"\n"
try:
assert len(readwritevals) <= 2
except AssertionError:
raise AssertionError(
"(set_vals, get_vals) AND setget_vals cannot be defined at the same time."
)
self.write_only = write_only
self.read_only = read_only
if len(readwritevals) == 1:
if write_only:
self.write_vals = readwritevals[0]
self.read_vals = {}
elif read_only:
self.write_vals = {}
self.read_vals = readwritevals[0]
else:
self.write_vals = readwritevals[0]
self.read_vals = readwritevals[0]
elif len(readwritevals) == 2:
self.write_vals = readwritevals[0]
self.read_vals = readwritevals[1]
else:
self.write_vals = {}
self.read_vals = {}
self.write_valsinv = {
self.write_vals[key]: key for key in self.write_vals.keys()
}
self.read_valsinv = {self.read_vals[key]: key for key in self.read_vals.keys()}
# TODO: this only works if the commands are defined within a class.
# A globally defined Command won't have the`name` property.
def __set_name__(self, owner, name: str):
self.name = name
def __send(self, sock: socket.socket, command: bytes) -> None:
try:
sock.sendall(command)
except OSError as e:
sock.close()
raise JVCCommunicationError(
f"Socket exception of `{self.name}` command when sending bytes: `{command}`."
) from e
def __verify_ack(self, sock: socket.socket, command: bytes) -> None:
try:
ACK = sock.recv(COM_ACK_LENGTH)
# check if the ACK is valid (compare to user provided ack if available)
if not ACK.startswith(ACKH):
sock.close()
raise JVCCommunicationError(
f"Malformed ACK response from the projector when sending command: `{self.name}` with bytes: `{command}`. "
f"Received ACK: `{ACK}` does not have the correct header"
)
elif not ACK == self.ack:
sock.close()
raise JVCCommunicationError(
f"Malformed ACK response from the projector when sending command: `{self.name}` with bytes: `{command}`. "
f"Expected `{self.ack}`, received `{ACK}`"
)
except socket.timeout as e:
sock.close()
raise JVCCommunicationError(
f"Timeout when waiting for the specified ACK: `{self.ack}` for command: `{self.name}` with bytes: `{command}`"
) from e
except OSError as e:
sock.close()
raise JVCCommunicationError(
f"Socket exception when waiting for the specified ACK: `{self.ack}` for command: `{self.name}` with bytes: `{command}`"
) from e
def write(self, sock: socket.socket, value: str = "") -> None:
if self.read_only:
sock.close()
raise JVCCommandNotFoundError(
f"The command group `{self.name}` does not implement any writable properties"
)
try:
command = OPR + self.cmd + self.write_vals[value] + b"\n"
except KeyError as e:
if not value and not self.write_vals:
command = OPR + self.cmd + b"\n"
elif not value and self.write_only:
sock.close()
raise JVCCommandNotFoundError(
f"Write only command group: `{self.name}` has to be called with a corresponding key to complete the write operation. "
f"Must be one of: {list(self.write_vals.keys())}."
) from e
else:
sock.close()
raise JVCCommandNotFoundError(
f"The command: `{self.name}` does not contain operation: `{value}`. "
f"Must be one of: {list(self.write_vals.keys())}."
) from e
if not self.verify_write:
_LOGGER.debug(
f"ACK verification disabled for the command: `{jfrmt.highlight(self.name)} `. Error handling will be less robust"
)
self.__send(sock, command)
# no need to wait for ACK or message as this is not a reference command without ACK specified
if not self.verify_write:
sock.close()
return
self.__verify_ack(sock, command)
sock.close()
def read(self, sock: socket.socket) -> str:
if self.write_only:
sock.close()
raise JVCCommandNotFoundError(
f"The command group: `{self.name}` does not implement any readable properties"
)
command = REF + self.cmd + b"\n"
self.__send(sock, command)
self.__verify_ack(sock, command)
try:
resp = sock.recv(1024)
except socket.timeout as e:
sock.close()
raise JVCCommunicationError(
f"Timeout when waiting for response for read command: `{self.name}`"
) from e
except OSError as e:
sock.close()
raise JVCCommunicationError(
f"Socket exception when waiting for response for read command: `{self.name}`"
) from e
sock.close()
try:
assert resp.startswith(RES + self.cmd[0:2])
except AssertionError as e:
raise JVCCommunicationError(
f"Malformed response header for read command: `{self.name}`"
) from e
resp = resp[len(RES) + 2 : -1]
# read_vals not defined but we still received a response
resp_ascii = resp.decode("ascii")
if not self.read_vals:
return resp_ascii
# decode the response if it's known, otherwise just return the
# raw ascii code and log a warning.
try:
return self.read_valsinv[resp]
except KeyError as e:
_LOGGER.warning(
f"Could not decode response: `{resp_ascii}` for command: `{jfrmt.highlight(self.name)}`. "
f"It is not in the list of known responses, returning the raw ascii value instead."
)
return resp_ascii
class Commands:
"""A container for Commands"""
# power commands
power = Command(
b"PW",
{"on": b"1", "off": b"0"},
{
"standby": b"0",
"lamp_on": b"1",
"cooling": b"2",
"reserved": b"3",
"emergency": b"4",
},
)
# lens memory commands
memory = Command(
b"INML",
{"1": b"0", "2": b"1", "3": b"2", "4": b"3", "5": b"4", "6": b"5", "7": b"6", "8": b"7", "9": b"8", "10": b"9"},
)
# input commands, input is technically a keyword, but should be okay...
input = Command(b"IP", {"hdmi1": b"6", "hdmi2": b"7"})
# picture mode commands
picture_mode = Command(
b"PMPM",
{
"film": b"00",
"cinema": b"01",
"natural": b"03",
"hdr10": b"04",
"thx": b"06", # not present in NZ series
"frame_adapt_hdr": b"0B", # new in NZ series
"user1": b"0C",
"user2": b"0D",
"user3": b"0E",
"user4": b"0F",
"user5": b"10",
"user6": b"11",
"hlg": b"14",
"hdr10p": b"15", # new in NZ series
"pana_pq": b"16", # new in NZ series
},
)
# low latency enable/disable
low_latency = Command(b"PMLL", {"on": b"1", "off": b"0"})
# mask commands
mask = Command(
b"ISMA",
{"off": b"2", "custom1": b"0", "custom2": b"1", "custom3": b"3"},
)
# lamp commands
lamp = Command(
b"PMLP",
{"high": b"1", "low": b"0", "mid": b"2"},
)
# menu controls
menu = Command(
b"RC73",
{
"menu": b"2E",
"down": b"02",
"left": b"36",
"right": b"34",
"up": b"01",
"ok": b"2F",
"back": b"03",
},
write_only=True,
)
# Intelligent Lens Aperture commands
aperture = Command(
b"PMDI",
{"off": b"0", "auto1": b"1", "auto2": b"2"},
)
# Anamorphic commands
anamorphic = Command(
b"INVS",
{"off": b"0", "a": b"1", "b": b"2", "c": b"3", "d": b"4"},
)
# active signal
signal = Command(
b"SC",
{"no_signal": b"0", "active_signal": b"1"},
read_only=True,
)
# MAC address, model, null command
macaddr = Command(
b"LSMA",
read_only=True,
)
modelinfo = Command(
b"MD",
read_only=True,
)
nullcmd = Command(
b"\x00\x00",
write_only=True,
)
class JVCConfigError(Exception):
"""Exception when the user supplied config is wrong"""
pass
class JVCCannotConnectError(Exception):
"""Exception when we can't connect to the projector"""
pass
class JVCHandshakeError(Exception):
"""Exception when there was a problem with the 3 step handshake"""
pass
class JVCCommunicationError(Exception):
"""Exception when there was a communication issue"""
pass
class JVCCommandNotFoundError(Exception):
"""Exception when the requested command doesn't exist"""
pass
class JVCPoweredOffError(Exception):
"""Exception when projector is powered off and can't accept some commands."""
pass
class jfrmt:
@staticmethod
def highlight(value: str) -> str:
return "{:s}".format("\u035F".join(value))