import argparse
import socket
import traceback
from enum import Enum
from typing import Tuple, Union

import h2.connection
import h2.events
class ReturnType(Enum):
    """Outcome labels shared by the analysis routines of this module."""

    # Terminal outcomes of a probe or of a helper step.
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    TIMEOUT = "TIMEOUT"

    # Redirect-related outcomes.
    MAX_REDIRECT = "MAX REDIRECT"
    REDIRECT = "REDIRECT"
    HTTPS_REDIRECT = "HTTPS REDIRECT"

    # Placeholder for a probe that was not run.
    NOT_ANALYZED = "NOT ANALYZED"
# Maximum number of bytes requested from the socket by each recv() call.
BUFFER_SIZE = 4 * 1024

# User-Agent value placed in the HTTP/1.x and HTTP/2 probe requests; it imitates a desktop Firefox
# browser, so the probes are not identifiable as scanner traffic by this header alone.
USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0"
)
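def main():
    """
    Command-line entry point. Builds the argument parser, parses the command line and runs one analysis.

    The two on/off switches (--http09, --debug) use argparse.BooleanOptionalAction, which also generates the
    --no-http09 / --no-debug forms. The action takes no value, so no ``type`` is passed for them: the ``type``
    parameter of BooleanOptionalAction was deprecated in Python 3.12 and removed in Python 3.14, where passing
    it raises a TypeError while the parser is being built.
    """
    # prepare argument parser
    parser = argparse.ArgumentParser(description='Analyzes servers for unencrypted HTTP support.',
                                     usage='%(prog)s [options]', add_help=True,
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('hostname', type=str, help='The hostname of the server to analyze. (required)')
    parser.add_argument('--path', type=str, default="/", help='The path to request from the server.')
    parser.add_argument('--ip', type=str, default=None,
                        help='The IP of the server to analyze. If not provided, the hostname is resolved. '
                             'If present, prevents domain resolution after redirects.')
    parser.add_argument('--port', type=int, default=80, help='The port of the server to analyze.')
    parser.add_argument('--http09', default=False, action=argparse.BooleanOptionalAction,
                        help="By default, HTTP/0.9 is not analyzed. Provide --http09 to analyze the server for "
                             "HTTP/0.9 support. Return Type of HTTP/0.9 probe is inconclusive, so run with debug "
                             "or external analysis tool like Wireshark to verify the actual server answer.")
    parser.add_argument('--debug', default=False, action=argparse.BooleanOptionalAction,
                        help='Whether to print debug output.')
    parser.add_argument('--redirect_depth', type=int, default=2, help='The maximum depth of redirects to follow.')
    parser.add_argument('--timeout', type=int, default=5, help='The timeout for socket operations.')
    args = parser.parse_args()

    # start analyzing
    analyzer = UnencryptedHTTPAnalyzer(args.hostname, args.path, args.ip, args.port, args.http09, args.debug,
                                       args.redirect_depth, args.timeout)
    analyzer.analyze()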
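class UnencryptedHTTPAnalyzer:
    """
    Analyzes a website for unencrypted HTTP support.

    Every probe opens its own plain TCP connection and sends a single GET request. All bytes that come back
    (status line, headers, Location targets, HTTP/2 frames) are controlled by the remote server and are treated
    as untrusted input.
    """

    def __init__(self, hostname: str, path: str, ip: str = None, port: int = 80, http09: bool = False,
                 debug: bool = False, redirect_depth: int = 1, timeout: int = 5):
        """
        Initializes Analyzer.
        :param hostname: The hostname of the server to analyze. Used in the Host header of HTTP requests. (required)
        :param path: The path to request from the server. (required)
        :param ip: The IP of the server to analyze. If not provided, the hostname is resolved. (optional)
        :param port: The port of the server to analyze. (default: 80)
        :param http09: Whether to analyze HTTP/0.9 support. (default: False)
        :param debug: Enables debug statements if True. (default: False)
        :param redirect_depth: The maximum depth of HTTP redirects to follow. (default: 1)
        :param timeout: The timeout for socket operations in seconds. (default: 5)
        """
        self.hostname = hostname
        self.path = path
        self.ip = ip
        self.port = port
        self.http09 = http09
        self.debug = debug
        self.redirect_depth = redirect_depth
        self.timeout = timeout
        # Hostnames are only (re-)resolved when the caller did not pin an IP address.
        self.resolve_hostname = self.ip is None

    def analyze(self) -> ReturnType:
        """
        Analyzes the server for unencrypted HTTP support.
        :return: ReturnType.SUCCESS if the server is reachable and every executed probe ended in SUCCESS (a
                 redirect chain that ends in SUCCESS counts), ReturnType.TIMEOUT if the server cannot be reached
                 due to a socket timeout, ReturnType.FAILURE otherwise.

        The following steps are performed:
        1. Resolve hostname if necessary (terminates the process with exit status 255 if that fails)
        2. Check TCP connectivity of server
        3. Analyze HTTP/0.9 support if requested
        4. Analyze HTTP/1.0 support
        5. Analyze HTTP/1.1 support
        6. Analyze HTTP/2.0 support with prior knowledge
        7. Analyze HTTP/2.0 support with upgrade mechanism
        8. Print results
        """
        print(self.hostname + " analysis started.")

        # resolve hostname if necessary
        if self.resolve_hostname:
            self._debug("No IP provided, attempting to resolve hostname")
            if self._resolve_hostname() == ReturnType.FAILURE:
                print(
                    "No IP provided and cannot resolve hostname for " + self.hostname
                    + ". Provide reachable IP address or resolvable hostname.")
                # SystemExit instead of the site-provided exit() helper, which is not always installed.
                raise SystemExit(255)
        else:
            print("Using given IP " + self.ip + " on " + self.hostname)

        # reachability check
        reachable = self.analyze_tcp_reachability()
        if reachable == ReturnType.TIMEOUT:
            print("Cannot open TCP connection to " + self.hostname + ":" + str(self.port)
                  + " due to ReturnType.TIMEOUT (" + str(self.timeout) + "s). Is the server online?")
            return ReturnType.TIMEOUT
        elif reachable == ReturnType.FAILURE:
            print("Cannot open TCP connection to " + self.hostname + ":" + str(self.port)
                  + " due to non-ReturnType.TIMEOUT error. Is the server online?")
            return ReturnType.FAILURE
        elif reachable == ReturnType.SUCCESS:
            print("Server online. Scanning!")

        # run all probes first, print the summary afterwards
        results = []
        if self.http09:
            results.append(("HTTP/0.9: ", self._run_probe("HTTP/0.9", "HTTP/0.9", self.analyze_http09)))
        results.append(("HTTP/1.0: ",
                        self._run_probe("HTTP/1.0", "HTTP/1.0", self.analyze_http10, self.redirect_depth)))
        results.append(("HTTP/1.1: ",
                        self._run_probe("HTTP/1.1", "HTTP/1.1", self.analyze_http11, self.redirect_depth)))
        results.append(("HTTP/2 (Prior Knowledge): ",
                        self._run_probe("HTTP/2.0 prior knowledge", "HTTP/2 prior knowledge",
                                        self.analyze_http2_prior_knowledge, self.redirect_depth)))
        results.append(("HTTP/2 (Upgrade): ",
                        self._run_probe("HTTP/2.0 upgrade", "HTTP/2 upgrade",
                                        self.analyze_http2_upgrade, self.redirect_depth)))

        print("\n#####################\n")
        for label, result in results:
            print(label + result)

        if all(result.endswith(ReturnType.SUCCESS.value) for _, result in results):
            return ReturnType.SUCCESS
        return ReturnType.FAILURE

    def _run_probe(self, start_label: str, error_label: str, probe, *args) -> str:
        """
        Runs one probe and converts any exception it raises into a FAILURE result.
        :param start_label: The text used in the "## Starting ... analysis ##" debug banner.
        :param error_label: The text used in the "Error while analyzing ..." debug message.
        :param probe: The bound probe method to call.
        :param args: Positional arguments passed to the probe.
        :return: The probe's result string, or "FAILURE" if the probe raised.
        """
        try:
            self._debug("## Starting " + start_label + " analysis ##", 1)
            return probe(*args)
        except Exception as e:
            self._debug("Error while analyzing " + error_label + ": " + str(e))
            traceback.print_exc()
            return ReturnType.FAILURE.value

    def _endpoint(self) -> str:
        """
        Formats the current target as "ip:port(hostname)" for log messages.
        """
        return self.ip + ":" + str(self.port) + "(" + self.hostname + ")"

    def analyze_http09(self) -> str:
        """
        Analyzes the server for HTTP/0.9 support.
        :return: SUCCESS if HTTP/0.9 is supported, FAILURE otherwise.

        HTTP/0.9 is supported if the server responds with an HTML response. Only the start of the first chunk read
        from the socket is inspected, so the result is a heuristic.
        """
        # the socket is closed when the with-block is left, on every return path
        with self.open_socket() as sock:
            if self.connect_socket(sock) != ReturnType.SUCCESS:
                return ReturnType.FAILURE.value

            # send 09 request to server
            try:
                sock.sendall(self.create_http_09_request())
            except Exception as e:
                self._debug("Could not send HTTP/0.9 request to " + self._endpoint()
                            + " with exception : " + str(e))
                return ReturnType.FAILURE.value

            # receive response
            response = self.receive_ascii_response(sock)

        if isinstance(response, ReturnType):
            return response.value

        if response.lower().startswith(("<html>", "<!doctype html")):
            self._debug("HTTP/0.9 response from " + self._endpoint())
            return ReturnType.SUCCESS.value
        # TODO: find better way to check for HTTP/0.9 response, maybe check for 1.0 response too?
        self._debug("Could not interpret response from " + self._endpoint() + " as HTTP/0.9 response.")
        return ReturnType.FAILURE.value

    def analyze_http10(self, recursion: int) -> str:
        """
        Analyzes the server for HTTP/1.0 support.
        :param recursion: The maximum depth of redirects to follow.
        :return: A string that describes the result of the analysis including redirects and failures.

        HTTP/1.0 is supported if the server responds with a 200 status code.
        """
        return self.analyze_http1x("HTTP/1.0", recursion)

    def analyze_http11(self, recursion: int) -> str:
        """
        Analyzes the server for HTTP/1.1 support.
        :param recursion: The maximum depth of redirects to follow.
        :return: A string that describes the result of the analysis including redirects and failures.

        HTTP/1.1 is supported if the server responds with a 200 status code.
        """
        return self.analyze_http1x("HTTP/1.1", recursion)

    def analyze_http1x(self, version: str, recursion: int) -> str:
        """
        Analyzes the server for HTTP/1.x support.
        :param version: The HTTP version to analyze. Provide the complete string, e.g. "HTTP/1.0".
        :param recursion: The maximum depth of redirects to follow.
        :return: A string that describes the result of the analysis including redirects and failures.

        HTTP/1.x is supported if the server responds with a 200 status code.
        """
        # "< 0" rather than "== -1": a negative starting depth must not lead to unbounded redirect following
        if recursion < 0:
            return ReturnType.MAX_REDIRECT.value

        with self.open_socket() as sock:
            if self.connect_socket(sock) != ReturnType.SUCCESS:
                return ReturnType.FAILURE.value

            # send 1x request to server
            try:
                sock.sendall(self.create_http1x_request(version))
            except Exception as e:
                self._debug("Could not send " + version + " request to " + self._endpoint()
                            + " with exception : " + str(e))
                return ReturnType.FAILURE.value

            # receive response
            response = self.receive_http1x_response(sock)

        # the connection is closed here, before a redirect is followed on a new socket
        if isinstance(response, ReturnType):
            return response.value

        status, headers, http_version = response
        if status == 200:
            self._debug("Received " + version + " response from " + self._endpoint())
            return ReturnType.SUCCESS.value
        elif status == 301 or status == 302:
            # redirect
            redirect = self.update_redirect(dict(headers))
            if redirect != ReturnType.SUCCESS:
                return redirect.value
            return (ReturnType.REDIRECT.value + "(" + self.hostname + self.path + ") -> "
                    + self.analyze_http1x(version, recursion - 1))
        else:
            self._debug("Received status code " + str(status) + " leading to ReturnType.FAILURE.")
            return ReturnType.FAILURE.value

    def analyze_http2_prior_knowledge(self, recursion: int) -> str:
        """
        Analyzes the server for HTTP/2 support with prior knowledge.
        :param recursion: The maximum depth of redirects to follow.
        :return: A string that describes the result of the analysis including redirects and failures.

        HTTP/2 with prior knowledge is supported if the server immediately responds with a 200 status code in an
        HTTP/2 message.
        """
        return self.analyze_http2_core(recursion, True)

    def analyze_http2_upgrade(self, recursion: int) -> str:
        """
        Analyzes the server for HTTP/2 support with upgrade mechanism.
        :param recursion: The maximum depth of redirects to follow.
        :return: A string that describes the result of the analysis including redirects and failures.

        HTTP/2 with upgrade mechanism is supported if the server responds with a 101 status code in an HTTP/1.1
        message and a 200 status code in an HTTP/2 message.
        """
        return self.analyze_http2_core(recursion, False)

    def analyze_http2_core(self, recursion: int, prior_knowledge: bool) -> str:
        """
        Analyzes the server for HTTP/2 support using either prior knowledge or the upgrade mechanism.
        :param recursion: The maximum depth of redirects to follow.
        :param prior_knowledge: Whether to use prior knowledge or the upgrade mechanism. True for prior knowledge,
                                False for upgrade.
        :return: A string that describes the result of the analysis including redirects and failures.

        The exchange on the wire is done by _http2_exchange. A redirect is followed with the same mechanism that
        was requested, so a result reported for the upgrade mechanism is never obtained through prior knowledge.
        """
        if recursion < 0:
            return ReturnType.MAX_REDIRECT.value

        with self.open_socket() as sock:
            outcome = self._http2_exchange(sock, prior_knowledge)

        # a string is a final result, a dict holds the headers of a redirect response
        if isinstance(outcome, str):
            return outcome

        redirect = self.update_redirect(outcome)
        if redirect != ReturnType.SUCCESS:
            return redirect.value
        return (ReturnType.REDIRECT.value + "(" + self.hostname + self.path + ") -> "
                + self.analyze_http2_core(recursion - 1, prior_knowledge))

    def _http2_exchange(self, sock: socket.socket, prior_knowledge: bool) -> Union[str, dict]:
        """
        Performs one HTTP/2 probe on an opened, not yet connected socket.
        :param sock: The socket to use. The caller closes it.
        :param prior_knowledge: True to start with the HTTP/2 preface, False to use the HTTP/1.1 upgrade mechanism.
        :return: The final result string, or the header dict of a 301/302 response that the caller should follow.
        """
        if self.connect_socket(sock) != ReturnType.SUCCESS:
            return ReturnType.FAILURE.value

        # initialize http/2 connection
        h2_connection = h2.connection.H2Connection()

        if prior_knowledge:
            # send initialization packets
            h2_connection.initiate_connection()
            try:
                sock.sendall(h2_connection.data_to_send())
            except Exception as e:
                self._debug("Could not initialize HTTP/2 connection to " + self._endpoint()
                            + " with exception : " + str(e))
                return ReturnType.FAILURE.value

            # send http/2 request to server
            headers = [
                (':method', 'GET'),
                (':path', self.path),
                (':authority', self.hostname),
                (':scheme', 'http'),
                ('user-agent', USER_AGENT),
            ]
            h2_connection.send_headers(1, headers, end_stream=True)
            try:
                sock.sendall(h2_connection.data_to_send())
            except Exception as e:
                self._debug("Could not send HTTP/2 GET to " + self._endpoint() + " with exception : " + str(e))
                return ReturnType.FAILURE.value
            data = None
        else:
            # for the upgrade mechanism send only an HTTP/1.1 request with upgrade header
            settings_header_value = h2_connection.initiate_upgrade_connection()
            try:
                sock.sendall(self.create_http11_upgrade_request(settings_header_value))
            except Exception as e:
                self._debug("Could not send HTTP/1.1 upgrade request to " + self._endpoint()
                            + " with exception : " + str(e))
                return ReturnType.FAILURE.value

            # parse upgrade response
            response = self.receive_http2_upgrade_response(sock)
            if isinstance(response, ReturnType):
                return response.value
            status_code, headers, http2_response = response

            # check for redirect
            if status_code == 301 or status_code == 302:
                return headers

            # check for 101 status code
            if status_code != 101:
                self._debug("Received status code " + str(status_code)
                            + " instead of 101 during HTTP/1.1 upgrade to HTTP/2.")
                return ReturnType.FAILURE.value

            # check for upgrade header presence
            if "upgrade" not in headers:
                self._debug("No upgrade header in response to HTTP/1.1 upgrade to HTTP/2.")
                return ReturnType.FAILURE.value
            if headers["upgrade"].lower() not in ["h2c", "http/2"]:
                self._debug("Received upgrade header " + headers["upgrade"]
                            + " instead of HTTP/2 during HTTP/1.1 upgrade to HTTP/2.")
                return ReturnType.FAILURE.value

            # If the first read ended right after the 101 header there are no HTTP/2 bytes yet. None makes the
            # loop below read from the socket instead of giving up on an empty buffer.
            data = http2_response or None

        ##### MAIN LOOP FROM HERE ON #####
        finished_receiving = False
        response_received = False

        # receive all data and events
        while not finished_receiving:
            if data is None:
                try:
                    data = sock.recv(BUFFER_SIZE)
                except socket.timeout:
                    # can be expected; socket.timeout is also caught on Python versions where it is not an
                    # alias of TimeoutError
                    pass
            if not data:
                break

            events = h2_connection.receive_data(data)
            for event in events:
                if isinstance(event, h2.events.DataReceived):
                    self._debug("Received HTTP/2 data from " + self._endpoint() + ". Data: " + event.data.hex())
                    h2_connection.acknowledge_received_data(event.flow_controlled_length, event.stream_id)
                    # stops processing this batch; later events of the same batch are skipped
                    break
                if isinstance(event, h2.events.ResponseReceived):
                    self._debug("Received HTTP/2 response from " + self._endpoint()
                                + ". Response headers: " + str(event.headers))
                    # parse response headers to dict of strings
                    try:
                        response_headers = {key.decode("ASCII"): value.decode("ASCII")
                                            for key, value in event.headers}
                    except Exception as e:
                        self._debug("Could not decode response headers from HTTP/2 response: "
                                    + str(event.headers) + " with exception: " + str(e))
                        return ReturnType.FAILURE.value

                    # decide on SUCCESS, redirect or failure based on the :status pseudo-header
                    if ":status" in response_headers:
                        try:
                            status_code = int(response_headers[":status"])
                        except Exception as e:
                            self._debug("Could not extract status code from HTTP/2 response headers: "
                                        + str(response_headers) + " with exception: " + str(e))
                            return ReturnType.FAILURE.value
                        if status_code == 200:
                            return ReturnType.SUCCESS.value
                        elif status_code == 301 or status_code == 302:
                            return response_headers
                        else:
                            self._debug("Received status code " + str(status_code)
                                        + " leading to ReturnType.FAILURE.")
                            return ReturnType.FAILURE.value
                    response_received = True
                    self._debug("Received HTTP/2 response from " + self._endpoint())
                elif isinstance(event, h2.events.StreamEnded):
                    finished_receiving = True
                    break

            # send any acknowledgement
            sock.sendall(h2_connection.data_to_send())
            # reset data for next iteration
            data = None

        if not response_received:
            self._debug("No response received from " + self._endpoint())
        else:
            self._debug("Received response but no status header from " + self._endpoint())
        return ReturnType.FAILURE.value

    def update_redirect(self, headers: dict) -> ReturnType:
        """
        Updates the hostname and path based on a redirect response's headers.
        :param headers: The headers of the redirect response (keys in lower case).
        :return: ReturnType.SUCCESS if the redirect was successful, ReturnType.HTTPS_REDIRECT if the target is an
                 https URL, ReturnType.MAX_REDIRECT for a redirect to the current hostname and path,
                 ReturnType.FAILURE otherwise.

        The hostname and path are updated based on the Location header of the redirect response. Updates the object
        variables.

        The Location value is chosen by the remote server. Unless an IP was pinned by the caller, following it
        makes this tool resolve and connect to whatever host the server names, on the configured port. Keep that
        in mind when the scan runs from a network that can reach internal-only hosts.
        NOTE: a port inside the Location value is not interpreted; it stays part of the hostname string.
        """
        if "location" not in headers:
            self._debug("No location header in redirect response.")
            return ReturnType.FAILURE

        location = headers["location"].strip()

        # The value ends up in the request line and the Host header of the next probe. Whitespace and control
        # characters would allow extra header lines to be smuggled into that request, so they are rejected.
        if any(ch.isspace() or not ch.isprintable() for ch in location):
            self._debug("Location header contains whitespace or control characters, not following")
            return ReturnType.FAILURE

        # dont follow https redirects; URL schemes are case-insensitive
        if location.lower().startswith("https://"):
            self._debug("Redirect to HTTPS site " + location + ", not following")
            return ReturnType.HTTPS_REDIRECT

        # extract new hostname and path
        if location.startswith("/") and not location.startswith("//"):
            # path-only redirect: the host stays the same
            redirect_hostname, redirect_path = self.hostname, location
        else:
            if location.startswith("//"):
                # scheme-relative reference
                target = location[2:]
            elif "://" in location:
                # cut optional http://; only the first "://" separates the scheme
                target = location.split("://", 1)[1]
            else:
                target = location
            if "/" in target:
                redirect_hostname, redirect_path = target.split("/", 1)
                redirect_path = "/" + redirect_path
            else:
                redirect_hostname, redirect_path = target, "/"

        if not redirect_hostname:
            self._debug("Could not extract a hostname from location header " + location)
            return ReturnType.FAILURE

        # detect stagnant redirects; hostnames compare case-insensitively, paths do not
        if redirect_path == self.path and redirect_hostname.lower() == self.hostname.lower():
            self._debug("Redirect to same hostname detected")
            return ReturnType.MAX_REDIRECT

        # update path and hostname
        self._debug("Redirect to " + redirect_hostname + redirect_path)
        self.path = redirect_path
        self.hostname = redirect_hostname

        # also update ip address
        if self.resolve_hostname:
            if self._resolve_hostname() == ReturnType.FAILURE:
                return ReturnType.FAILURE
        else:
            self._debug("Not resolving hostname after redirect because static IP was given.")
        return ReturnType.SUCCESS

    def create_http_09_request(self) -> bytes:
        """
        Creates an HTTP/0.9 request.
        :return: The encoded HTTP/0.9 request.
        """
        return b"GET " + self.path.encode("ASCII") + b"\r\n"

    def create_http1x_request(self, version: str) -> bytes:
        """
        Creates an HTTP/1.x request.
        :param version: The HTTP version to use. Provide the complete string, e.g. "HTTP/1.0".
        :return: The encoded HTTP/1.x request.
        """
        return b"".join([
            b"GET " + self.path.encode("ASCII") + b" " + version.encode("ASCII") + b"\r\n",
            b"Host: " + self.hostname.encode("ASCII") + b"\r\n",
            b"User-Agent: " + USER_AGENT.encode("ASCII") + b"\r\n",
            b"Connection: close\r\n",
            b"\r\n",
        ])

    def create_http11_upgrade_request(self, settings_header_value: bytes) -> bytes:
        """
        Creates an HTTP/1.1 request to upgrade to HTTP/2. I.e., an HTTP/1.1 request with an Upgrade header.
        :param settings_header_value: The value of the HTTP2-Settings header.
        :return: The encoded HTTP/1.1 request.
        """
        return b"".join([
            b"GET " + self.path.encode("ASCII") + b" HTTP/1.1\r\n",
            b"Host: " + self.hostname.encode("ASCII") + b"\r\n",
            b"User-Agent: " + USER_AGENT.encode("ASCII") + b"\r\n",
            b"Connection: Upgrade, HTTP2-Settings\r\n",
            b"Upgrade: h2c\r\n",
            b"HTTP2-Settings: " + settings_header_value + b"\r\n",
            b"\r\n",
        ])

    def open_socket(self) -> socket.socket:
        """
        Creates a TCP socket (IPv4) with the configured timeout. The socket is not connected yet.
        :return: The opened socket. The caller is responsible for closing it.
        """
        self._debug("Opening TCP socket to " + self._endpoint())
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        return sock

    def connect_socket(self, sock: socket.socket) -> ReturnType:
        """
        Connects a previously opened socket to the server.
        :param sock: The socket to connect.
        :return: ReturnType.SUCCESS if the connection was successful, ReturnType.FAILURE otherwise.
        """
        try:
            sock.connect((self.ip, self.port))
        except Exception as e:
            self._debug("Could not open TCP socket to " + self._endpoint() + " with exception : " + str(e))
            return ReturnType.FAILURE
        self._debug("Successfully opened TCP socket to " + self._endpoint())
        return ReturnType.SUCCESS

    def receive_bytes(self, sock: socket.socket) -> Union[bytes, ReturnType]:
        """
        Reads bytes from the socket with a single recv() call of at most BUFFER_SIZE bytes.
        :param sock: The socket to read from.
        :return: The received bytes. They are empty if the peer closed the connection without sending data.

        If the read times out, ReturnType.TIMEOUT is returned. If the response cannot be read for another reason,
        ReturnType.FAILURE is returned. A response that is longer than BUFFER_SIZE or arrives in several pieces is
        only seen up to the end of the first read.
        """
        try:
            response = sock.recv(BUFFER_SIZE)
        except socket.timeout as e:
            self._debug("Could not receive response from " + self._endpoint() + " with exception : " + str(e))
            return ReturnType.TIMEOUT
        except Exception as e:
            self._debug("Could not receive response from " + self._endpoint() + " with exception : " + str(e))
            return ReturnType.FAILURE
        self._debug("Received response from " + self._endpoint() + ": " + response.hex())
        return response

    def decode_bytes(self, response: bytes) -> Union[str, ReturnType]:
        """
        Decodes bytes to an ASCII string. Returns the decoded string or ReturnType.FAILURE if the bytes cannot be
        decoded.
        :param response: The bytes to decode.
        :return: The decoded response as a string.

        If the response cannot be decoded, ReturnType.FAILURE is returned.
        """
        try:
            response = response.decode("ASCII")
            # NOTE(review): in debug mode the server-controlled text is printed as is; ASCII includes the ESC
            # control character, so terminal escape sequences in a response reach the terminal.
            self._debug("Decoded response from " + self._endpoint() + ": " + response)
        except Exception as e:
            self._debug("Could not decode response from " + self._endpoint() + " with exception : " + str(e))
            return ReturnType.FAILURE
        return response

    def receive_ascii_response(self, sock: socket.socket, header_only: bool = False) -> Union[str, ReturnType]:
        """
        Reads bytes from the socket and returns them as an ASCII string.
        :param sock: The socket to read from.
        :param header_only: Whether to only read the header of the response. (default: False)
        :return: The ASCII response as a string.

        If the read times out, ReturnType.TIMEOUT is returned. If the response cannot be read or decoded, or if
        header_only is set and the header terminator is missing, ReturnType.FAILURE is returned.
        """
        response = self.receive_bytes(sock)
        if isinstance(response, ReturnType):
            return response

        if header_only:
            # discard everything except the header
            if b"\r\n\r\n" not in response:
                self._debug("No \\r\\n\\r\\n separator between HTTP/1.x response header and body or request end.")
                return ReturnType.FAILURE
            response = response.partition(b"\r\n\r\n")[0] + b"\r\n\r\n"
        return self.decode_bytes(response)

    def parse_http1_response(self, response: str) -> Union[Tuple[int, dict, str], ReturnType]:
        """
        Parses an HTTP/1.x response. Extracts status code, headers, and HTTP version.
        :param response: The response to parse.
        :return: The status code, headers, and HTTP version as a tuple, or ReturnType.FAILURE if the response
                 cannot be parsed.

        Header names are lower-cased. Header values keep their case, because a Location value carries a path and
        paths are case-sensitive. If a header name occurs more than once, the last value is kept.
        """
        try:
            response_lines = response.split('\r\n')
            status_line = response_lines[0].split(' ')
            status = int(status_line[1])
            http_version = status_line[0].split('/')[1]
            headers = {}
            for line in response_lines[1:]:
                if line == '':
                    break
                key, value = line.split(':', 1)
                headers[key.strip().lower()] = value.strip()
            self._debug("Extracted Status code: " + str(status) + ", Headers: " + str(headers)
                        + ", HTTP version: " + http_version)
        except Exception as e:
            self._debug("Failed to parse HTTP/1 response: " + response + " with exception: " + str(e))
            return ReturnType.FAILURE
        return status, headers, http_version

    def receive_http1x_response(self, sock: socket.socket) -> Union[Tuple[int, dict, str], ReturnType]:
        """
        Receives an HTTP/1.x response from the server. Parses the response and returns the status code, headers,
        and HTTP version.
        :param sock: The socket to receive the response from.
        :return: The status code, headers, and HTTP version as a tuple.

        If the read times out, ReturnType.TIMEOUT is returned. If the response cannot be parsed,
        ReturnType.FAILURE is returned. Merges the functionality of receive_ascii_response and
        parse_http1_response.
        """
        response = self.receive_ascii_response(sock, header_only=True)
        if isinstance(response, ReturnType):
            return response
        # extract status code, headers and http version
        return self.parse_http1_response(response)

    def receive_http2_upgrade_response(self, sock: socket.socket) -> Union[Tuple[int, dict, bytes], ReturnType]:
        """
        Receives the HTTP/1.x answer to an upgrade request. Parses the HTTP/1.x part and returns its status code
        and headers together with the bytes that followed it in the same read.
        :param sock: The socket to receive the response from.
        :return: A tuple of status code, headers, and the bytes after the HTTP/1.x header (possibly empty).

        If the read times out, ReturnType.TIMEOUT is returned. If the response cannot be parsed,
        ReturnType.FAILURE is returned.
        """
        response = self.receive_bytes(sock)
        if isinstance(response, ReturnType):
            return response

        # require \r\n\r\n to separate HTTP/1.1 and HTTP/2 response
        if b"\r\n\r\n" not in response:
            self._debug("No separator between HTTP/1.1 and HTTP/2 response.")
            return ReturnType.FAILURE

        # extract HTTP/1.1 and HTTP/2 response
        http1_response, http2_response = response.split(b"\r\n\r\n", 1)
        http1_response += b"\r\n\r\n"
        self._debug("Extracted HTTP/1.1 response: " + http1_response.hex()
                    + ", HTTP/2 response: " + http2_response.hex())
        self._debug("Extracted HTTP/2 response: " + http2_response.hex())

        # parse HTTP/1 response as ASCII
        http1_response = self.decode_bytes(http1_response)
        if isinstance(http1_response, ReturnType):
            return http1_response

        # extract status code, headers and http version
        http1_response = self.parse_http1_response(http1_response)
        if isinstance(http1_response, ReturnType):
            return http1_response
        status_code, headers, _ = http1_response
        return status_code, headers, http2_response

    def _resolve_hostname(self) -> ReturnType:
        """
        Resolves the hostname to an IPv4 address and stores it in the object variable ip.
        :return: ReturnType.SUCCESS if the hostname was resolved, ReturnType.FAILURE otherwise.
        """
        try:
            self.ip = socket.gethostbyname(self.hostname)
            self._debug("Resolved hostname " + self.hostname + " to IP: " + self.ip)
        except Exception as e:
            self._debug("Failed to resolve hostname " + self.hostname + " with Exception: " + str(e))
            return ReturnType.FAILURE
        return ReturnType.SUCCESS

    def analyze_tcp_reachability(self) -> ReturnType:
        """
        Analyzes the server for TCP reachability. Opens a TCP socket to the server and closes it again.
        :return: ReturnType.SUCCESS if the server is reachable, ReturnType.TIMEOUT if the connection attempt
                 timed out, ReturnType.FAILURE otherwise.
        """
        with self.open_socket() as sock:
            try:
                sock.connect((self.ip, self.port))
            except socket.timeout as e:
                self._debug("Could not open TCP socket to " + self._endpoint() + " with exception : " + str(e))
                return ReturnType.TIMEOUT
            except Exception as e:
                self._debug("Could not open TCP socket to " + self._endpoint() + " with exception : " + str(e))
                return ReturnType.FAILURE
            self._debug("Successfully opened TCP socket to " + self._endpoint())
            return ReturnType.SUCCESS

    def _debug(self, string: str, linebreaks: int = 0):
        """
        Prints a debug message if debug is enabled.
        :param string: The debug message to print.
        :param linebreaks: The number of empty lines to print before the message. (default: 0)
        """
        if self.debug:
            print(linebreaks * "\n" + "DEBUG:" + string)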
# Run the analysis only when this file is executed as a script, not when it is imported as a module.
if __name__ == "__main__":
    main()