-
Notifications
You must be signed in to change notification settings - Fork 0
/
exploit_runner.py
77 lines (66 loc) · 2.58 KB
/
exploit_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
from flags import flag_parser
from flags import flag_submission
import subprocess
import ipaddress
import logging
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
logger = logging.getLogger(__name__)
running = True
EXPLOIT_DIR = 'exploits'
START_IP = '10.10.40.101' #inclusive
END_IP = '10.10.40.128' #inclusive
OWN_IP = '10.10.40.126'
EXPLOIT_TIMEOUT = 3 # Max timeout for exploit to finish
TIME_BETWEEN_RUNS = 60 # In seconds
def sigint_handler(sig, frame):
global running
running = False
raise KeyboardInterrupt()
def run_exploit(path: Path):
logger.info(f'Executing {path}...')
output_list = []
for ip_num in range(int(ipaddress.IPv4Address(START_IP)), int(ipaddress.IPv4Address(END_IP)) + 1):
ip = ipaddress.IPv4Address(ip_num)
if str(ip) == OWN_IP:
continue
try:
process = subprocess.run([path, str(ip)], timeout=EXPLOIT_TIMEOUT, stdout=subprocess.PIPE)
except subprocess.TimeoutExpired:
logger.warning(f'Timeout when exploiting file {path} for IP {ip}')
continue
except PermissionError:
logger.warning(f'File {path} does not have execute permissions! Please run "chmod a+x {path}"')
return
output_list.append(process.stdout.decode('UTF-8'))
concatinated_output = "\n".join(output_list)
flags = flag_parser.parse_string(concatinated_output)
flag_submission.submit_flags(flags)
def run_exploits_in_folder():
exploit_folder = Path(EXPLOIT_DIR)
with ThreadPoolExecutor() as thread_pool:
while running:
start = time.perf_counter()
threads = []
for exploit in exploit_folder.iterdir():
threads.append(thread_pool.submit(run_exploit, exploit))
logger.debug('Waiting for all threads to finish...')
wait(threads, return_when=ALL_COMPLETED)
duration = time.perf_counter() - start
logger.info(f'Threadpool finished in {duration:.1f}s!')
try:
if TIME_BETWEEN_RUNS > duration:
time.sleep(TIME_BETWEEN_RUNS - duration)
except KeyboardInterrupt:
logger.info("Interrupted...")
if __name__ == '__main__':
import signal
signal.signal(signal.SIGINT, sigint_handler)
logging.basicConfig(
format='%(asctime)s.%(msecs)03d <%(threadName)s> %(levelname)-8s %(message)s',
level=logging.DEBUG,
datefmt='%Y-%m-%d %H:%M:%S')
run_exploits_in_folder()
logger.info('Exiting...')