-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDNSGenerator.py
142 lines (117 loc) · 4.22 KB
/
DNSGenerator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import re
# Host table shared by the whole module: maps a host name to the list of
# address strings found for it, in file order.
hosts = {}


def load_records(path='/etc/hosts'):
    """Read a hosts-format file and add its entries to ``hosts``.

    Each useful line has the form ``ADDRESS NAME [ALIAS ...]`` with the
    fields separated by any run of spaces or tabs.  Everything after a ``#``
    is treated as a comment.  Blank lines, comment-only lines and lines that
    carry an address but no name are ignored.

    Every name on a line (canonical name and aliases alike) is mapped to the
    line's address.  When a name appears on several lines its addresses
    accumulate in file order.  Addresses are stored verbatim as strings.

    Args:
        path: File to read.  Defaults to ``/etc/hosts``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # ``with`` guarantees the handle is closed even if reading fails.
    with open(path, 'r') as hosts_file:
        for line in hosts_file:
            # Drop the comment part first, then split on any whitespace so
            # both tab- and space-separated files are accepted.
            fields = line.split('#', 1)[0].split()
            if len(fields) < 2:
                continue
            address, *names = fields
            for name in names:
                hosts.setdefault(name, []).append(address)
# Fill the module-level ``hosts`` table once, when this module is imported.
load_records()
def extract_domain_record(domain):
    """Return the addresses stored for ``domain`` in the ``hosts`` table.

    The list held in ``hosts`` is returned as-is (not copied).  A name that
    is not in the table yields a new empty list.
    """
    return hosts.get(domain, [])
class DNSGenerator:
    """Build the DNS response for one raw query message.

    ``data`` is the raw query as a bytes-like object.  ``run`` parses the
    first question, looks its name up with ``extract_domain_record`` and
    returns the complete response: a 12-byte header, the echoed question and
    one resource record per usable IPv4 address.

    Header flag fields (``QR``, ``OPCODE``, ``AA``, ``TC``, ``RD``, ``RA``,
    ``Z``, ``RCODE``) are held as strings of binary digits and are joined and
    converted to bytes in ``create_flags``.  The count fields are held as
    2-byte big-endian values.

    A query whose question is missing or truncated sets ``format_error`` and
    is answered with a format-error response that carries no question and no
    answers.

    NOTE(review): answer records reuse the question's type and class and
    always carry a 4-byte address, so queries for record types other than A
    are not treated specially; RD is always sent as 0 rather than copied from
    the query.  Confirm against RFC 1035 whether either should change.
    """

    _HEADER_LENGTH = 12           # size of the fixed message header in bytes
    _TTL_SECONDS = 400            # TTL written into every answer record
    _RCODE_NO_ERROR = '0000'
    _RCODE_FORMAT_ERROR = '0001'
    _RCODE_NAME_ERROR = '0011'

    def __init__(self, data):
        """Store the raw query and initialise the response header fields."""
        self.data = data
        self.format_error = 0             # set to 1 when the question is unusable
        self.current_index_of_input = 0   # read cursor into ``data``
        self.QR = '1'
        self.OPCODE = None                # filled in by ``generate_opcode``
        self.AA = '1'
        self.TC = '0'
        self.RD = '0'
        self.RA = '0'
        self.Z = '000'
        self.RCODE = None                 # filled in by ``generate_response_code``
        self.QDCOUNT = b'\x00\x01'
        self.NSCOUNT = b'\x00\x00'
        self.ARCOUNT = b'\x00\x00'
        self.domain = None                # filled in by ``extract_domain_name``

    def extract_question_type(self):
        """Advance the cursor past the 2-byte question type field."""
        self.current_index_of_input += 2

    def extract_domain_name(self):
        """Parse the question name into ``domain`` as a dotted string.

        Parsing starts right after the header and leaves the cursor just past
        the name's terminating zero byte.  If the query declares no question
        or the name runs past the end of ``data``, ``domain`` is set to ``''``
        and ``format_error`` is set to 1.
        """
        self.current_index_of_input = self._HEADER_LENGTH
        self.format_error = 0
        try:
            self.check_if_question_is_empty()
            labels = []
            while self.data[self.current_index_of_input] != 0:
                labels.append(self.extract_one_label())
            self.current_index_of_input += 1  # step over the zero terminator
            self.domain = '.'.join(labels)
        except IndexError:
            self.format_error = 1
            self.domain = ''

    def extract_one_label(self):
        """Read the length-prefixed label at the cursor and advance past it.

        Returns:
            The label as a string, one character per byte.

        Raises:
            IndexError: If the label extends past the end of ``data``.
        """
        length = self.data[self.current_index_of_input]
        start = self.current_index_of_input + 1
        end = start + length
        # Slicing never raises, so a truncated label is detected explicitly.
        if end > len(self.data):
            raise IndexError('label runs past the end of the query')
        label = ''.join(map(chr, self.data[start:end]))
        self.current_index_of_input = end
        return label

    def run(self):
        """Parse the query and return the complete response message."""
        records = self.extract_record()
        return self.create_response(records)

    def extract_record(self):
        """Parse the question and return the address records for its name.

        Returns:
            The list from ``extract_domain_record`` for the parsed name, or
            an empty list when the question could not be parsed.
        """
        self.extract_domain_name()
        self.extract_question_type()
        self.extract_class_type()
        # A cursor beyond the data means the type/class fields are missing.
        if self.current_index_of_input > len(self.data):
            self.format_error = 1
        if self.format_error:
            return []
        return extract_domain_record(self.domain)

    def check_if_question_is_empty(self):
        """Raise ``IndexError`` when the header's question count is zero.

        The question count is the big-endian 16-bit value at bytes 4-5 of the
        header.
        """
        question_count = int.from_bytes(self.data[4:6], byteorder='big')
        if question_count == 0:
            raise IndexError('query contains no question')

    def create_response(self, records):
        """Return header + question + answer for the given address records."""
        header = self.create_response_header(records)
        question = self.create_response_question()
        answer = self.make_response_answer(records)
        return header + question + answer

    def create_response_question(self):
        """Return the question bytes to echo, or ``b''`` after a format error."""
        if self.format_error:
            return b''
        return self.data[self._HEADER_LENGTH:self.current_index_of_input]

    def create_response_header(self, records):
        """Return the 12-byte response header.

        The answer count only includes records that will actually be emitted,
        and the question count is zero when no question is echoed.
        """
        query_id = self.extract_query_id()
        flags = self.create_flags(records)
        question_count = b'\x00\x00' if self.format_error else self.QDCOUNT
        answer_count = len(self._answer_rdata(records)).to_bytes(2, byteorder='big')
        return (query_id + flags + question_count + answer_count
                + self.NSCOUNT + self.ARCOUNT)

    def extract_query_id(self):
        """Return the query's 2-byte ID, zero-padded if ``data`` is shorter."""
        return self.data[:2].ljust(2, b'\x00')

    def generate_opcode(self):
        """Copy the query's 4-bit opcode into ``OPCODE`` as binary digits.

        The opcode occupies bits 3-6 of the third header byte.  A query too
        short to contain that byte is treated as opcode 0.
        """
        flags_byte = self.data[2] if len(self.data) > 2 else 0
        self.OPCODE = format((flags_byte >> 3) & 0x0F, '04b')

    def create_flags(self, records):
        """Return the two flag bytes of the response header."""
        self.generate_opcode()
        first_bits = self.QR + self.OPCODE + self.AA + self.TC + self.RD
        first_byte = int(first_bits, 2).to_bytes(1, byteorder='big')
        self.generate_response_code(records)
        second_bits = self.RA + self.Z + self.RCODE
        second_byte = int(second_bits, 2).to_bytes(1, byteorder='big')
        return first_byte + second_byte

    def generate_response_code(self, records):
        """Set ``RCODE``: format error, name error (no records) or no error."""
        if self.format_error:
            self.RCODE = self._RCODE_FORMAT_ERROR
        elif not records:
            self.RCODE = self._RCODE_NAME_ERROR
        else:
            self.RCODE = self._RCODE_NO_ERROR

    def make_response_answer(self, records):
        """Return the answer section: one record per usable IPv4 address.

        Each record is the echoed question (name, type, class) followed by
        the TTL, a data length of 4 and the four address bytes.
        """
        name_type_class = self.data[self._HEADER_LENGTH:self.current_index_of_input]
        ttl = self._TTL_SECONDS.to_bytes(4, byteorder='big')
        prefix = name_type_class + ttl + b'\x00\x04'
        return b''.join(prefix + rdata for rdata in self._answer_rdata(records))

    def extract_class_type(self):
        """Advance the cursor past the 2-byte question class field."""
        self.current_index_of_input += 2

    def _answer_rdata(self, records):
        """Return the packed addresses of the records that can be answered."""
        if self.format_error:
            return []
        packed = (self._pack_ipv4(record) for record in records)
        return [rdata for rdata in packed if rdata is not None]

    @staticmethod
    def _pack_ipv4(address):
        """Return the 4 bytes of a dotted-quad address, or None if invalid."""
        try:
            # ``bytes`` rejects values outside 0-255 with ValueError as well.
            rdata = bytes(int(part) for part in address.split('.'))
        except ValueError:
            return None
        return rdata if len(rdata) == 4 else None