-
Notifications
You must be signed in to change notification settings - Fork 0
/
kvBroker.py
248 lines (194 loc) · 8.97 KB
/
kvBroker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import argparse
import pickle
import random
import socket
import sys
from utils.Parser import Parser
from utils.Request import Request
from utils.Response import Response
class KvBroker:
    """Interactive broker that replicates key-value records across kvServers.

    On construction the broker connects to every server listed in the server
    file, stores each record of the data file on ``k`` randomly chosen servers,
    and then runs a blocking command loop on stdin (``start_shell``).

    Every exchange with a server uses the same handshake (see ``_stream``):
    send the request's options, read an acknowledgement, send the request,
    read the response's options (which carry ``res_size``), send a
    confirmation, and finally read the pickled response.
    """

    def __init__(self, options) -> None:
        """Connect to the servers, load the initial data and start the shell.

        :param options: parsed CLI namespace exposing ``s`` (server file path),
            ``i`` (data file path) and ``k`` (replication factor).
        """
        self._options = options
        self._configuration, self._sockets = self.init_connections()
        print('=== Welcome to kvBroker ===')
        self.init_data()
        self.start_shell()

    @staticmethod
    def _parse_server_line(line):
        """Split one ``"<ip> <port>"`` server-file entry into ``(ip, port)``.

        Any run of whitespace separates the two fields.

        :raises ValueError: if the entry does not have exactly two fields or
            the port is not an integer.
        """
        ip, port = line.split()
        return ip, int(port)

    def init_connections(self):
        """Open a TCP connection to every server listed in the server file.

        Blank lines are ignored. Unreachable servers are reported and skipped.
        The program exits (non-zero) if the file is missing, an entry is
        malformed, or fewer than ``k`` servers are reachable.

        :returns: ``(configuration, sockets)``. ``configuration`` is the list of
            non-empty server entries. ``sockets`` maps ``(ip, port)`` to a
            connected socket.
        """
        try:
            with open(self._options.s, 'r') as f:
                # Blank lines (e.g. a trailing newline) must not count as
                # configured servers: len(configuration) feeds the
                # availability checks below.
                configuration = [line.strip() for line in f if line.strip()]
        except FileNotFoundError as e:
            sys.exit(f'{e}')

        sockets = {}
        for entry in configuration:
            try:
                address = self._parse_server_line(entry)
            except ValueError:
                sys.exit(f'[ERROR] Invalid server entry: {entry!r} (expected "<ip> <port>")')
            new_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                new_socket.connect(address)
            except socket.error as e:
                # Release the descriptor of the failed connection attempt.
                new_socket.close()
                print(f'{str(e)} {address[0]} : {address[1]}')
            else:
                sockets[address] = new_socket

        # Check if there are enough servers available to support replication
        if len(sockets) < int(self._options.k):
            for open_socket in sockets.values():
                open_socket.close()
            sys.exit('[ERROR] Number of servers is lower than the replication number')
        return configuration, sockets

    def load_data(self):
        """Load and parse the records of the data file given by ``-i``."""
        return Parser.file_parser(self._options.i)

    def init_data(self):
        """Store every loaded record on ``k`` distinct, randomly chosen servers."""
        print('Data initialization...')
        for record in self.load_data():
            req = Request('PUT', record)
            for address in random.sample(list(self._sockets), self._options.k):
                # Same handshake as an interactive PUT. The response is not
                # reported during initialization, so it goes to a throwaway dict.
                self._stream(req, address, {})

    def print_result(self, query, results):
        """Print the most relevant server reply for ``query``.

        :param query: the user's payload, echoed for 200/404 replies.
        :param results: mapping of response code -> payload collected from
            the servers. A 200 reply wins over 201, 404, 400 and 422, in that
            order. Nothing is printed when ``results`` holds none of them.
        """
        if 200 in results:
            print(f'{query} : {results[200]}')
        elif 201 in results:
            print(f'{results[201]}')
        elif 404 in results:
            print(f'{query} : {results[404]}')
        elif 400 in results:
            print(f'{results[400]}')
        elif 422 in results:
            print(f'[Error | Invalid data format] {results[422]}')

    def start_shell(self):
        """Read commands from stdin and dispatch them to the servers.

        Supported commands are ``PUT``, ``GET``, ``QUERY`` and ``DELETE``, each
        followed by a payload. ``!DISCONNECT`` (or end of input) exits the
        program.
        """
        while True:
            try:
                cmd = input('kvStore> ').strip()
            except EOFError:
                # Ctrl-D / closed stdin: leave like !DISCONNECT instead of
                # crashing with a traceback.
                sys.exit('kvBroker has been disconnected! Bye!')
            if not cmd:
                continue
            if cmd == '!DISCONNECT':
                sys.exit('kvBroker has been disconnected! Bye!')

            # Refresh the set of live servers before every command.
            self._heart_beat()
            is_replication_valid = self._is_replication_valid()
            try:
                req_type, payload = cmd.split(' ', 1)
            except ValueError:
                print('[Error] Payload is missing!')
                continue

            if req_type == 'PUT':
                if self._is_replication_supported():
                    print('[WARNING] Not enough servers to support replication! Please restart the servers!')
                    continue
                # If a record with the same root key exists, it is deleted first.
                try:
                    self._is_root_key_duplicated(payload)
                except Exception as e:
                    print(e)
                    continue
                req = Request(req_type, payload)
                result = {}
                for address in random.sample(list(self._sockets), self._options.k):
                    self._stream(req, address, result)
                self.print_result(payload, result)
            elif req_type in ('GET', 'QUERY'):
                req = Request(req_type, payload)
                result = {}
                for address in list(self._sockets):
                    self._stream(req, address, result)
                if not is_replication_valid:
                    print(f'[WARNING] Data may be inconsistent. {self._options.k} or more servers are unavailable!')
                self.print_result(payload, result)
            elif req_type == 'DELETE':
                # A delete must reach every replica, so all servers must be up.
                if not self._is_all_servers_available():
                    print('[WARNING] Cannot perform delete operation. One or more servers are unavailable!')
                    continue
                req = Request(req_type, payload)
                result = {}
                for address in list(self._sockets):
                    self._stream(req, address, result)
                self.print_result(payload, result)
            else:
                print('[INFO] Invalid request type!')
                print('[INFO] Supported types: PUT | GET | QUERY | DELETE')

    def _heart_beat(self):
        """Probe every connected server and forget the ones that are gone.

        A server is dropped (and its socket closed) when the exchange raises a
        socket error, or when the server has closed the connection. The latter
        surfaces as ``EOFError`` from ``pickle.loads(b'')``.
        """
        req = Request('HEART_BEAT')
        # Iterate over a snapshot because dead servers are removed below.
        for address in list(self._sockets):
            cur_socket = self._sockets[address]
            try:
                # Send request's OPTIONS and read the acknowledgement
                cur_socket.sendall(req.get_options())
                pickle.loads(cur_socket.recv(100))
                # Send request and read the reply
                cur_socket.sendall(req.get_request())
                pickle.loads(cur_socket.recv(100))
            except socket.error as e:
                print(f'[Error] {e}')
                self._drop_server(address)
            except EOFError:
                self._drop_server(address)

    def _drop_server(self, address):
        """Close the connection to ``address`` and stop treating it as live."""
        dead_socket = self._sockets.pop(address)
        try:
            dead_socket.close()
        except OSError:
            # Best effort: the peer is already gone.
            pass

    def _is_all_servers_available(self):
        """Return True when every configured server is still connected."""
        return len(self._sockets) == len(self._configuration)

    def _is_replication_valid(self):
        """Return True while fewer than ``k`` configured servers are down.

        With fewer than ``k`` servers down, every record still has at least
        one live replica.
        """
        return len(self._configuration) - len(self._sockets) < self._options.k

    def _is_replication_supported(self):
        """Return True when there are *too few* live servers for ``k`` replicas.

        NOTE: despite its name, a True result means replication is NOT
        possible; callers treat True as the error condition.
        """
        return len(self._sockets) < self._options.k

    @staticmethod
    def _root_key(payload):
        """Return the unquoted top-level key of a PUT payload.

        The key is the text before the first ':', with surrounding whitespace
        and double quotes removed, so ``'"k1" : {...}'`` -> ``'k1'``.
        """
        return payload.split(':', 1)[0].strip().strip('"')

    def _is_root_key_duplicated(self, payload):
        """Delete any existing record that shares the payload's root key.

        Does nothing when no server holds the key.

        :raises Exception: if the key exists but not every server is
            available, since the old record could not be removed from all
            replicas.
        """
        root_key, temp_result = self._root_key(payload), {}
        req = Request('GET', root_key)
        for address in list(self._sockets):
            self._stream(req, address, temp_result)
        if 200 not in temp_result:
            return
        if not self._is_all_servers_available():
            raise Exception('[WARNING] Cannot perform PUT operation in order to update a record. One or more servers are unavailable!')
        req = Request('DELETE', root_key)
        for address in list(self._sockets):
            self._stream(req, address, {})

    def _stream(self, req, ip, result):
        """Run one request/response exchange with the server at ``ip``.

        :param req: request object providing ``get_options()`` and
            ``get_request()`` as bytes.
        :param ip: ``(ip, port)`` key of ``self._sockets``.
        :param result: dict updated in place with ``{code: payload}`` from the
            server's response.

        Socket errors and a server closing the connection mid-exchange are
        reported and leave ``result`` untouched.
        """
        # NOTE(review): pickle.loads on bytes received from the network can
        # execute arbitrary code; this is only safe if every server is trusted.
        try:
            cur_socket = self._sockets[ip]
            # Send request OPTIONS and read the acknowledgement
            cur_socket.sendall(req.get_options())
            pickle.loads(cur_socket.recv(100))
            # Send request
            cur_socket.sendall(req.get_request())
            # Receive response's OPTIONS
            res_options = pickle.loads(cur_socket.recv(100))
            # Send confirmation
            cur_socket.sendall(Response().get_response())
            # NOTE(review): a single recv() may return fewer than res_size
            # bytes for large responses; looping requires res_size to be the
            # exact payload length — confirm against the server.
            res = pickle.loads(cur_socket.recv(res_options['res_size']))
            result[res['code']] = res['payload']
        except socket.error as e:
            print(f'[Error] {e}')
        except EOFError:
            # recv() returned b'': the server closed the connection.
            print(f'[Error] {ip[0]} : {ip[1]} closed the connection')
if __name__ == '__main__':
    # Command-line entry point: parse the options, validate the replication
    # factor and start the broker (KvBroker.__init__ runs the shell).
    args_parser = argparse.ArgumentParser(description='kvBroker')
    # The options used to be both required=True and given a default, which
    # made the defaults dead. They are now optional and fall back to the
    # documented defaults; explicit values still work exactly as before.
    args_parser.add_argument('-s', '--serverFile', dest='s', type=str, default='serverFile.txt', metavar='', help='The serverFile is a space separated list of server IPs and their respective ports that will be listening for queries and indexing commands. (default: %(default)s)')
    args_parser.add_argument('-i', '--dataToIndex', dest='i', type=str, default='dataToIndex.txt', metavar='', help='The dataToIndex is a file containing data that was generated using the data generator. (default: %(default)s)')
    args_parser.add_argument('-k', '--kReplication', dest='k', type=int, default=1, metavar='', help='The k value is the replication factor. (default: %(default)s)')
    args = args_parser.parse_args()
    if args.k < 1:
        sys.exit('[Error] Replication factor should be greater equal to 1.')
    kv_broker = KvBroker(args)