-
Notifications
You must be signed in to change notification settings - Fork 1
/
api.py
3311 lines (2843 loc) · 128 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Instructor Dashboard API views
JSON views which the instructor dashboard requests.
Many of these GETs may become PUTs in the future.
"""
import StringIO
import json
import logging
import re
import time
from django.conf import settings
from django.views.decorators.csrf import ensure_csrf_cookie, csrf_exempt
from django.views.decorators.http import require_POST, require_http_methods
from django.views.decorators.cache import cache_control
from django.core.exceptions import ValidationError, PermissionDenied
from django.core.mail.message import EmailMessage
from django.core.exceptions import ObjectDoesNotExist
from django.db import IntegrityError, transaction
from django.core.urlresolvers import reverse
from django.core.validators import validate_email
from django.utils.translation import ugettext as _
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseNotFound
from django.utils.html import strip_tags
from django.shortcuts import redirect
import string
import random
import unicodecsv
import decimal
from student import auth
from student.roles import GlobalStaff, CourseSalesAdminRole, CourseFinanceAdminRole
from util.file import (
store_uploaded_file, course_and_time_based_filename_generator,
FileValidationException, UniversalNewlineIterator
)
from util.json_request import JsonResponse, JsonResponseBadRequest
from instructor.views.instructor_task_helpers import extract_email_features, extract_task_features
from courseware.access import has_access
from courseware.courses import get_course_with_access, get_course_by_id
from django.contrib.auth.models import User
from django_comment_client.utils import has_forum_access
from django_comment_common.models import (
Role,
FORUM_ROLE_ADMINISTRATOR,
FORUM_ROLE_MODERATOR,
FORUM_ROLE_COMMUNITY_TA,
)
from edxmako.shortcuts import render_to_string
from courseware.models import StudentModule
from shoppingcart.models import (
Coupon,
CourseRegistrationCode,
RegistrationCodeRedemption,
Invoice,
CourseMode,
CourseRegistrationCodeInvoiceItem,
)
from student.models import (
CourseEnrollment, unique_id_for_user, anonymous_id_for_user,
UserProfile, Registration, EntranceExamConfiguration,
ManualEnrollmentAudit, UNENROLLED_TO_ALLOWEDTOENROLL, ALLOWEDTOENROLL_TO_ENROLLED,
ENROLLED_TO_ENROLLED, ENROLLED_TO_UNENROLLED, UNENROLLED_TO_ENROLLED,
UNENROLLED_TO_UNENROLLED, ALLOWEDTOENROLL_TO_UNENROLLED, DEFAULT_TRANSITION_STATE
)
import instructor_task.api
from instructor_task.api_helper import AlreadyRunningError
from instructor_task.models import ReportStore
import instructor.enrollment as enrollment
from instructor.enrollment import (
get_user_email_language,
enroll_email,
send_mail_to_student,
get_email_params,
send_beta_role_email,
unenroll_email,
)
from instructor.access import list_with_level, allow_access, revoke_access, ROLES, update_forum_role
import instructor_analytics.basic
import instructor_analytics.distributions
import instructor_analytics.csvs
import csv
from openedx.core.djangoapps.user_api.preferences.api import get_user_preference, set_user_preference
from openedx.core.djangolib.markup import HTML, Text
from instructor.views import INVOICE_KEY
from submissions import api as sub_api # installed from the edx-submissions repository
from certificates import api as certs_api
from certificates.models import CertificateWhitelist, GeneratedCertificate, CertificateStatuses, CertificateInvalidation
from bulk_email.models import CourseEmail, BulkEmailFlag
from student.models import get_user_by_username_or_email
from .tools import (
dump_student_extensions,
dump_module_extensions,
find_unit,
get_student_from_identifier,
require_student_from_identifier,
handle_dashboard_error,
parse_datetime,
set_due_date_extension,
strip_if_string,
)
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locations import SlashSeparatedCourseKey
from opaque_keys import InvalidKeyError
from openedx.core.djangoapps.course_groups.cohorts import is_course_cohorted
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def common_exceptions_400(func):
    """
    Catches common exceptions and renders matching 400 errors.
    (decorator without arguments)

    Handled exceptions:
      * ``User.DoesNotExist``  -> "User does not exist."
      * ``AlreadyRunningError`` -> "Task is already running."

    The error is rendered as JSON (``{"error": message}``) when the request
    is AJAX or its Accept header starts with ``application/json``; otherwise
    a plain ``HttpResponseBadRequest`` carrying the message is returned.
    Any other exception propagates unchanged.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the view's __name__/__doc__ for introspection
    def wrapped(request, *args, **kwargs):  # pylint: disable=missing-docstring
        # Decide the response flavour up front, before the view runs.
        use_json = (request.is_ajax() or
                    request.META.get("HTTP_ACCEPT", "").startswith("application/json"))
        try:
            return func(request, *args, **kwargs)
        except User.DoesNotExist:
            message = _("User does not exist.")
        except AlreadyRunningError:
            message = _("Task is already running.")

        # Only reached when one of the handled exceptions was raised.
        if use_json:
            return JsonResponse({"error": message}, 400)
        return HttpResponseBadRequest(message)
    return wrapped
def require_post_params(*args, **kwargs):
    """
    Checks for required parameters or renders a 400 error.
    (decorator with arguments)

    `args` is a *list of required POST parameter names.
    `kwargs` is a **dict of required POST parameter names
        to string explanations of the parameter

    When any required parameter is absent from ``request.POST`` the view is
    not called; a JSON response with status 400 is returned instead, listing
    the missing parameter names under ``parameters`` and their explanations
    (or None) under ``info``.

    Assumes that request is in args[0] of the decorated view.
    """
    import functools  # local import keeps this block self-contained

    # required_params = e.g. [('action', 'enroll or unenroll'), ('emails', None)]
    required_params = [(arg, None) for arg in args]
    required_params += [(key, kwargs[key]) for key in kwargs]

    # Unique sentinel: lets us tell "parameter absent" apart from any real value.
    missing = object()

    def decorator(func):  # pylint: disable=missing-docstring
        @functools.wraps(func)  # keep the view's __name__/__doc__
        def wrapped(*view_args, **view_kwargs):  # pylint: disable=missing-docstring
            request = view_args[0]

            error_response_data = {
                'error': 'Missing required query parameter(s)',
                'parameters': [],
                'info': {},
            }

            for (param, extra) in required_params:
                # Identity check: only the sentinel itself means "not supplied".
                if request.POST.get(param, missing) is missing:
                    error_response_data['parameters'].append(param)
                    error_response_data['info'][param] = extra

            if error_response_data['parameters']:
                return JsonResponse(error_response_data, status=400)
            return func(*view_args, **view_kwargs)
        return wrapped
    return decorator
def require_level(level):
    """
    Decorator with argument that requires an access level of the requesting
    user. If the requirement is not satisfied, returns an
    HttpResponseForbidden (403).

    Assumes that request is in args[0].
    Assumes that course_id is in kwargs['course_id'].

    `level` is in ['instructor', 'staff']
    if `level` is 'staff', instructors will also be allowed, even
        if they are not in the staff group.

    Raises ValueError at decoration time when `level` is not one of the
    recognized values.
    """
    import functools  # local import keeps this block self-contained

    if level not in ['instructor', 'staff']:
        raise ValueError("unrecognized level '{}'".format(level))

    def decorator(func):  # pylint: disable=missing-docstring
        @functools.wraps(func)  # keep the view's __name__/__doc__
        def wrapped(*args, **kwargs):  # pylint: disable=missing-docstring
            request = args[0]
            # The course is looked up from the course_id keyword argument.
            course = get_course_by_id(CourseKey.from_string(kwargs['course_id']))

            if has_access(request.user, level, course):
                return func(*args, **kwargs)
            return HttpResponseForbidden()
        return wrapped
    return decorator
def require_global_staff(func):
    """
    View decorator that requires that the user have global staff permissions.

    When ``GlobalStaff().has_user(request.user)`` is false the view is not
    called and an HttpResponseForbidden is returned whose message names the
    platform (site-configured PLATFORM_NAME, falling back to
    settings.PLATFORM_NAME).
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the view's __name__/__doc__
    def wrapped(request, *args, **kwargs):  # pylint: disable=missing-docstring
        if GlobalStaff().has_user(request.user):
            return func(request, *args, **kwargs)

        platform_name = configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME)
        return HttpResponseForbidden(
            u"Must be {platform_name} staff to perform this action.".format(
                platform_name=platform_name
            )
        )
    return wrapped
def require_sales_admin(func):
    """
    Decorator for checking sales administrator access before executing an HTTP endpoint. This decorator
    is designed to be used for a request based action on a course. It assumes that there will be a
    request object as well as a course_id attribute to leverage to check course level privileges.

    If `course_id` cannot be parsed into a course key, the failure is logged and
    HttpResponseNotFound (404) is returned.
    If the user does not have privileges for this operation, this will return HttpResponseForbidden (403).
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the view's __name__/__doc__
    def wrapped(request, course_id):  # pylint: disable=missing-docstring
        try:
            course_key = CourseKey.from_string(course_id)
        except InvalidKeyError:
            log.error(u"Unable to find course with course key %s", course_id)
            return HttpResponseNotFound()

        access = auth.user_has_role(request.user, CourseSalesAdminRole(course_key))

        if access:
            return func(request, course_id)
        return HttpResponseForbidden()
    return wrapped
def require_finance_admin(func):
    """
    Decorator for checking finance administrator access before executing an HTTP endpoint. This decorator
    is designed to be used for a request based action on a course. It assumes that there will be a
    request object as well as a course_id attribute to leverage to check course level privileges.

    If `course_id` cannot be parsed into a course key, the failure is logged and
    HttpResponseNotFound (404) is returned.
    If the user does not have privileges for this operation, this will return HttpResponseForbidden (403).
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the view's __name__/__doc__
    def wrapped(request, course_id):  # pylint: disable=missing-docstring
        try:
            course_key = CourseKey.from_string(course_id)
        except InvalidKeyError:
            log.error(u"Unable to find course with course key %s", course_id)
            return HttpResponseNotFound()

        access = auth.user_has_role(request.user, CourseFinanceAdminRole(course_key))

        if access:
            return func(request, course_id)
        return HttpResponseForbidden()
    return wrapped
# Zero-based column positions of the fields in each row of the students CSV
# consumed by register_and_enroll_students (email, username, name, country).
EMAIL_INDEX = 0
USERNAME_INDEX = 1
NAME_INDEX = 2
COUNTRY_INDEX = 3
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('staff')
def register_and_enroll_students(request, course_id):  # pylint: disable=too-many-statements
    """
    Create new account and Enroll students in this course.
    Passing a csv file that contains a list of students.
    Order in csv should be the following email = 0; username = 1; name = 2; country = 3.
    Requires staff access.

    Responds with 403 unless the ALLOW_AUTOMATED_SIGNUPS feature is enabled
    (site configuration first, then settings.FEATURES).

    -If the email address and username already exists and the user is enrolled in the course,
    do nothing (including no email gets sent out)

    -If the email address already exists, but the username is different,
    match on the email address only and continue to enroll the user in the course using the email address
    as the matching criteria. Note the change of username as a warning message (but not a failure). Send a standard enrollment email
    which is the same as the existing manual enrollment

    -If the username already exists (but not the email), assume it is a different user and fail to create the new account.
     The failure will be messaged in a response in the browser.

    Returns a JsonResponse with three lists: 'row_errors', 'general_errors'
    and 'warnings'; every entry is a dict with 'username', 'email' and
    'response' keys.
    """
    if not configuration_helpers.get_value(
            'ALLOW_AUTOMATED_SIGNUPS',
            settings.FEATURES.get('ALLOW_AUTOMATED_SIGNUPS', False),
    ):
        return HttpResponseForbidden()

    course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
    warnings = []
    row_errors = []
    general_errors = []

    # for white labels we use 'shopping cart' which uses CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG as
    # course mode for creating course enrollments.
    if CourseMode.is_white_label(course_id):
        course_mode = CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG
    else:
        course_mode = None

    if 'students_list' in request.FILES:
        students = []
        upload_file = request.FILES.get('students_list')

        try:
            if upload_file.name.endswith('.csv'):
                students = [row for row in csv.reader(upload_file.read().splitlines())]
                course = get_course_by_id(course_id)
            else:
                general_errors.append({
                    'username': '', 'email': '',
                    'response': _('Make sure that the file you upload is in CSV format with no extraneous characters or rows.')
                })

        except Exception:  # pylint: disable=broad-except
            # The rows may already be parsed when the course lookup fails; drop
            # them so the loop below never runs with `course` undefined.
            students = []
            general_errors.append({
                'username': '', 'email': '', 'response': _('Could not read uploaded file.')
            })
        finally:
            upload_file.close()

        generated_passwords = []
        for row_num, student in enumerate(students, 1):
            # verify that we have exactly four columns in every row but allow for blank lines
            if len(student) != 4:
                if len(student) > 0:
                    general_errors.append({
                        'username': '',
                        'email': '',
                        'response': _('Data in row #{row_num} must have exactly four columns: email, username, full name, and country').format(row_num=row_num)
                    })
                continue

            # Iterate each student in the uploaded csv file.
            email = student[EMAIL_INDEX]
            username = student[USERNAME_INDEX]
            name = student[NAME_INDEX]
            country = student[COUNTRY_INDEX][:2]

            # Built fresh for every row so that per-student values added
            # downstream can never carry over into the next row's email.
            email_params = get_email_params(course, True, secure=request.is_secure())
            try:
                validate_email(email)  # Raises ValidationError if invalid
            except ValidationError:
                row_errors.append({
                    'username': username, 'email': email, 'response': _('Invalid email {email_address}.').format(email_address=email)})
            else:
                if User.objects.filter(email=email).exists():
                    # Email address already exists. assume it is the correct user
                    # and just register the user in the course and send an enrollment email.
                    user = User.objects.get(email=email)

                    # see if it is an exact match with email and username
                    # if it's not an exact match then just display a warning message, but continue onwards
                    if not User.objects.filter(email=email, username=username).exists():
                        warning_message = _(
                            'An account with email {email} exists but the provided username {username} '
                            'is different. Enrolling anyway with {email}.'
                        ).format(email=email, username=username)

                        warnings.append({
                            'username': username, 'email': email, 'response': warning_message
                        })
                        log.warning(u'email %s already exist', email)
                    else:
                        log.info(
                            u"user already exists with username '%s' and email '%s'",
                            username,
                            email
                        )

                    # enroll a user if it is not already enrolled.
                    if not CourseEnrollment.is_enrolled(user, course_id):
                        # Enroll user to the course and add manual enrollment audit trail
                        create_manual_course_enrollment(
                            user=user,
                            course_id=course_id,
                            mode=course_mode,
                            enrolled_by=request.user,
                            reason='Enrolling via csv upload',
                            state_transition=UNENROLLED_TO_ENROLLED,
                        )
                        enroll_email(course_id=course_id, student_email=email, auto_enroll=True, email_students=True, email_params=email_params)
                else:
                    # This email does not yet exist, so we need to create a new account
                    # If username already exists in the database, then create_and_enroll_user
                    # will raise an IntegrityError exception.
                    password = generate_unique_password(generated_passwords)
                    errors = create_and_enroll_user(
                        email, username, name, country, password, course_id, course_mode, request.user, email_params
                    )
                    row_errors.extend(errors)

    else:
        general_errors.append({
            'username': '', 'email': '', 'response': _('File is not attached.')
        })

    results = {
        'row_errors': row_errors,
        'general_errors': general_errors,
        'warnings': warnings
    }
    return JsonResponse(results)
def generate_random_string(length):
    """
    Create a string of random characters of specified length

    Characters are drawn from ASCII letters and digits, excluding the
    vowels (both cases), the digit '1' and lowercase 'l'. A `length` of 0
    yields the empty string.

    The OS-backed ``random.SystemRandom`` source is used because the result
    serves as a generated password, so it must not be predictable.
    """
    excluded = 'aAeEiIoOuU1l'
    chars = [
        char for char in string.ascii_uppercase + string.digits + string.ascii_lowercase
        if char not in excluded
    ]
    rng = random.SystemRandom()
    # str.join replaces the deprecated (Python 2 only) string.join function.
    return ''.join(rng.choice(chars) for __ in range(length))
def generate_unique_password(generated_passwords, password_length=12):
    """
    Return a random password that is not yet in `generated_passwords`.

    Candidates of `password_length` characters are drawn until one is found
    that the list does not contain; that candidate is appended to
    `generated_passwords` (the list is modified in place) and returned.
    """
    while True:
        candidate = generate_random_string(password_length)
        if candidate not in generated_passwords:
            break

    generated_passwords.append(candidate)
    return candidate
def create_user_and_user_profile(email, username, name, country, password):
    """
    Create a user account together with its Registration and UserProfile records.

    :param email: user's email address
    :param username: user's username
    :param name: user's name, stored on the profile
    :param country: user's country, stored on the profile
    :param password: user's password

    :return: User instance of the new user.
    """
    new_user = User.objects.create_user(username, email, password)

    # Registration record for the new account.
    Registration().register(new_user)

    user_profile = UserProfile(user=new_user)
    user_profile.name = name
    user_profile.country = country
    user_profile.save()

    return new_user
def create_manual_course_enrollment(user, course_id, mode, enrolled_by, reason, state_transition):
    """
    Enroll `user` in a course and record a manual enrollment audit entry for it.

    :param user: User who is to enroll in course
    :param course_id: course identifier of the course in which to enroll the user.
    :param mode: mode for user enrollment, e.g. 'honor', 'audit' etc.
    :param enrolled_by: User who made the manual enrollment entry (usually instructor or support)
    :param reason: Reason behind manual enrollment
    :param state_transition: state transition denoting whether student enrolled from un-enrolled,
        un-enrolled from enrolled etc.

    :return CourseEnrollment instance.
    """
    new_enrollment = CourseEnrollment.enroll(user, course_id, mode=mode)

    # Audit trail: who enrolled whom, with which transition and why.
    ManualEnrollmentAudit.create_manual_enrollment_audit(
        enrolled_by,
        user.email,
        state_transition,
        reason,
        new_enrollment,
    )

    log.info(u'user %s enrolled in the course %s', user.username, course_id)
    return new_enrollment
def create_and_enroll_user(email, username, name, country, password, course_id, course_mode, enrolled_by, email_params):
    """
    Create a new user and enroll him/her to the given course, return list of errors in the following format
        Error format:
            each error is a key-value pair dict with the following key-value pairs.
            1. username: username of the user to enroll
            2. email: email of the user to enroll
            3. response: readable error message

    Account creation and enrollment run inside one ``transaction.atomic()``
    block. Only if that succeeds is the account-creation email sent.

    :param email: user's email address
    :param username: user's username
    :param name: user's name
    :param country: user's country
    :param password: user's password
    :param course_id: course identifier of the course in which to enroll the user.
    :param course_mode: mode for user enrollment, e.g. 'honor', 'audit' etc.
    :param enrolled_by: User who made the manual enrollment entry (usually instructor or support)
    :param email_params: information to send to the user via email; this dict
        is copied, never modified, so the password does not leak to the caller

    :return: list of errors
    """
    errors = []
    try:
        with transaction.atomic():
            # Create a new user
            user = create_user_and_user_profile(email, username, name, country, password)

            # Enroll user to the course and add manual enrollment audit trail
            create_manual_course_enrollment(
                user=user,
                course_id=course_id,
                mode=course_mode,
                enrolled_by=enrolled_by,
                reason='Enrolling via csv upload',
                state_transition=UNENROLLED_TO_ENROLLED,
            )
    except IntegrityError:
        errors.append({
            'username': username, 'email': email, 'response': _('Username {user} already exists.').format(user=username)
        })
    except Exception as ex:  # pylint: disable=broad-except
        log.exception(type(ex).__name__)
        errors.append({
            'username': username, 'email': email, 'response': type(ex).__name__,
        })
    else:
        try:
            # It's a new user, an email will be sent to each newly created user.
            # Work on a copy: the caller's dict must not end up holding this
            # student's plaintext password or address.
            student_email_params = dict(email_params)
            student_email_params.update({
                'message': 'account_creation_and_enrollment',
                'email_address': email,
                'password': password,
                'platform_name': configuration_helpers.get_value('platform_name', settings.PLATFORM_NAME),
            })
            send_mail_to_student(email, student_email_params)
        except Exception as ex:  # pylint: disable=broad-except
            log.exception(
                "Exception '%s' raised while sending email to new user.", type(ex).__name__
            )
            errors.append({
                'username': username,
                'email': email,
                'response':
                    _("Error '{error}' while sending email to new user (user email={email}). "
                      "Without the email student would not be able to login. "
                      "Please contact support for further information.").format(error=type(ex).__name__, email=email),
            })
        else:
            log.info(u'email sent to new created user at %s', email)

    return errors
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('staff')
@require_post_params(action="enroll or unenroll", identifiers="stringified list of emails and/or usernames")
def students_update_enrollment(request, course_id):
    """
    Enroll or unenroll students by email.
    Requires staff access.

    Query Parameters:
    - action in ['enroll', 'unenroll']
    - identifiers is string containing a list of emails and/or usernames separated by anything split_input_list can handle.
    - auto_enroll is a boolean (defaults to false)
        If auto_enroll is false, students will be allowed to enroll.
        If auto_enroll is true, students will be enrolled as soon as they register.
    - email_students is a boolean (defaults to false)
        If email_students is true, students will be sent email notification
        If email_students is false, students will not be sent email notification
    - reason is free text recorded in the manual enrollment audit; it is
        mandatory for white-label courses (a 400 is returned without it).

    An unrecognized action yields HttpResponseBadRequest as soon as the first
    valid identifier is processed. A failure on one identifier is logged and
    reported in that identifier's result entry; the others still proceed.

    Returns an analog to this JSON structure: {
        "action": "enroll",
        "auto_enroll": false,
        "results": [
            {
                "email": "testemail@test.org",
                "before": {
                    "enrollment": false,
                    "auto_enroll": false,
                    "user": true,
                    "allowed": false
                },
                "after": {
                    "enrollment": true,
                    "auto_enroll": false,
                    "user": true,
                    "allowed": false
                }
            }
        ]
    }
    """
    course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
    action = request.POST.get('action')
    identifiers_raw = request.POST.get('identifiers')
    identifiers = _split_input_list(identifiers_raw)
    auto_enroll = request.POST.get('auto_enroll') in ['true', 'True', True]
    email_students = request.POST.get('email_students') in ['true', 'True', True]
    is_white_label = CourseMode.is_white_label(course_id)
    reason = request.POST.get('reason')
    if is_white_label:
        if not reason:
            return JsonResponse(
                {
                    'action': action,
                    'results': [{'error': True}],
                    'auto_enroll': auto_enroll,
                }, status=400)

    email_params = {}
    if email_students:
        course = get_course_by_id(course_id)
        email_params = get_email_params(course, auto_enroll, secure=request.is_secure())

    results = []
    for identifier in identifiers:
        # First try to get a user object from the identifer
        user = None
        email = None
        language = None
        # Reset per identifier so that the audit record written below can
        # never inherit the outcome computed for a previous identifier.
        enrollment_obj = None
        state_transition = DEFAULT_TRANSITION_STATE
        try:
            user = get_student_from_identifier(identifier)
        except User.DoesNotExist:
            email = identifier
        else:
            email = user.email
            language = get_user_email_language(user)

        try:
            # Use django.core.validators.validate_email to check email address
            # validity (obviously, cannot check if email actually /exists/,
            # simply that it is plausibly valid)
            validate_email(email)  # Raises ValidationError if invalid
            if action == 'enroll':
                before, after, enrollment_obj = enroll_email(
                    course_id, email, auto_enroll, email_students, email_params, language=language
                )
                before_state = before.to_dict()
                after_state = after.to_dict()
                before_enrollment = before_state['enrollment']
                before_user_registered = before_state['user']
                before_allowed = before_state['allowed']
                after_enrollment = after_state['enrollment']
                after_allowed = after_state['allowed']

                if before_user_registered:
                    if after_enrollment:
                        if before_enrollment:
                            state_transition = ENROLLED_TO_ENROLLED
                        else:
                            if before_allowed:
                                state_transition = ALLOWEDTOENROLL_TO_ENROLLED
                            else:
                                state_transition = UNENROLLED_TO_ENROLLED
                else:
                    if after_allowed:
                        state_transition = UNENROLLED_TO_ALLOWEDTOENROLL

            elif action == 'unenroll':
                before, after = unenroll_email(
                    course_id, email, email_students, email_params, language=language
                )
                before_state = before.to_dict()
                before_enrollment = before_state['enrollment']
                before_allowed = before_state['allowed']
                enrollment_obj = CourseEnrollment.get_enrollment(user, course_id)

                if before_enrollment:
                    state_transition = ENROLLED_TO_UNENROLLED
                else:
                    if before_allowed:
                        state_transition = ALLOWEDTOENROLL_TO_UNENROLLED
                    else:
                        state_transition = UNENROLLED_TO_UNENROLLED

            else:
                return HttpResponseBadRequest(strip_tags(
                    "Unrecognized action '{}'".format(action)
                ))

        except ValidationError:
            # Flag this email as an error if invalid, but continue checking
            # the remaining in the list
            results.append({
                'identifier': identifier,
                'invalidIdentifier': True,
            })

        except Exception:  # pylint: disable=broad-except
            # catch and log any exceptions
            # so that one error doesn't cause a 500.
            # log.exception records the active traceback; the action is
            # interpolated so the message reads "enrolling"/"unenrolling".
            log.exception(u"Error while %sing student", action)
            results.append({
                'identifier': identifier,
                'error': True,
            })

        else:
            ManualEnrollmentAudit.create_manual_enrollment_audit(
                request.user, email, state_transition, reason, enrollment_obj
            )
            results.append({
                'identifier': identifier,
                'before': before.to_dict(),
                'after': after.to_dict(),
            })

    response_payload = {
        'action': action,
        'results': results,
        'auto_enroll': auto_enroll,
    }
    return JsonResponse(response_payload)
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('instructor')
@common_exceptions_400
@require_post_params(
    identifiers="stringified list of emails and/or usernames",
    action="add or remove",
)
def bulk_beta_modify_access(request, course_id):
    """
    Enroll or unenroll users in beta testing program.

    Query parameters:
    - identifiers is string containing a list of emails and/or usernames separated by
      anything split_input_list can handle.
    - action is one of ['add', 'remove']
    - email_students (optional, 'true'/'True') sends the beta-role email to
      each successfully processed user
    - auto_enroll (optional, 'true'/'True') also enrolls each successfully
      processed user in the course when not yet enrolled

    Returns a JsonResponse with the action and one result entry per
    identifier ('identifier', 'error', 'userDoesNotExist'). An unrecognized
    action yields HttpResponseBadRequest once the first existing user is
    reached.
    """
    course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
    action = request.POST.get('action')
    identifiers_raw = request.POST.get('identifiers')
    identifiers = _split_input_list(identifiers_raw)
    email_students = request.POST.get('email_students') in ['true', 'True', True]
    auto_enroll = request.POST.get('auto_enroll') in ['true', 'True', True]
    results = []
    rolename = 'beta'
    course = get_course_by_id(course_id)

    email_params = {}
    if email_students:
        secure = request.is_secure()
        email_params = get_email_params(course, auto_enroll=auto_enroll, secure=secure)

    for identifier in identifiers:
        error = False
        user_does_not_exist = False
        try:
            user = get_student_from_identifier(identifier)

            if action == 'add':
                allow_access(course, user, rolename)
            elif action == 'remove':
                revoke_access(course, user, rolename)
            else:
                return HttpResponseBadRequest(strip_tags(
                    "Unrecognized action '{}'".format(action)
                ))
        except User.DoesNotExist:
            error = True
            user_does_not_exist = True
        # catch and log any unexpected exceptions
        # so that one error doesn't cause a 500.
        except Exception:  # pylint: disable=broad-except
            # log.exception records the active traceback; the action is
            # interpolated so the message says which operation failed.
            log.exception(u"Error while trying to %s student as beta tester", action)
            error = True
        else:
            # If no exception thrown, see if we should send an email
            if email_students:
                send_beta_role_email(action, user, email_params)
            # See if we should autoenroll the student
            if auto_enroll:
                # Check if student is already enrolled
                if not CourseEnrollment.is_enrolled(user, course_id):
                    CourseEnrollment.enroll(user, course_id)

        finally:
            # Tabulate the action result of this email address
            results.append({
                'identifier': identifier,
                'error': error,
                'userDoesNotExist': user_does_not_exist
            })

    response_payload = {
        'action': action,
        'results': results,
    }
    return JsonResponse(response_payload)
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('instructor')
@common_exceptions_400
@require_post_params(
    unique_student_identifier="email or username of user to change access",
    rolename="'instructor', 'staff', 'beta', or 'ccx_coach'",
    action="'allow' or 'revoke'"
)
def modify_access(request, course_id):
    """
    Grant or revoke a course role for another user.
    Requires instructor access.

    NOTE: instructors cannot remove their own instructor access.

    Query parameters:
    unique_student_identifier is the target user's username or email
    rolename is one of ['instructor', 'staff', 'beta', 'ccx_coach']
    action is one of ['allow', 'revoke']

    Responds with JSON describing the outcome; an unknown rolename or
    action yields HttpResponseBadRequest.
    """
    course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
    course = get_course_with_access(
        request.user, 'instructor', course_id, depth=None
    )

    student_identifier = request.POST.get('unique_student_identifier')
    try:
        user = get_student_from_identifier(student_identifier)
    except User.DoesNotExist:
        return JsonResponse({
            'unique_student_identifier': student_identifier,
            'userDoesNotExist': True,
        })

    # Check that user is active, because add_users
    # in common/djangoapps/student/roles.py fails
    # silently when we try to add an inactive user.
    if not user.is_active:
        return JsonResponse({
            'unique_student_identifier': user.username,
            'inactiveUser': True,
        })

    rolename = request.POST.get('rolename')
    action = request.POST.get('action')

    if rolename not in ROLES:
        error = strip_tags("unknown rolename '{}'".format(rolename))
        log.error(error)
        return HttpResponseBadRequest(error)

    # disallow instructors from removing their own instructor access.
    removing_self = rolename == 'instructor' and user == request.user and action != 'allow'
    if removing_self:
        return JsonResponse({
            'unique_student_identifier': user.username,
            'rolename': rolename,
            'action': action,
            'removingSelfAsInstructor': True,
        })

    # Map each supported action onto the access helper that performs it.
    handlers = {'allow': allow_access, 'revoke': revoke_access}
    handler = handlers.get(action)
    if handler is None:
        return HttpResponseBadRequest(strip_tags(
            "unrecognized action '{}'".format(action)
        ))
    handler(course, user, rolename)

    return JsonResponse({
        'unique_student_identifier': user.username,
        'rolename': rolename,
        'action': action,
        'success': 'yes',
    })
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('instructor')
@require_post_params(rolename="'instructor', 'staff', or 'beta'")
def list_course_role_members(request, course_id):
    """
    List instructors and staff.
    Requires instructor access.

    rolename is one of ['instructor', 'staff', 'beta', 'ccx_coach']

    A rolename that is not in ROLES yields an empty HttpResponseBadRequest.

    Returns JSON of the form {
        "course_id": "some/course/id",
        "staff": [
            {
                "username": "staff1",
                "email": "staff1@example.org",
                "first_name": "Joe",
                "last_name": "Shmoe",
            }
        ]
    }
    where the key of the member list is the requested rolename.
    """
    course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
    course = get_course_with_access(
        request.user, 'instructor', course_id, depth=None
    )

    rolename = request.POST.get('rolename')

    if rolename not in ROLES:
        return HttpResponseBadRequest()

    def extract_user_info(user):
        """ convert user into dicts for json view """
        return {
            'username': user.username,
            'email': user.email,
            'first_name': user.first_name,
            'last_name': user.last_name,
        }

    # A real list (not a lazy map object) so the payload is always
    # JSON-serializable.
    members = [extract_user_info(user) for user in list_with_level(course, rolename)]

    response_payload = {
        'course_id': course_id.to_deprecated_string(),
        rolename: members,
    }
    return JsonResponse(response_payload)
@transaction.non_atomic_requests
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_level('staff')
def get_problem_responses(request, course_id):
    """
    Kick off generation of a CSV report holding every student answer
    to one problem.

    The problem is taken from the 'problem_location' POST parameter.

    Responds with JSON
        {"status": "... status message ..."}
    both when the task was submitted and when such a task is already running.

    Responds with BadRequest if the problem location cannot be parsed or
    does not belong to this course.
    """
    course_key = CourseKey.from_string(course_id)
    problem_location = request.POST.get('problem_location', '')

    try:
        problem_key = UsageKey.from_string(problem_location)
        # A key without a run is an "old-style" location; rebuild it
        # relative to this course.
        if not problem_key.run:
            problem_key = course_key.make_usage_key_from_deprecated_string(problem_location)
        if problem_key.course_key != course_key:
            raise InvalidKeyError(type(problem_key), problem_key)
    except InvalidKeyError:
        return JsonResponseBadRequest(_("Could not find problem with this location."))

    try:
        instructor_task.api.submit_calculate_problem_responses_csv(request, course_key, problem_location)
    except AlreadyRunningError:
        status_message = _(
            "A problem responses report generation task is already in progress. "
            "Check the 'Pending Tasks' table for the status of the task. "
            "When completed, the report will be available for download in the table below."
        )
    else:
        status_message = _(
            "The problem responses report is being created."
            " To view the status of the report, see Pending Tasks below."
        )

    return JsonResponse({"status": status_message})
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)