-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
255 lines (229 loc) · 9.24 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
from ping3 import ping
import socket
import concurrent.futures
import requests
import xml.etree.ElementTree as ET
from messenger import *
import codecs
import json
import ssl
import ipaddress
import subprocess
import sys
import re
def ip_to_network(ip):
parts = ip.split('.')
parts[-1] = '0'
network = '.'.join(parts)
return network
def get_local_ip_address():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
IP = s.getsockname()[0]
finally:
s.close()
return IP
def get_network_range():
ip_address = get_local_ip_address()
network = ip_to_network(ip_address)
broadcast = network[:-1] + '255' # TODO, now it's class C, change that
return network, broadcast
NETWORKING, BROADCAST = get_network_range()
SENDER_NAME = "sender-0"
RECEIVER_NAME = "receiver-0"
BIG_BUCK_BUNNY_VIDEO = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
MIMETYPE_VIDEO = "video/mp4"
MIMETYPE_VIDEO_URL = "application/x-mpegURL"
MIMETYPE_AAC = "audio/aac"
MIMETYPE_MP3 = "audio/mpeg"
TVE1_STREAM = "https://ztnr.rtve.es/ztnr/1688877.m3u8"
TELEMADRID_STREAM = "https://new-international-23-secure2.akamaized.net/index.m3u8"
def get_content_type_from_url(url):
if url.endswith('.aac'):
return MIMETYPE_AAC
elif url.endswith('.mp3'):
return MIMETYPE_MP3
elif url.endswith('.m3u8'):
return MIMETYPE_VIDEO_URL
elif url.endswith('.mp4'):
return MIMETYPE_VIDEO
else:
return "application/octet-stream" # Default type if unknown
def is_active(ip):
try:
delay = ping(ip, timeout=1)
if delay is not None:
return ip, True
else:
return ip, False
except:
return ip, False
def scan(ip, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
try:
s.connect((ip, port))
return ip, port, True
except:
return ip, port, False
def detect_chromecasts(ip):
headers = {
'User-Agent': 'curl/7.54.0',
}
# TODO change that to not use a loop, it's just one port
for port in [8008, 8009]: #8009
try:
response = requests.get(f'http://{ip}:{port}/ssdp/device-desc.xml', headers=headers)
res = response.text
if 'Chromecast' in res:
print("Using SSDP")
chromecast_info = parse_chromecast_info(res)
return chromecast_info
else:
print("Using eureka Info")
response = requests.get(f'http://{ip}:{port}/setup/eureka_info', headers=headers)
res = response.text
if '"name":"' in res:
chromecast_info = json.loads(res)
return {
'ip': chromecast_info['ip_address'],
'friendlyName': chromecast_info['name']
}
except requests.exceptions.RequestException:
pass
return None
def parse_chromecast_info(response_text):
root = ET.fromstring(response_text)
namespace = {'ns': 'urn:schemas-upnp-org:device-1-0'}
friendly_name = root.find('.//ns:friendlyName', namespace).text
model_name = root.find('.//ns:modelName', namespace).text
udn = root.find('.//ns:UDN', namespace).text
ip = root.find('.//ns:URLBase', namespace).text.split('//')[1].split(':')[0]
return {
'ip': ip,
'friendlyName': friendly_name,
'modelName': model_name,
'UDN': udn,
}
def search_device(req = None):
chromecasts = []
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
if req != None and "." in req:
ip_futures = {executor.submit(is_active, req): req}
else:
ip_futures = {executor.submit(is_active, socket.inet_ntoa((ip_int).to_bytes(4, 'big'))): ip_int for ip_int in range(int.from_bytes(socket.inet_aton(NETWORKING), 'big'), int.from_bytes(socket.inet_aton(BROADCAST), 'big'))}
for future in concurrent.futures.as_completed(ip_futures):
ip, active = future.result()
if active:
print(f"Active IP: {ip}")
port_futures = {executor.submit(scan, ip, port): (ip, port) for port in [8008, 8009]}
for future in concurrent.futures.as_completed(port_futures):
ip, port, open = future.result()
if open:
print(f"Open port: {ip}:{port}")
detected_chromecast = detect_chromecasts(ip)
if req != None and "." not in req and detected_chromecast!=None and detected_chromecast["friendlyName"] == req:
chromecasts.append(detected_chromecast)
elif req == None or "." in req:
chromecasts.append(detected_chromecast)
return chromecasts
'''
This method is used to study chromecast protocol
'''
def go_chromecast(chromecast, url=TVE1_STREAM):
app_id = APP_MEDIA_RECEIVER
media_url = url
content_type = get_content_type_from_url(media_url)
source_id = SENDER_NAME
destination_id = RECEIVER_NAME
requestId = 1
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_s:
raw_s.connect((chromecast['ip'], 8009))
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
s = context.wrap_socket(raw_s, server_hostname=chromecast['ip'])
# CONNECT message
s.sendall(format_connect_message(source_id, destination_id))
print(f"CONNECT sent to ip {chromecast['ip']}")
s.sendall(format_get_status_message(source_id, destination_id, requestId))
requestId += 1
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after GET_STATUS:", response_data)
session_id = None
transport_id = None
while(not session_id):
print("sending LAUNCH")
s.sendall(format_launch_message(source_id, destination_id, app_id, requestId))
requestId += 1
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after LAUNCH:", response_data)
if not response_data or "status" not in response_data or "applications" not in response_data["status"]:
print("Invalid response data, try again")
else:
session_id = response_data["status"]["applications"][0]["sessionId"]
print("Session ID:", session_id)
transport_id = response_data["status"]["applications"][0]["transportId"]
print("Transport ID:", transport_id)
print("continue...")
destination_id = transport_id
# CONNECT message other time
s.sendall(format_connect_message(source_id, destination_id))
print(f"re-CONNECT sent to ip {chromecast['ip']}")
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after re-CONNECT:", response_data)
s.sendall(format_load_message(source_id, destination_id, session_id, media_url, content_type, autoplay=True, requestId=requestId, namespace='urn:x-cast:com.google.cast.media'))
requestId += 1
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after LOAD:", response_data)
s.sendall(format_get_status_message(source_id, destination_id, requestId))
requestId += 1
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after GET_STATUS:", response_data)
media_session_id = response_data["status"][0]["mediaSessionId"]
s.sendall(format_play_message(source_id, destination_id, media_session_id, requestId=requestId))
requestId += 1
response = s.recv(4096)
response_data = parse_cast_response(response)
print("Response after PLAY:", response_data)
# Keep the connection open to maintain the streaming
while True:
response = s.recv(4096)
if not response:
break
response_data = parse_cast_response(response)
print("Streaming response:", response_data)
if __name__ == '__main__':
# get parameters from terminal
if len(sys.argv) == 1:
# no parameters provided
ip = None
url = TVE1_STREAM
elif len(sys.argv) == 2:
# one parameter - check if IP or URL
# Check if param starts with http, if not treat as IP
if sys.argv[1].startswith('http'):
ip = None
url = sys.argv[1]
else:
ip = sys.argv[1]
url = TVE1_STREAM
else:
# both parameters provided
if "http" in sys.argv[2]:
ip = sys.argv[1]
url = sys.argv[2]
else:
ip = sys.argv[2]
url = sys.argv[1]
chromecasts = search_device(ip)
for chromecast in chromecasts:
status = go_chromecast(chromecast, url)
print(f"Status for {chromecast['friendlyName']} in ip {chromecast['ip']}: {status}")
break