-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
292 lines (260 loc) · 10.2 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# SPDX-License-Identifier: BSD-3-Clause
from flask import Flask, request, send_from_directory, render_template, make_response, session
from datetime import timedelta
from dotenv import load_dotenv
import bcrypt, os, json, shutil, re, argparse
app = Flask(__name__)
absRootPath = os.path.dirname(os.path.abspath(__file__))
app.permanent_session_lifetime = timedelta(hours=8)
load_dotenv("./.env")
app.secret_key = os.environ.get("SECRET_KEY")
admin_name = os.environ.get("ADMIN_NAME")
pwd = os.environ.get("PASSWORD").encode('utf-8')
# HTTPS header variables to be changed
strictTransportSecurity = "max-age=31536000; includeSubDomains"
contentSecurityPolicy = "default-src https: 'unsafe-eval' 'unsafe-inline'; object-src 'none'"
# https://flask.palletsprojects.com/en/3.0.x/web-security/
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
)
def isDrivePresent(serialNumber: str, firmwareVersion: str):
filesSaved = os.listdir("./Public/Outputs")
for fileName in filesSaved:
try:
with open("./Public/Outputs/" + fileName, "r") as fd:
data = json.load(fd)
if(serialNumber == data["Identify"]["Serial number"] and firmwareVersion == data["Identify"]["Firmware version"]):
return True
except:
print(f"Error while reading file {fileName}, skipping")
return False
def saveDrive(clientJSON):
count = len(os.listdir("./Public/Outputs"))
lastFile = sorted(os.listdir("./Public/Outputs"))[count - 1]
newIndex = int(re.search('(\d+)\.json', lastFile).group(1)) + 1
with open(f"./Public/Outputs/drive{newIndex}.json", "x") as fd:
json.dump(obj=clientJSON, fp=fd, ensure_ascii=True, indent=4)
def saveMetadata(clientJSON : object):
if(not(f"drive{clientJSON['index']}.json" in os.listdir("./Public/Metadata"))):
with open(f"./Public/Metadata/drive{clientJSON['index']}.json", "x") as fd:
pass
data = {}
with open(f"./Public/Metadata/drive{clientJSON['index']}.json", "r+") as fd:
# Strore read data and clear the file so that we can overwrite it
if(len(fd.readlines()) == 0):
mdIndex = "0"
json.dump(obj={mdIndex : clientJSON["metadata"]}, fp=fd, ensure_ascii=True, indent=4)
return mdIndex
else:
shutil.copy(f"./Public/Metadata/drive{clientJSON['index']}.json", f"./Public/Metadata/drive{clientJSON['index']}.tmp")
try:
fd.seek(0) # to account for the readlines()
data = json.load(fd)
fd.seek(0)
# if metadata were removed to the point of only brackets remaining
if(not(data == {})):
# create a temporary file in case of error
fd.truncate(0)
# convert string keys to int
indexes = [int(k) for k in data]
mdIndex = max(indexes) + 1
mdIndex = f"{mdIndex}"
data[mdIndex] = clientJSON["metadata"]
json.dump(obj=data, fp=fd, ensure_ascii=True, indent=4)
os.remove(f"./Public/Metadata/drive{clientJSON['index']}.tmp")
return mdIndex
else:
fd.truncate(0)
mdIndex = "0"
json.dump(obj={mdIndex : clientJSON["metadata"]}, fp=fd, ensure_ascii=True, indent=4)
os.remove(f"./Public/Metadata/drive{clientJSON['index']}.tmp")
return mdIndex
except Exception as error:
print(error)
os.remove(f"./Public/Metadata/drive{clientJSON['index']}.json")
shutil.copy(f"./Public/Metadata/drive{clientJSON['index']}.tmp", f"./Public/Metadata/drive{clientJSON['index']}.json")
os.remove(f"./Public/Metadata/drive{clientJSON['index']}.tmp")
def removeMetadata(index, mdIndex):
if(f"drive{index}.json" in os.listdir("./Public/Metadata")):
data = {}
with open(f"./Public/Metadata/drive{index}.json", "r+") as fd:
# Strore read data and clear the file so that we can overwrite it
data = json.load(fd)
if(not mdIndex in data):
print(f"Metadata with index {mdIndex} not found")
return
fd.truncate(0)
with open(f"./Public/Metadata/drive{index}.json", "r+") as fd:
del data[mdIndex]
json.dump(obj=data, fp=fd, ensure_ascii=True, indent=4)
else:
# File doesn't exist
pass
print("File to be removed doesn't exist")
def saveSSC(SSC):
if(f"{SSC['SSC name']}.json" in os.listdir("./Public/SSCs")):
print(f"{SSC['SSC name']} already exists in SSCs folder")
else:
with open(f"./Public/SSCs/{SSC['SSC name']}.json", "x") as fd:
json.dump(obj=SSC, fp=fd, ensure_ascii=True, indent=4)
def removeDrive(index):
try:
os.remove(f"{absRootPath}/Public/Outputs/drive{index}.json")
return True
except OSError as error:
print(error)
return False
def isAuthorized():
if('user' in session):
return True
else:
return False
def isInt(object):
try:
int(object)
return True
except:
return False
@app.after_request
def setHeaders(response):
response.headers['Strict-Transport-Security'] = strictTransportSecurity
response.headers['Content-Security-Policy'] = contentSecurityPolicy
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
return response
# Default routing of files
@app.get('/')
def defaultRoute():
return send_from_directory("./Public/HTML/", "index.html")
@app.get('/<path:path>')
def returnFile(path):
return send_from_directory("./Public/", path)
@app.get('/names')
def fetchFilenames():
savedFiles = os.listdir("./Public/Outputs")
returnJSON = {}
for file in savedFiles:
# Python returns timestamp seconds and float, so we need to convert it for JS
returnJSON[file] = int(os.path.getmtime(f"./Public/Outputs/{file}")) * 1000
return returnJSON
@app.get('/SSCs')
def fetchSSCs():
savedFiles = os.listdir("./Public/SSCs")
savedFiles = sorted(savedFiles)
returnString = ""
for file in savedFiles: # ",".join()
returnString += file + ","
returnString = returnString[:-1] # remove last ,
return returnString
@app.delete('/SSCs')
def deleteSSC():
if(isAuthorized()):
filename = request.json["SSCfile"]
savedFiles = os.listdir("./Public/SSCs")
if(filename in savedFiles):
os.remove(f"./Public/SSCs/{filename}")
return '', 200
else:
return '', 400
else:
return '', 401
@app.post('/SSCs')
def receiveSSC():
if(isAuthorized()):
newSSC = request.json
if(not("SSC" in newSSC and "SSC name" in newSSC and "SSC fset" in newSSC and
"mandatory" in newSSC)):
return 'JSON needs to contain SSC, SSC name, SSC fset and mandatory values', 400
else:
saveSSC(newSSC)
return '', 200
else:
return '', 401
@app.post('/token')
def verifyToken():
if(isAuthorized()):
return '', 200
else:
return '', 401
@app.post('/login')
def login():
auth = request.authorization
if not auth:
return '', 401
else:
username = auth.username
password = auth.password.encode('utf-8')
if((username == admin_name) and (bcrypt.checkpw(password, pwd))):
session['user'] = auth.username
session.permanent = True
return '', 200
else:
return '', 401
@app.post('/logout')
def logout():
session.pop('user', None)
return '', 200
@app.post('/metadata')
def metadataActions():
if(not (isAuthorized())):
return '', 401
else:
clientJSON = request.json
action = clientJSON["action"]
if(action == "addMetadata"):
if(not isInt(clientJSON["index"])):
return '', 400
mdIndex = saveMetadata(clientJSON)
return mdIndex, 202
elif(action == "remMetadata"):
if(not (isInt(clientJSON["index"]) and isInt(clientJSON["mdIndex"]))):
return 'Invalid index or mdIndex', 400
removeMetadata(clientJSON["index"], clientJSON["mdIndex"])
print(f"Removed {clientJSON['mdIndex']} metadata from disk d{clientJSON['index']}")
return '', 200
@app.post('/outputs')
def addDrive():
if(not (isAuthorized())):
return '', 401
else:
clientJSON = request.json
if(isDrivePresent(clientJSON["Identify"]["Serial number"], clientJSON["Identify"]["Firmware version"])):
return '', 202
else:
try:
if(not("Identify" in clientJSON and "Discovery 0" in clientJSON)):
return 'Identify and Discovery 0 needed', 400
saveDrive(clientJSON)
return '', 200
except Exception as error:
print(error)
print("Failed to save given JSON")
return '', 400
@app.delete('/outputs')
def outputDelete():
if(not (isAuthorized())):
return '', 401
else:
clientJSON = request.json
if(isInt(clientJSON["index"]) ):
if(removeDrive(clientJSON["index"])):
return '', 200
else:
return 'Failed to remove drive', 400
else :
return 'Provided drive index is not valid', 400
if __name__ == "__main__":
parser = argparse.ArgumentParser(description = 'Application server for SED Storage Visualiser')
parser.add_argument('--HTTP',
help='Launch the server without requirement for HTTPS (just keep in mind that security of authentication relies on HTTPS)',
action="store_true")
args = parser.parse_args()
if args.HTTP:
app.config.update(
SESSION_COOKIE_SECURE=False,
)
strictTransportSecurity = ""
contentSecurityPolicy = ""
app.run('0.0.0.0', port=8000)