-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
1186 lines (1044 loc) · 53.2 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# permanent import
import os
import uuid
from dotenv import load_dotenv
from flask import Flask, jsonify, redirect, render_template, request, url_for, session
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_socketio import SocketIO, disconnect, emit
from authlib.integrations.flask_client import OAuth
from datetime import datetime, timedelta, timezone
from dateutil import parser
from sqlalchemy.orm import validates # for validation of data in tables
from sqlalchemy import Column, or_ # used for reference to tables' column name
from player import player
from threading import Thread
import string
from text_generator import Text_Generator
# temporary imports, which will be deleted later
import random
# STR_MAX_SIZE = 65535
class App:
"""
This will serve as the Flask backend of the application which will contain the logic for each of the routes.
_app : Flask application which creates and controls the url routes
_db : database connection which allows for interaction with the SQL database
"""
_app = Flask(__name__)
# Use cors to faciliates api requests/responses time
# Particularly to retrieve userinfo from google servers
CORS(_app)
_app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///ThrillTyper.db"
db = SQLAlchemy(_app)
# Explicitly load env
load_dotenv()
_api_key = os.environ.get("API_KEY")
# Configuration of flask app
appConf = {
"OAUTH2_CLIENT_ID": os.environ.get("CLIENT_ID"),
"OAUTH2_CLIENT_SECRET": os.environ.get("CLIENT_SECRET"),
"OAUTH2_META_URL": "https://accounts.google.com/.well-known/openid-configuration",
"FLASK_SECRET": os.environ.get("FLASK_SECRET"),
"FLASK_PORT": 5000
}
_app.secret_key = appConf.get("FLASK_SECRET")
oauth = OAuth(_app)
# list of google scopes - https://developers.google.com/identity/protocols/oauth2/scopes
oauth.register(
"ttyper",
client_id=appConf.get("OAUTH2_CLIENT_ID"),
client_secret=appConf.get("OAUTH2_CLIENT_SECRET"),
client_kwargs={
"scope": "openid profile email",
"code_challenge_method": "S256" # enable PKCE
},
server_metadata_url=f"{appConf.get('OAUTH2_META_URL')}",
)
socketio = SocketIO(_app)
players = {}
players_finished = {}
broadcast_active = False
broadcast_thread = None
def run(self, host: str | None = None, port: int | None = None, debug: bool | None = None, load_dotenv: bool = True, **options):
"""
Calls Flask's run function with the specified parameters to run the backend for the web application.\n
Preconditions: host is a valid IP address and port is a valid and open port\n
Flask"s descriptions of the parameters:
:param host: the hostname to listen on. Set this to ``'0.0.0.0'`` to
have the server available externally as well. Defaults to
``'127.0.0.1'`` or the host in the ``SERVER_NAME`` config variable
if present.
:param port: the port of the webserver. Defaults to ``5000`` or the
port defined in the ``SERVER_NAME`` config variable if present.
:param debug: if given, enable or disable debug mode. See
:attr:`debug`.
:param load_dotenv: Load the nearest :file:`.env` and :file:`.flaskenv`
files to set environment variables. Will also change the working
directory to the directory containing the first file found.
:param options: the options to be forwarded to the underlying Werkzeug
server. See :func:`werkzeug.serving.run_simple` for more
information.
"""
self._app.run(host,port,debug,load_dotenv)
@socketio.on('event')
def handle_chat_global(json):
global socketio
print('received my event: ' + str(json))
App.socketio.emit('global chat', json)
"""
added by Hu
"""
def broadcast_player_list():
print("Broadcast thread started")
while App.broadcast_active:
App.socketio.sleep(0.5) # Sleep for 100 milliseconds
App.socketio.emit('current player lists', list(App.players.values()))
print("Broadcast thread ended")
@socketio.on('connect')
def handle_connect():
player_id = request.sid
# player_id = 'player' + str(len(App.players)+1)
App.players[player_id] = {'id': player_id, 'currentCharIndex': 0}
emit('your player id', {'player_id': player_id}, room=player_id)
emit('update players', list(App.players.values()), broadcast=True)
print('a new player has connected')
print(f'User connected: {player_id}, players list updated')
print(f'Current players list: {App.players}') # Print the entire list of players
@socketio.on('disconnect')
def handle_disconnect():
if request.sid in App.players:
player_id = request.sid # Store the ID of the disconnecting player
del App.players[request.sid]
emit('client disconnected', {'player_id': player_id}, broadcast=True)
print('Player has disconnected:', player_id)
print(f'Current players list: {App.players}') # Print the entire list of players
@socketio.on('start game')
def handle_start_game(data):
print('Received start game signal:', data)
# App.socketio.emit('start game', {'message': 'Start the game!', 'textKey': data['text']})
emit('start game', {'message': 'Start the game!', 'textKey': data['textKey']}, broadcast=True)
# Start the broadcast thread only if it's not already running
if not App.broadcast_active:
App.broadcast_active = True
App.broadcast_thread = Thread(target=App.broadcast_player_list)
App.broadcast_thread.start()
@socketio.on('stop game')
def handle_stop_game(data):
print('Received stop game signal:', data)
# App.socketio.emit('stop game', {'message': 'Stop the game!'})
emit('stop game', {'message': 'Stop the game!'}, broadcast=True)
# Stop the broadcasting thread
if App.broadcast_active:
App.broadcast_active = False
if App.broadcast_thread.is_alive():
App.broadcast_thread.join() # Ensure the thread stops gracefully
@socketio.on('update char index')
def handle_update_char_index(data):
player_id = request.sid # Assuming you're using the default SocketIO session ID as the player ID
current_char_index = data['currentCharIndex']
# Update the player's current character index in the players dictionary
if player_id in App.players:
App.players[player_id]['currentCharIndex'] = current_char_index
print(f"Updated currentCharIndex for {player_id}: {current_char_index}")
@socketio.on('player finished')
def handle_player_finished(data):
print("Received data:", data) # Add this line to debug what you receive
player_id = data['playerId']
finished_time = data['finishedTime']
if player_id in App.players:
App.players_finished[player_id] = {'finishedTime': finished_time}
print(f'Player {player_id} finished at {finished_time}')
print("Players Finished", App.players_finished) # Add this line to debug what you receive
# Check if all players have finished
if len(App.players_finished) == len(App.players):
# Create a sorted list of players based on their finish times
sorted_players = sorted(App.players_finished.items(), key=lambda x: x[1]['finishedTime'])
rankings = [{'playerId': player_id, 'rank': i + 1} for i, (player_id, _) in enumerate(sorted_players)]
# Send rankings to all clients
emit('game rankings', rankings, broadcast=True)
App.players_finished.clear()
print('Rankings sent:', rankings)
if App.broadcast_active:
App.broadcast_active = False
if App.broadcast_thread.is_alive():
App.broadcast_thread.join() # Ensure the thread stops gracefully
@_app.route("/login", methods=["GET", "POST"])
def login():
"""
Handles the requests made to the login page where users can log in
:return : a str html page that redirects the user to the login page
"""
error = session.pop("error", None)
if request.method == "POST":
# Authenticate the user Close Session when done
pass
return render_template("login.html", error=error)
@_app.route("/", methods=["POST", "GET"])
def home():
"""
Handles the requests made to the home page.
:return : a str html page that redirects the user to the home page
"""
return render_template("base.html", userSession=session.get("user"))
@_app.route("/google-signin", methods=["GET", "POST"])
def google_login():
"""
Handles the requests made to the website where users can log in to google
:postcondition: a google user login successfully
:return : a str html page that redirects the user to the callback method on success
"""
return App.oauth.ttyper.authorize_redirect(redirect_uri=url_for("google_callback", _external=True))
@_app.route("/google-logged")
def google_callback():
"""
Handles the returned redirect requests from google signin
:postcondition: a new user will be registered with a message saying "Successfully registered" and the database will update with the new user
info, the user will be redirected to the home page
:postcondition: create the user session
:return : a str html page that redirects the user to the menu page
"""
try:
# Obtain the access token from Google OAuth
token = App.oauth.ttyper.authorize_access_token()
# Check if the "id_token" is present in the token
if "id_token" in token:
# If the "id_token" is present, indicating a successful login
# Extract and store necessary user information in the session
uname = token["userinfo"]["email"]
picture = token["userinfo"]["picture"]
# Instantiate a player object to store in user session
playerObj = player(username=uname, avatar=picture)
# Establish user session, use the json format of the website
session["user"] = playerObj.__json__()
# Insert user info into the database if doesn"t exists yet
if Database.query(uname, "UserInfo") is None:
Database.insert(UserInfo, _username=uname, _password=token["access_token"], _email=uname, _profile_photo=picture)
Database.insert(UserData, _username=uname,_email=uname,_accuracy=0,_wins=0,_losses=0,_freq_mistyped_words=0,_total_playing_time=0,_top_wpm=0,_num_races=0,_user_in_game_picture=picture,_last_login_time=datetime.now(timezone.utc))
user_letter_data = {
"_username": uname,
"_email": uname,
**{f"_{letter}": 0 for letter in string.ascii_lowercase},
"_comma": 0,
"_period": 0,
"_exclamation": 0,
"_question": 0,
"_hyphen": 0,
"_semicolon": 0,
"_single_quote": 0,
"_double_quote": 0,
}
Database.insert(UserLetter, **user_letter_data)
else:
Database.update(uname, "UserData",
_last_login_time=datetime.now(timezone.utc))
else:
# Handle the case where access is denied (user cancelled the login)
return "Access denied: Google login was canceled or failed."
# Redirect to the desired page after successful authentication
return redirect("/")
except Exception as e:
# For if user cancels the login
return redirect("/login")
@_app.route("/authentication", methods=["POST"])
def authenticate():
"""
Endpoint called to authenticate users attempting to login. Session is created on successful login.
:pre-condition: The request form will have a username and password field
"""
try:
# Retrieves data from the requests
# The keys must exist
username = request.form["username"]
password = request.form["password"]
# Retrieve data from database
# Exist if returned value of query is not None
user = Database.query(username, "UserInfo")
# Performs validation
if user is not None and user._password == password:
Database.update(user._username, "UserData",
_last_login_time=datetime.now(timezone.utc))
# Gets avatar
playerObj = player(username, user._profile_photo)
# Stores the Player object in the session
session["user"] = playerObj.__json__()
# Redirects to a desired page when authentication success
return redirect("/#/menu")
else:
# Raises an error for wrong match
raise ValueError("Invalid username or password")
except Exception as e:
# Handles errors
error = f"{e}"
session["error"] = error
return redirect("login")
@_app.route("/signup", methods=["GET", "POST"])
def signup():
"""
A route path for signup page layout
:return: str html page for signup layout
"""
error = session.pop("error", None)
return render_template("signup.html", error=error)
@_app.route("/register", methods=["POST"])
def register():
"""
Creates and logs a new user account
:precondition: form contained valid input
:postcondition: new user info will be inserted into the database on success
"""
# Gets input
username = request.form["username"]
email = request.form["email"]
password = request.form["password"]
# Validates contraints
if Database.query(username, "UserInfo"):
session["error"] = "Username already used "
return redirect("/signup")
if Database.query(email, "UserInfo"):
session["error"] = "Email already used "
return redirect("/signup")
# Stores into database
avatar = url_for("static", filename="pics/anonymous.png")
Database.insert(UserInfo, _username=username, _email=email,
_password=password, _profile_photo=avatar)
Database.insert(UserData, _username=username, _email=email, _accuracy=0, _wins=0, _losses=0, _freq_mistyped_words=0,
_total_playing_time=0, _top_wpm=0, _num_races=0, _last_login_time=datetime.now(timezone.utc))
user_letter_data = {
"_username": username,
"_email": email,
**{f"_{letter}": 0 for letter in string.ascii_lowercase},
"_comma": 0,
"_period": 0,
"_exclamation": 0,
"_question": 0,
"_hyphen": 0,
"_semicolon": 0,
"_single_quote": 0,
"_double_quote": 0,
}
Database.insert(UserLetter, **user_letter_data)
# Store session
playerObj = player(username, avatar)
# Stores the Player object in the session
session["user"] = playerObj.__json__()
# Redirects to the result page
return redirect("/#/menu")
@_app.route("/logout", methods=["GET", "POST"])
def logout():
"""
Log out user from the session
:postcondition: session is None
"""
# Pop out the user session
session.pop("user", None)
return redirect("/")
@_app.route("/get_avg_txt_len/", methods=["GET"])
def get_avg_txt_len():
"""
Handles requests to get the average length of a word/sentence from a list
:param difficulty
:param form : Specifies the form of text generated. Values: 'sentences' or 'words'
"""
difficulty = request.args.get("difficulty")
if not difficulty:
difficulty = ""
else:
difficulty += "_"
return str(Text_Generator.get_avg_txt_len(Text_Generator.get_txt_list(difficulty+request.args.get("form")+".txt")))
def get_test_client(self):
return self._app.test_client()
@_app.route('/user/<username>')
def get_user_data(username):
userData = Database.query(str(username), "UserData")
if userData is None:
return jsonify({'error': 'User not found'}), 404
else:
return jsonify({
"username": userData._username,
"highestWPM": userData._top_wpm,
"wins": userData._wins,
"losses": userData._losses,
"accuracy": userData._accuracy,
"frequentMisTypedWord": userData._freq_mistyped_words,
"totalTime": userData._total_playing_time,
"frequentMisTypedWord": userData._freq_mistyped_words
})
@_app.route('/leaderboard/top_n_highest_wpm/<int:n>', methods=['GET'])
def get_top_n_highest_wpm_leaderboard(n):
try:
top_scores = UserData.query \
.with_entities(UserData._username, UserData._top_wpm, UserData._accuracy, UserInfo._profile_photo) \
.join(UserInfo, UserData._username == UserInfo._username) \
.order_by(UserData._top_wpm.desc()) \
.limit(n) \
.all()
leaderboard_info = [{
'username': player._username,
'highest_wpm': player._top_wpm,
'accuracy': player._accuracy,
'profile_photo': player._profile_photo
} for player in top_scores]
return jsonify(leaderboard_info)
except Exception as e:
return jsonify({'error': str(e)}), 500
@_app.route("/update_db", methods=["POST"])
def update_db():
"""
Endpoint called to update user stats post-game
"""
# TODO: need to secure data transfer and verify origin
if request.is_json:
usr_session = session.get("user")
if usr_session:
usr = usr_session["userinfo"]["given_name"]
user_data = Database.query(usr, "UserData")
game_data = request.json
num_races = int(user_data._num_races)
game_wpm = game_data["wpm"]
Database.update(usr, "UserData", _accuracy=(game_data["accuracy"]+float(user_data._accuracy)*num_races)/(num_races+1), _num_races=num_races+1,
_total_playing_time=user_data._total_playing_time+game_data["elapsedTime"], _top_wpm=game_wpm if game_wpm > int(user_data._top_wpm) else int(user_data._top_wpm))
last_user_race = UserRace.query.filter_by(
_username=usr).order_by(UserRace._game_num.desc()).first()
if last_user_race:
Database.insert(UserRace, _game_num=int(last_user_race._game_num)+1, _username=usr, _email=str(user_data._email), _average_wpm=game_wpm,
_selected_mode=game_data["mode"], _time_limit=game_data.get("timeLimit"), _date_played=parser.parse(game_data["date"]))
else:
Database.insert(UserRace, _game_num=1, _username=usr, _email=str(user_data._email), _average_wpm=game_wpm, _selected_mode=game_data["mode"], _time_limit=game_data.get(
"timeLimit"), _date_played=parser.parse(game_data["date"])) # messed this up, so adding this line for testing
# expect a dict for this {"_char":mistyped_count}
mistyped_chars = game_data.get("mistypedChars")
if mistyped_chars:
user_letter = Database.query(usr, "UserLetter")
try:
for char, num in mistyped_chars.items():
# frontend args for the returned json must match those of the db
setattr(user_letter, f"{char}", getattr(
user_letter, f"{char}")+num)
App.db.session.commit()
except Exception as e:
print(e)
App.db.session.rollback()
return "Not successful"
return "Successful"
return "Not successful"
@_app.route("/generate_text/",methods=["GET"])
def generate_text():
"""
Sends back text for the requestor to use
:param difficulty
:param form : Specifies the form of text generated. Values: 'sentences' or 'word_list'
"""
difficulty = request.args.get("difficulty")
wpm = request.args.get("wpm")
if wpm:
wpm = int(wpm)
if wpm>=0 and wpm<=45:
difficulty="easy"
elif wpm>=46 and wpm<=80:
difficulty="medium"
else:
difficulty="hard"
if not difficulty:
difficulty=""
return Text_Generator.generate_text(difficulty,request.args.get("form"),request.args.get("amount"),request.args.get("genre"))
@_app.route('/raceData/<username>', methods=['GET', 'POST'])
def getUserRaceData(username):
try:
userData = UserRace.query \
.with_entities(UserRace._average_wpm, UserRace._selected_mode, UserRace._time_limit, UserRace._date_played) \
.filter(UserRace._username == username) \
.order_by(UserRace._date_played.desc()) \
.all()
raceData = [{
'average_wpm': player._average_wpm,
'selected_mode': player._selected_mode,
'time_limit': player._time_limit,
'date_played': player._date_played
} for player in userData]
return jsonify(raceData)
except Exception as e:
return jsonify({'error': str(e)}), 500
class Database:
"""
A class representing a database connection and operations.
Attributes:
_app (App): The Flask application instance associated with the database.
_models (dict): A dictionary containing model classes representing database tables. Keys are model names, and values are the corresponding model classes.
Methods:
__init__(app: App, **models)
Initializes a Database instance with the provided Flask application and model classes.
insert(username: str, psw: str, wpm: int = None, accuracy: float = None, wins: int = None, losses: int = None, freq_mistyped_words: str = None)
Inserts a new user record into the database.
update(username: str, **kwargs)
Updates a user record in the database.
query(username: str)
Queries a user record from the database.
"""
def __init__(self, app: App, **models):
"""
Initializes a Database instance with the provided Flask application and model classes.
:param app: The Flask application instance associated with the database.
:type app: App
:param models: Keyword arguments representing model classes representing database tables. Keys are model names, and values are the corresponding model classes.
:type models: dict
:returns: None
:precondition: App and App.db are fully configured
:precondition: model(s) are fully configured and set-up
"""
self._app = app
self._models = models
# temporary auto populate and inserting method for anyone want to test the database
# which will be deleted later
@staticmethod
def populate_sample_date(num_rows):
"""
Responsible for auto populating
"""
try:
current_datetime = datetime.now(timezone.utc)
for i in range(1, num_rows + 1):
sample_google_id = "".join(random.choices(
string.ascii_letters + string.digits, k=10)) # set length of id to ten
user_info_data = {
"_username": f"user{i}",
"_password": f"password{i}",
"_email": f"user{i}@gmail.com",
"_profile_photo": f'./static/pics/terraria_player.png',
"_google_id": sample_google_id
}
user_data_data = {
"_username": f"user{i}",
"_email": f"user{i}@gmail.com",
"_top_wpm": 10+i,
"_accuracy": 20 + (i*0.5),
"_wins": 10+i,
"_losses": 1+i,
"_freq_mistyped_words": f"word{i}|mistake{i}",
"_total_playing_time": 3600*i,
}
user_letter_data = {
"_username": f"user{i}",
"_email": f"user{i}@gmail.com",
**{f"_{letter}": random.randint(0, 100) for letter in string.ascii_lowercase},
"_comma": random.randint(0, 100),
"_period": random.randint(0, 100),
"_exclamation": random.randint(0, 100),
"_question": random.randint(0, 100),
"_hyphen": random.randint(0, 100),
"_semicolon": random.randint(0, 100),
"_single_quote": random.randint(0, 100),
"_double_quote": random.randint(0, 100),
}
user_race_data = {
"_username": f"user{i}",
"_email": f"user{i}@gmail.com",
"_average_wpm": random.randint(40, 100),
"_selected_mode": random.choice(["Practice", "Robot Opponent", "MultiPlayer"]),
"_time_limit": random.choice([None, 30, 60, 90]),
"_date_played": current_datetime - timedelta(days=i)
}
Database.insert(UserInfo, **user_info_data)
Database.insert(UserData, **user_data_data)
Database.insert(UserLetter, **user_letter_data)
Database.insert(UserRace, **user_race_data)
print(f"{num_rows} sample users added successfully")
except Exception as e:
print(f"Error while populating sample rows: {e}")
#used for generating mock data with unique wpm
used_top_wpm_values = set()
@staticmethod
def generate_unique_wpm():
"""
Generate a unique random WPM value that has not been used before.
"""
while True:
wpm_value = random.randint(50, 150) # Adjust the range as needed
if wpm_value not in Database.used_wpm_values:
return wpm_value
#this method is used to populate a specific user's data in all tables
@staticmethod
def populate_sample_data_for_user(user_name:str):
"""
Responsible for auto populating sample data for a specific user
"""
try:
current_datetime = datetime.now(timezone.utc)
sample_google_id = "".join(random.choices(string.ascii_letters + string.digits, k=10)) # set length of id to ten
user_info_data = {
"_username": f"{user_name}",
"_password": f"password{user_name}",
"_email": f"{user_name}@gmail.com",
"_profile_photo": f'./static/pics/terraria_player.png',
"_google_id": sample_google_id
}
# Generate unique top WPM value
top_wpm_value = random.randint(0, 110)
while top_wpm_value in Database.used_top_wpm_values:
top_wpm_value = random.randint(0, 110)
user_data_data = {
"_username": f"{user_name}",
"_email": f"{user_name}@gmail.com",
"_top_wpm": top_wpm_value,
"_accuracy": 20 + (random.randint(0, 100) * 0.5),
"_wins": random.randint(0, 100),
"_losses": random.randint(0, 100),
"_freq_mistyped_words": f"word{user_name}|mistake{user_name}",
"_total_playing_time": random.randint(0, 100),
}
# Add the generated top WPM value to the set of used values
Database.used_top_wpm_values.add(top_wpm_value)
user_letter_data = {
"_username": f"{user_name}",
"_email": f"{user_name}@gmail.com",
**{f"_{letter}": random.randint(0, 100) for letter in string.ascii_lowercase},
"_comma": random.randint(0, 100),
"_period": random.randint(0, 100),
"_exclamation": random.randint(0, 100),
"_question": random.randint(0, 100),
"_hyphen": random.randint(0, 100),
"_semicolon": random.randint(0, 100),
"_single_quote": random.randint(0, 100),
"_double_quote": random.randint(0, 100),
}
Database.insert(UserInfo, **user_info_data)
Database.insert(UserData, **user_data_data)
Database.insert(UserLetter, **user_letter_data)
print(f"Sample data added successfully for user{user_name}")
except Exception as e:
print(f"Error while populating sample data: {e}")
@staticmethod
def insert(db_table, **kwargs):
"""
Insert a new user record into the database.
:param db_table: The SQLalchemy model class representing a database table
:type db_table: SQlalchemy model class
:param kwargs: this is the keyword arguments that represents field names and the corresponding value
:type kwargs: dict
:precondition: All required fields for the model class must be provided in kwargs (for example: non-nullable fields must be provided)
:precondition: If provided, `wpm`, `accuracy`, `wins`, `losses`, and `freq_mistyped_words` must be of the correct data types and within acceptable ranges.
:postcondition: If successful, a new user record is inserted into the database with password hashed.
"""
# check the provided key arguments based on valid column names
# raise ValueError if invalid column names are found
# retrieve all columns" name in the table
valid_columns = db_table.__table__.columns.keys()
# this is the required columns that must have a value entered (nullable=False)
required_columns = set(
Column.name for Column in db_table.__table__.columns if not Column.nullable)
# invliad columns are the set of argument keys minus the set of valid columns and non-required columns
# this required column is need to find all non-required columns
# this is needed to prevent crash when a valid column is not present in the insert, and viewed as an invliad column
invalid_columns = set(kwargs.keys()) - set(valid_columns) - \
(set(valid_columns) - required_columns)
if invalid_columns:
# list of the invalid columns
raise ValueError(
f"Invalid column(s) provided: {','.join(invalid_columns)}")
# instance of the model with specified column names in parameter
new_row = db_table(**kwargs)
try:
App.db.session.add(new_row) # add the new row to database table
App.db.session.commit() # commit the transaction/changes
return new_row
except Exception as e:
App.db.session.rollback() # rollback the change made
raise e
@staticmethod
def update(username: str, db_table_name: str, **kwargs):
"""
Update a user record in the database.
:param username: Unique identifier of the user to be updated.
:type username: str
:param db_table_name : the input class table name
:type db_table_name : str
:param **kwargs: Keyword arguments representing fields to be updated. Valid fields are "_pswd", "_wpm",
"_accuracy", "_wins", "_losses", and "_freq_mistyped_words".
:precondition: `username` must exist in the database.
:precondition: At least one field to update must be provided.
:precondition: If provided, values for fields must be of the correct data types and within acceptable ranges.
:postcondition: If successful, the user record is updated with the provided values.
"""
try:
# first validate the table name given in string
valid_table_list = ["UserInfo",
"UserData", "UserLetter", "UserRace"]
if db_table_name not in valid_table_list:
raise ValueError(f"Invalid table name: {db_table_name}")
# get the table class obj by given table name in string
table_obj = globals().get(db_table_name)
if table_obj is None:
raise ValueError(
f"Table Class Object not found for table name: {db_table_name}")
# query for user information
user_information = table_obj.query.filter_by(
_username=username).first()
if user_information is None:
raise ValueError(
f"User '{username}' does not exist in the Database")
# after user information is query, perform a check of if user is trying to update their _username
# check if the updating username is unique in the database
# get the value based on the key
new_username = kwargs.get("_username")
if new_username and new_username != username: # unique
existing_user = table_obj.query.filter_by(
_username=new_username).first()
if existing_user:
raise ValueError(
f"Username '{new_username}' already exists in the Database")
# does the same check for email address
new_email = kwargs.get("_email")
if new_email and new_email != user_information._email:
existing_email = table_obj.query.filter_by(
_email=new_email).first()
if existing_email:
raise ValueError(
f"Email '{new_email}' already exists in the Database")
# validates and update the provided fields
# key is the column name, value is the updating data
for key, value in kwargs.items():
# ensuring the fields/columns exist in the according table
if hasattr(table_obj, key): # table_obj is referring to the table class object
setattr(user_information, key, value)
else:
raise AttributeError(
f"Attribute '{key}' does not exist in the '{db_table_name}' table")
# if new_username and new_username != username:
# Database.update_username(username, new_username)
# commit the updated values and fields
App.db.session.commit()
print(
f"User '{username}' record updated successfully in table '{db_table_name}'")
except Exception as e:
App.db.session.rollback()
print(
f"Error in updating user '{username}' in table '{db_table_name}' : {e}")
@staticmethod
def query(identifier: str, db_table_class: str):
# changes made: being able to query by either _username or _email using or_ operator provided by sqlalchemy
"""
Query a user record from the database using either username or email address.
:param identifier: A unique identifier of the user to be queried.
:type identifier: str
:param db_table_class: the name of the table class
:type db_table_class: str
:return: Returns the User table object if found, else None.
:rtype: User Data table object or None
:precondition: `identifier` must be a valid user identifier/column in the data table.
:postcondition: If a user with the provided username/email exists in the database, returns the corresponding User Data object; otherwise, returns None.
"""
try:
# a list of valid table names
valid_table_list = ["UserInfo",
"UserData", "UserLetter", "UserRace"]
# validates if the given string is in the list
if db_table_class in valid_table_list:
# find the table class object by the given string
table_name_obj = globals().get(db_table_class)
# retriving data by sqlalchemy"s query and filter
retrieved_data = table_name_obj.query.filter(or_(
table_name_obj._username == identifier, table_name_obj._email == identifier)).first()
# filter_by takes kwargs, not positional arguments
# if user does not exist, return nothing
if retrieved_data is None:
print(f"Invalid username/email entered: {identifier}")
return None
# user information object returned
return retrieved_data
else:
# handles invalid table name string
raise ValueError(f"Invalid table name: {db_table_class}")
except Exception as e:
print(
f"Error in querying user information from {db_table_class}: {e}")
return None
@staticmethod
def delete(username: str):
"""
Delete a user record from the database.
:param username: Unique identifier of the user to be deleted.
:type username: str
:return: True if the user record is successfully deleted, False otherwise.
:rtype: bool
:precondition: `username` must be a valid user identifier.
:postcondition: If a user with the provided username exists in the database, the corresponding user record is deleted.
"""
try:
# the first index/result filtered by the username
delete_user = UserInfo.query.filter_by(_username=username).first()
# print("the user is: ", delete_user)
if delete_user:
# if username exists delete it and return True
App.db.session.delete(delete_user)
App.db.session.commit()
return True
# else username does not exist
else:
return False
except Exception as e:
# roll back transaction if error occurred
App.db.session.rollback()
return False
@staticmethod
def get_top_n_letters(username: str, n: int):
"""
Return a (sorted in DESC)list of letters according to the Top-N largest corresponding values(mistyped letter times) in UserLetter Table
:param username: the Username of the user
:type username: str
:param n: the selected number of Top-N largest letter to retrieve, max is 26
:type n: int
:return: List containing the Top-N letters in DESC order
:rtype: list
"""
try:
# validate if user exist in UserInfo
user_info = UserInfo.query.filter_by(_username=username).first()
if not user_info:
print(f"User '{username}' does not exist")
return []
# validate n
max_n = 26 + 8 # eight punctuations added
if n < 1 or n > max_n:
print("Invalid value for 'n', Only 26 Letters and 8 Punctuations")
return []
# query using username the user letter data
user_letter_data = UserLetter.query.filter_by(
_username=username).first()
# return empty list if user letter data is None
if not user_letter_data:
print(f"No Data Found For User '{username}'")
return []
# a dictionary with letters as keys and mistyped letter times as the number value
# loop through each letter in the alaphbets
letter_number_dict = {}
for letter in string.ascii_lowercase:
column_name = f"_{letter}"
letter_number_dict[letter] = getattr(
user_letter_data, column_name)
# added punctuations
# list of added punctuations
punctuation_marks = [",", ".", "!", "?", "-", ";", "'", '"']
punct_names = {
",": "_comma",
".": "_period",
"!": "_exclamation",
"?": "_question",
"-": "_hyphen",
";": "_semicolon",
"'": "_single_quote",
'"': "_double_quote"
}
for mark in punctuation_marks:
get_punct = punct_names.get(mark)
if get_punct:
letter_number_dict[mark] = getattr(
user_letter_data, get_punct)
# sort the dict by top-n values in desc order, returning a list
sorted_values = sorted(letter_number_dict, key=letter_number_dict.get, reverse=True)[
:n] # n here is not inclusive
# since there is a _ as the first index, it needs to be removed, starting each string with [1:]
# rm_underscore = [letter[1:] for letter in sorted_values]
return sorted_values
except Exception as e:
print(
f"Error while retrieving top {n} largest values for corresponding letters for user '{username}' : {e}")
return []
# these two tables/classes are not limited to parent/child relationship
# they"re bidirectional, you can retrieve the relative data of the other table by calling either table
# UserData table will have the foreign key
# responsible for storing user"s personal information
class UserInfo(App.db.Model):
"""
Representation of user personal information stored in the database under UserInfo table
_username : primary key of the table, unique identifier of a user
_password : can not be null, password of a user"s account
_email : the unique email address of the user
_profile_photo : the url representation of the user"s profile photo in email
_registered_date : record of the date and time in UTC when user registered
_google_id : identification for third party user(sign in via email)
"""
_username = App.db.Column(App.db.String(
30), primary_key=True) # primary_key makes username not null and unique
# password can be null for login with email
_password = App.db.Column(App.db.String(30))
# this will be kept nullable for now, if required later, this will be changed, along with the other tables
_email = App.db.Column(App.db.String(60), unique=True)
_profile_photo = App.db.Column(App.db.String(255))
# record the time the user account is created
_registered_date = App.db.Column(
App.db.DateTime, default=App.db.func.current_timestamp()) # still in UTC timezone
_google_id = App.db.Column(App.db.String(100))
# user_info_ref/user_data_ref are accessors to navigate the relationship between UserData and UserInfo objects
# uselist set to False meaning one-to-one relationship between the two table
# one instance of the user_info is related to one and only one user_data instance (1:1))
user_data_ref = App.db.relationship("UserData", backref=App.db.backref(
"user_info_ref_data", uselist=False), cascade="all, delete-orphan", single_parent=True)
# cascade = "all, delete-orphan" when userinfo/data row is deleted, the parent/child row will also be deleted in one-to-one relationship
# since cascade default to be many-to-one relationship(1 userinfo - Many userdata rows), single_parent flag need to set to be True(ensures 1:1)
# another backref relationship for UserLetter class (for delete)
user_letter_ref = App.db.relationship("UserLetter", backref=App.db.backref(
"user_info_ref_letter", uselist=False), cascade="all, delete-orphan", single_parent=True)