-
Notifications
You must be signed in to change notification settings - Fork 50
/
mitm.py
245 lines (216 loc) · 8.93 KB
/
mitm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
""" Mitm proxy server for intercepting data to/from the game server"""
import threading
import asyncio
import queue
import json
from dataclasses import dataclass
from urllib.parse import urlparse, parse_qs
from mitmproxy.http import HTTPFlow
from mitmproxy import options
from mitmproxy.tools.dump import DumpMaster
import common.utils as utils
from common.utils import Folder
from common.log_helper import LOGGER
class WsType:
""" websocket msg type"""
START = 1
END = 2
MESSAGE = 3
@dataclass
class WSMessage:
""" Websocket message"""
flow_id:str
timestamp:float = None
content:bytes = None
type:int = WsType.MESSAGE
class WSDataInterceptor:
""" mitm websocket addon that intercepts data"""
def __init__(self, allowed_domains:list=None):
""" pass flow_message_dict for storing intercepted flow data
params:
allowed_domains: list of allowed domains to intercept. websocket connection for other websites will be killed"""
if allowed_domains:
self.allowed_domains = allowed_domains
else:
self.allowed_domains = None
self.message_queue = queue.Queue()
"""Queue for unretrieved messages
each element is: WSMessage"""
def allow_url(self, url:str) -> bool:
""" return true if url is allowed"""
if not self.allowed_domains:
# no filtering if None/empty
return True
if any(d in url for d in self.allowed_domains):
# allowed
return True
return False
def websocket_start(self, flow:HTTPFlow):
""" ws start handler"""
if self.allow_url(flow.request.pretty_url):
self.message_queue.put(WSMessage(flow.id, flow.timestamp_start, None, WsType.START))
else:
flow.kill()
LOGGER.info("Killing flow since it is not in allowed domains: %s", flow.request.pretty_url)
def websocket_message(self, flow:HTTPFlow):
""" ws message handler"""
msg = flow.websocket.messages[-1]
if self.allow_url(flow.request.pretty_url):
self.message_queue.put(WSMessage(flow.id, msg.timestamp, msg.content))
def websocket_end(self, flow:HTTPFlow):
""" ws flow end handler"""
if self.allow_url(flow.request.pretty_url):
self.message_queue.put(WSMessage(flow.id, flow.timestamp_start, None, WsType.END))
def replace_next_msg(self):
pass
def request(self, flow: HTTPFlow):
""" handler for request"""
parsed_url = urlparse(flow.request.url)
if parsed_url.hostname == "majsoul-hk-client.cn-hongkong.log.aliyuncs.com":
qs = parse_qs(parsed_url.query)
try:
content = json.loads(qs["content"][0])
if content["type"] == "re_err":
LOGGER.warning("Majsoul Aliyun Error (killed): %s", str(qs))
flow.kill()
else:
# LOGGER.debug("Majsoul Aliyun Log detected, len = %d", len(str(qs)))
LOGGER.debug("Majsoul Aliyun Log: %s", qs)
except:
return
SOCKS5 = "socks5"
HTTP = "http"
class MitmController:
""" Controlling mitm proxy server interactions and managing threads
mitm proxy server intercepts data to/from the game server"""
def __init__(self, allowed_domains:list=None) -> None:
"""
params:
proxy_port(int): proxy server port to open
allowed_domains(list): Intercept data only from allowed domains. Other websocket traffic will be blocked.
filtering turned off if None/empty"""
self.mitm_config_folder = utils.sub_folder(Folder.MITM_CONF)
self.cert_file = utils.sub_file(Folder.MITM_CONF, 'mitmproxy-ca-cert.cer')
self.mitm_thread = None
self.dump_master = None
self.proxy_port = None
self.mode = None
self.upstream_proxy = None
self.proxy_str:str = None
self.ws_data_addon = WSDataInterceptor(allowed_domains)
def start(self, port:int, mode=HTTP, upstream_proxy:str=None):
""" Start mitm server thread
params:
port(int): port to open
upstream_proxy(str): upstream proxy server to forward data to. Format: http://ip:port"""
self.proxy_port = port
self.upstream_proxy = upstream_proxy
self.mode = mode
# Start thread
self.mitm_thread = threading.Thread(
name="MitmThread",
target=self._run_mitm_task,
daemon=True
)
self.mitm_thread.start()
def _run_mitm_task(self):
"""Thread target: this runs the event loop for the async part."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run_mitm_async())
async def _run_mitm_async(self):
""" async run mitm proxy server"""
ip_port = f"127.0.0.1:{self.proxy_port}"
up_log_str = ""
if self.mode==HTTP:
if self.upstream_proxy:
opts = options.Options(
listen_port=self.proxy_port,
confdir=str(self.mitm_config_folder),
mode=[f"upstream:{self.upstream_proxy}"],
)
up_log_str = f" (upstream_proxy={self.upstream_proxy})"
else:
opts = options.Options(
listen_port=self.proxy_port,
confdir=str(self.mitm_config_folder),
)
self.proxy_str = f"{HTTP}://{ip_port}"
elif self.mode==SOCKS5:
opts = options.Options(
listen_port=self.proxy_port,
confdir=str(self.mitm_config_folder),
mode=[SOCKS5],
)
self.proxy_str = f"{SOCKS5}://{ip_port}"
else:
raise ValueError(f"Unsupported mitm mode: {self.mode}")
self.dump_master = DumpMaster(
opts,
with_termlog=False,
with_dumper=False,
)
try:
LOGGER.info("Starting mitm server%s, proxy=%s", up_log_str, self.proxy_str)
self.dump_master.addons.add(self.ws_data_addon)
await self.dump_master.run()
except Exception as e:
LOGGER.error("Exception in starting MITM server: %s", e, exc_info=True)
except BaseException as e:
LOGGER.error("Exception in starting MITM server: %s", e, exc_info=True)
finally:
self.proxy_str = None
LOGGER.debug("mitm thread exiting")
def stop(self):
""" shutdown mitm proxy server and join thread"""
if self.mitm_thread and self.mitm_thread.is_alive():
self.dump_master.shutdown()
del self.dump_master
self.dump_master = None
self.mitm_thread.join()
def is_running(self) -> bool:
""" return True if mitm proxy server is running"""
if self.mitm_thread and self.mitm_thread.is_alive():
return True
else:
return False
def get_message(self, block:bool=False, timeout:float=None) -> WSMessage:
""" pop ws message from the queue. raise queue.Empty if timeout or non-blocked"""
msg = self.ws_data_addon.message_queue.get(block, timeout)
return msg
def install_mitm_cert(self, timeout:float=5):
"""Check MITM cert, and install if needed
Return:
bool: True if cert installed already or successfully. False if failed or timeout"""
if not utils.wait_for_file(self.cert_file, timeout):
LOGGER.error("MITM certificate not found: %s", self.cert_file)
return False
res, text = utils.is_certificate_installed(self.cert_file)
if res:
LOGGER.info("MITM certificate already installed: %s", self.cert_file)
return True
else:
LOGGER.info("MITM cert not installed:\n%s", text)
LOGGER.info("Installing MITM certificate: %s", self.cert_file)
install_success, msg = utils.install_root_cert(self.cert_file)
if install_success:
LOGGER.info("Installed MITM certificate successfully. Output:\n%s",msg)
return True
else:
LOGGER.error("Failed to install MITM certificate. Please install manually. Output:\n%s",msg)
return False
if __name__ == '__main__':
# Test code
utils.initialize_logging('MITM_TEST')
mitm = MitmController(8999)
LOGGER.info("Starting MITM")
mitm.start()
LOGGER.info("Installing certificate")
res = mitm.install_mitm_cert()
LOGGER.info("MITM on:%s", mitm.is_running())
LOGGER.info("Shutting down mitm")
mitm.stop()
LOGGER.info("MITM on:%s", mitm.is_running())
import time
time.sleep(1)
LOGGER.info("MITM on:%s", mitm.is_running())