-
Notifications
You must be signed in to change notification settings - Fork 24
/
detectsatip.py
executable file
·98 lines (88 loc) · 2.8 KB
/
detectsatip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
""" Simple tool to detect SAT>IP devices as JSON.
"""
import json
import socket
import sys
import xml.etree.ElementTree as ET
import requests
# Local address and port the discovery socket is bound to.
SSDP_BIND = "0.0.0.0"
# Destination address and port of the discovery request.
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
# Value sent in the MX header of the request.
SSDP_MX = 1
# Value sent in the ST (search target) header: SAT>IP servers only.
SSDP_ST = "urn:ses-com:device:SatIPServer:1"
# Complete M-SEARCH request: CRLF-terminated header lines followed by the
# empty line that ends the header section.
SSDP_REQUEST = (
    "M-SEARCH * HTTP/1.1\r\n"
    f"HOST: {SSDP_ADDR}:{SSDP_PORT}\r\n"
    'MAN: "ssdp:discover"\r\n'
    f"MX: {SSDP_MX}\r\n"
    f"ST: {SSDP_ST}\r\n"
    "USER-AGENT: vdr-detectsatip\r\n"
    "\r\n"
)
def parse_satip_xml(data):
    """Parse a SAT>IP device description XML document.

    Args:
        data (str): XML input data. Empty or ``None`` input yields the
            default (empty) result.

    Returns:
        dict: ``{"name": <friendlyName>, "frontends": {<system>: <count>}}``.
            ``name`` stays ``""`` and ``frontends`` stays ``{}`` when the
            corresponding element is missing or empty. Counts of systems
            listed more than once are summed.

    Raises:
        xml.etree.ElementTree.ParseError: If ``data`` is not well-formed XML.
    """
    result = {"name": "", "frontends": {}}
    if not data:
        return result

    root = ET.fromstring(data)

    name = root.find(".//*/{urn:schemas-upnp-org:device-1-0}friendlyName")
    # A device description without a (non-empty) friendlyName keeps "".
    if name is not None and name.text:
        result["name"] = name.text

    satipcap = root.find(".//*/{urn:ses-com:satip}X_SATIPCAP")
    if satipcap is None:
        # fallback for non-standard Panasonic
        satipcap = root.find(".//*/{urn-ses-com:satip}X_SATIPCAP")
    if satipcap is None or not satipcap.text:
        # No capability information available: report no frontends.
        return result

    caps = {}
    for system in satipcap.text.split(","):
        # Each entry is expected to look like "<system>-<count>".
        cap = system.strip().split("-")
        if len(cap) < 2 or not cap[0]:
            # Malformed entry without a system name or count: ignore it.
            continue
        try:
            count = int(cap[1])
        except ValueError:
            # Non-numeric count: ignore this entry, keep the others.
            continue
        caps[cap[0]] = caps.get(cap[0], 0) + count
    result["frontends"] = caps
    return result
def _ssdp_location(response):
    """Return the LOCATION header value of an SSDP response, or None."""
    for row in response.splitlines():
        header, sep, value = row.partition(":")
        # HTTP header names are case-insensitive, so accept "Location:" too.
        if sep and header.strip().upper() == "LOCATION":
            return value.strip() or None
    return None


def detect_satip_devices():
    """Detect available SAT>IP devices by sending an SSDP M-SEARCH request.

    The request is sent once; answers are collected until no datagram
    arrives within the one second socket timeout. For every distinct
    LOCATION URL announced, the device description is downloaded and parsed
    with ``parse_satip_xml``. Discovery is best-effort: a device whose
    description cannot be fetched or parsed is reported on stderr and
    skipped, the remaining devices are still returned.

    Returns:
        list: Found SAT>IP devices.

    Raises:
        OSError: If the discovery socket cannot be bound or the request
            cannot be sent.
    """
    seen_urls = set()
    devices = []
    # The context manager closes the socket even if bind()/sendto() raise.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except (AttributeError, OSError):
            # SO_REUSEPORT is not available on every platform.
            pass
        # settimeout() also determines the blocking mode, so no separate
        # setblocking() call is needed.
        sock.settimeout(1)
        sock.bind((SSDP_BIND, SSDP_PORT))
        sock.sendto(SSDP_REQUEST.encode("utf-8"), (SSDP_ADDR, SSDP_PORT))
        while True:
            try:
                payload = sock.recv(4096)
            except OSError:
                # Timeout (no further answers) or socket error: stop collecting.
                break
            if not payload:
                break
            # Undecodable bytes must not abort discovery of other devices.
            url = _ssdp_location(payload.decode("utf-8", errors="replace"))
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)
            try:
                info = requests.get(url, timeout=2)
                devices.append(parse_satip_xml(info.text))
            except Exception as err:
                # Best-effort: one unreachable or malformed device must not
                # hide the others. KeyboardInterrupt/SystemExit still propagate.
                print(f"detectsatip: skipping {url}: {err}", file=sys.stderr)
    return devices
if __name__ == "__main__":
    # Script entry point: run the discovery and print the result to stdout
    # as JSON, indented by two spaces with keys in sorted order.
    found_devices = detect_satip_devices()
    json.dump(found_devices, sys.stdout, indent=2, sort_keys=True)