-
Notifications
You must be signed in to change notification settings - Fork 0
/
chromectrl.py
106 lines (90 loc) · 3.66 KB
/
chromectrl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import itertools
import json
import threading
import urllib.request as urllib
try:
import websocket
except ImportError:
websocket = None
class ChromeDebuggerControl(object):
# Control Chrome using the debugging socket.
# Chrome must be launched using the --remote-debugging-port=<port> option for this to work!
def __init__(self, port):
if websocket is None:
raise NotImplementedError("websocket-client library not available; cannot control Chrome.\n"
"Please install it (pip install websocket-client) then try again.")
# Obtain the list of pages
pages = json.loads(urllib.urlopen('http://localhost:%d/json/list' % port).read())
if len(pages) == 0:
raise Exception("No pages to attach to!")
elif len(pages) == 1:
page = pages[0]
else:
print("Select a page to attach to:")
for i, page in enumerate(pages):
print("%d) %s" % (i + 1, page['title'].encode('unicode_escape')))
while 1:
try:
pageidx = int(input("Selection? "))
page = pages[pageidx - 1]
break
# except Exception, e:
except Exception as e:
print("Invalid selection:", e)
# Configure debugging websocket
wsurl = page['webSocketDebuggerUrl']
self.ws = websocket.create_connection(wsurl)
self.requests = {} # dictionary containing in-flight requests
self.results = {}
self.req_counter = itertools.count(1)
self.thread = threading.Thread(target=self._receive_thread)
self.thread.daemon = True
self.thread.start()
self._send_cmd_noresult('Runtime.enable')
def _receive_thread(self):
#Continually read events and command results
while 1:
try:
message = json.loads(self.ws.recv())
if 'id' in message:
id = message['id']
event = self.requests.pop(id, None)
if event is not None:
self.results[id] = message
event.set()
except Exception as e:
break
def _send_cmd_noresult(self, method, **params):
#Send a command and ignore the result.
id = next(self.req_counter)
out = {'id': id, 'method': method}
if params:
out['params'] = params
self.ws.send(json.dumps(out))
def _send_cmd(self, method, **params):
#Send a command and wait for the result to be available.
id = next(self.req_counter)
out = {'id': id, 'method': method}
if params:
out['params'] = params
# Receive thread will signal us when the response is available
event = threading.Event()
self.requests[id] = event
self.ws.send(json.dumps(out))
event.wait()
resp = self.results.pop(id)
if 'error' in resp:
raise Exception("Command %s(%s) failed: %s (%d)" % (
method, ', '.join('%s=%r' % (k, v) for k, v in params.iteritems()), resp['error']['message'],
resp['error']['code']))
return resp['result']
def execute(self, cmd):
resp = self._send_cmd('Runtime.evaluate', expression=cmd)
# if resp['wasThrown']:
# raise Exception("JS evaluation threw an error: %s" % resp['result']['description'])
result = resp['result']
if 'value' in result:
return result['value']
if 'description' in result:
return result['description']
return None