diff --git a/lms/djangoapps/instructor/tests/test_api.py b/lms/djangoapps/instructor/tests/test_api.py index 2548cc4f411d..b0e533ee6f7f 100644 --- a/lms/djangoapps/instructor/tests/test_api.py +++ b/lms/djangoapps/instructor/tests/test_api.py @@ -642,6 +642,25 @@ def setUp(self): last_name='Student' ) + def test_api_without_login(self): + """ + verify in case of no authentication it returns 401. + """ + self.client.logout() + uploaded_file = SimpleUploadedFile("temp.jpg", io.BytesIO(b"some initial binary data: \x00\x01").read()) + response = self.client.post(self.url, {'students_list': uploaded_file}) + assert response.status_code == 401 + + def test_api_without_permission(self): + """ + verify in case of no authentication it returns 403. + """ + # removed role from course for instructor + CourseInstructorRole(self.course.id).remove_users(self.instructor) + uploaded_file = SimpleUploadedFile("temp.jpg", io.BytesIO(b"some initial binary data: \x00\x01").read()) + response = self.client.post(self.url, {'students_list': uploaded_file}) + assert response.status_code == 403 + @patch('lms.djangoapps.instructor.views.api.log.info') @ddt.data( b"test_student@example.com,test_student_1,tester1,USA", # Typical use case. diff --git a/lms/djangoapps/instructor/views/api.py b/lms/djangoapps/instructor/views/api.py index 6abfc81e75dd..ceb699cd9054 100644 --- a/lms/djangoapps/instructor/views/api.py +++ b/lms/djangoapps/instructor/views/api.py @@ -283,299 +283,305 @@ def wrapped(request, course_id): return wrapped -@require_POST -@ensure_csrf_cookie -@cache_control(no_cache=True, no_store=True, must_revalidate=True) -@require_course_permission(permissions.CAN_ENROLL) -def register_and_enroll_students(request, course_id): # pylint: disable=too-many-statements +@method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch') +class RegisterAndEnrollStudents(APIView): """ Create new account and Enroll students in this course. - Passing a csv file that contains a list of students. - Order in csv should be the following email = 0; username = 1; name = 2; country = 3. - If there are more than 4 columns in the csv: cohort = 4, course mode = 5. - Requires staff access. - - -If the email address and username already exists and the user is enrolled in the course, - do nothing (including no email gets sent out) - - -If the email address already exists, but the username is different, - match on the email address only and continue to enroll the user in the course using the email address - as the matching criteria. Note the change of username as a warning message (but not a failure). - Send a standard enrollment email which is the same as the existing manual enrollment - - -If the username already exists (but not the email), assume it is a different user and fail - to create the new account. - The failure will be messaged in a response in the browser. """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.CAN_ENROLL - if not configuration_helpers.get_value( - 'ALLOW_AUTOMATED_SIGNUPS', - settings.FEATURES.get('ALLOW_AUTOMATED_SIGNUPS', False), - ): - return HttpResponseForbidden() + @method_decorator(ensure_csrf_cookie) + def post(self, request, course_id): # pylint: disable=too-many-statements + """ + Create new account and Enroll students in this course. + Passing a csv file that contains a list of students. + Order in csv should be the following email = 0; username = 1; name = 2; country = 3. + If there are more than 4 columns in the csv: cohort = 4, course mode = 5. + Requires staff access. + + -If the email address and username already exists and the user is enrolled in the course, + do nothing (including no email gets sent out) + + -If the email address already exists, but the username is different, + match on the email address only and continue to enroll the user in the course using the email address + as the matching criteria. Note the change of username as a warning message (but not a failure). + Send a standard enrollment email which is the same as the existing manual enrollment + + -If the username already exists (but not the email), assume it is a different user and fail + to create the new account. + The failure will be messaged in a response in the browser. + """ + if not configuration_helpers.get_value( + 'ALLOW_AUTOMATED_SIGNUPS', + settings.FEATURES.get('ALLOW_AUTOMATED_SIGNUPS', False), + ): + return HttpResponseForbidden() - course_id = CourseKey.from_string(course_id) - warnings = [] - row_errors = [] - general_errors = [] + course_id = CourseKey.from_string(course_id) + warnings = [] + row_errors = [] + general_errors = [] - # email-students is a checkbox input type; will be present in POST if checked, absent otherwise - notify_by_email = 'email-students' in request.POST + # email-students is a checkbox input type; will be present in POST if checked, absent otherwise + notify_by_email = 'email-students' in request.POST - # for white labels we use 'shopping cart' which uses CourseMode.HONOR as - # course mode for creating course enrollments. - if CourseMode.is_white_label(course_id): - default_course_mode = CourseMode.HONOR - else: - default_course_mode = None + # for white labels we use 'shopping cart' which uses CourseMode.HONOR as + # course mode for creating course enrollments. + if CourseMode.is_white_label(course_id): + default_course_mode = CourseMode.HONOR + else: + default_course_mode = None - # Allow bulk enrollments in all non-expired course modes including "credit" (which is non-selectable) - valid_course_modes = set(map(lambda x: x.slug, CourseMode.modes_for_course( - course_id=course_id, - only_selectable=False, - include_expired=False, - ))) + # Allow bulk enrollments in all non-expired course modes including "credit" (which is non-selectable) + valid_course_modes = set(map(lambda x: x.slug, CourseMode.modes_for_course( + course_id=course_id, + only_selectable=False, + include_expired=False, + ))) - if 'students_list' in request.FILES: # lint-amnesty, pylint: disable=too-many-nested-blocks - students = [] + if 'students_list' in request.FILES: # lint-amnesty, pylint: disable=too-many-nested-blocks + students = [] - try: - upload_file = request.FILES.get('students_list') - if upload_file.name.endswith('.csv'): - students = list(csv.reader(upload_file.read().decode('utf-8-sig').splitlines())) - course = get_course_by_id(course_id) - else: + try: + upload_file = request.FILES.get('students_list') + if upload_file.name.endswith('.csv'): + students = list(csv.reader(upload_file.read().decode('utf-8-sig').splitlines())) + course = get_course_by_id(course_id) + else: + general_errors.append({ + 'username': '', 'email': '', + 'response': _( + 'Make sure that the file you upload is in CSV format with no ' + 'extraneous characters or rows.') + }) + + except Exception: # pylint: disable=broad-except general_errors.append({ - 'username': '', 'email': '', - 'response': _( - 'Make sure that the file you upload is in CSV format with no extraneous characters or rows.') + 'username': '', 'email': '', 'response': _('Could not read uploaded file.') }) + finally: + upload_file.close() + + generated_passwords = [] + # To skip fetching cohorts from the DB while iterating on students, + # {: CourseUserGroup} + cohorts_cache = {} + already_warned_not_cohorted = False + extra_fields_is_enabled = configuration_helpers.get_value( + 'ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS', + settings.FEATURES.get('ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS', False), + ) - except Exception: # pylint: disable=broad-except - general_errors.append({ - 'username': '', 'email': '', 'response': _('Could not read uploaded file.') - }) - finally: - upload_file.close() - - generated_passwords = [] - # To skip fetching cohorts from the DB while iterating on students, - # {: CourseUserGroup} - cohorts_cache = {} - already_warned_not_cohorted = False - extra_fields_is_enabled = configuration_helpers.get_value( - 'ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS', - settings.FEATURES.get('ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS', False), - ) - - # Iterate each student in the uploaded csv file. - for row_num, student in enumerate(students, 1): + # Iterate each student in the uploaded csv file. + for row_num, student in enumerate(students, 1): - # Verify that we have the expected number of columns in every row - # but allow for blank lines. - if not student: - continue + # Verify that we have the expected number of columns in every row + # but allow for blank lines. + if not student: + continue - if extra_fields_is_enabled: - is_valid_csv = 4 <= len(student) <= 6 - error = _('Data in row #{row_num} must have between four and six columns: ' - 'email, username, full name, country, cohort, and course mode. ' - 'The last two columns are optional.').format(row_num=row_num) - else: - is_valid_csv = len(student) == 4 - error = _('Data in row #{row_num} must have exactly four columns: ' - 'email, username, full name, and country.').format(row_num=row_num) + if extra_fields_is_enabled: + is_valid_csv = 4 <= len(student) <= 6 + error = _('Data in row #{row_num} must have between four and six columns: ' + 'email, username, full name, country, cohort, and course mode. ' + 'The last two columns are optional.').format(row_num=row_num) + else: + is_valid_csv = len(student) == 4 + error = _('Data in row #{row_num} must have exactly four columns: ' + 'email, username, full name, and country.').format(row_num=row_num) + + if not is_valid_csv: + general_errors.append({ + 'username': '', + 'email': '', + 'response': error + }) + continue - if not is_valid_csv: - general_errors.append({ - 'username': '', - 'email': '', - 'response': error - }) - continue + # Extract each column, handle optional columns if they exist. + email, username, name, country, *optional_cols = student + if optional_cols: + optional_cols.append(default_course_mode) + cohort_name, course_mode, *_tail = optional_cols + else: + cohort_name = None + course_mode = None - # Extract each column, handle optional columns if they exist. - email, username, name, country, *optional_cols = student - if optional_cols: - optional_cols.append(default_course_mode) - cohort_name, course_mode, *_tail = optional_cols - else: - cohort_name = None - course_mode = None + # Validate cohort name, and get the cohort object. Skip if course + # is not cohorted. + cohort = None - # Validate cohort name, and get the cohort object. Skip if course - # is not cohorted. - cohort = None + if cohort_name and not already_warned_not_cohorted: + if not is_course_cohorted(course_id): + row_errors.append({ + 'username': username, + 'email': email, + 'response': _('Course is not cohorted but cohort provided. ' + 'Ignoring cohort assignment for all users.') + }) + already_warned_not_cohorted = True + elif cohort_name in cohorts_cache: + cohort = cohorts_cache[cohort_name] + else: + # Don't attempt to create cohort or assign student if cohort + # does not exist. + try: + cohort = get_cohort_by_name(course_id, cohort_name) + except CourseUserGroup.DoesNotExist: + row_errors.append({ + 'username': username, + 'email': email, + 'response': _('Cohort name not found: {cohort}. ' + 'Ignoring cohort assignment for ' + 'all users.').format(cohort=cohort_name) + }) + cohorts_cache[cohort_name] = cohort + + # Validate course mode. + if not course_mode: + course_mode = default_course_mode + + if (course_mode is not None + and course_mode not in valid_course_modes): + # If `default is None` and the user is already enrolled, + # `CourseEnrollment.change_mode()` will not update the mode, + # hence two error messages. + if default_course_mode is None: + err_msg = _( + 'Invalid course mode: {mode}. Falling back to the ' + 'default mode, or keeping the current mode in case the ' + 'user is already enrolled.' + ).format(mode=course_mode) + else: + err_msg = _( + 'Invalid course mode: {mode}. Failling back to ' + '{default_mode}, or resetting to {default_mode} in case ' + 'the user is already enrolled.' + ).format(mode=course_mode, default_mode=default_course_mode) + row_errors.append({ + 'username': username, + 'email': email, + 'response': err_msg, + }) + course_mode = default_course_mode - if cohort_name and not already_warned_not_cohorted: - if not is_course_cohorted(course_id): + email_params = get_email_params(course, True, secure=request.is_secure()) + try: + validate_email(email) # Raises ValidationError if invalid + except ValidationError: row_errors.append({ 'username': username, 'email': email, - 'response': _('Course is not cohorted but cohort provided. ' - 'Ignoring cohort assignment for all users.') + 'response': _('Invalid email {email_address}.').format(email_address=email) }) - already_warned_not_cohorted = True - elif cohort_name in cohorts_cache: - cohort = cohorts_cache[cohort_name] else: - # Don't attempt to create cohort or assign student if cohort - # does not exist. - try: - cohort = get_cohort_by_name(course_id, cohort_name) - except CourseUserGroup.DoesNotExist: + if User.objects.filter(email=email).exists(): + # Email address already exists. assume it is the correct user + # and just register the user in the course and send an enrollment email. + user = User.objects.get(email=email) + + # see if it is an exact match with email and username + # if it's not an exact match then just display a warning message, but continue onwards + if not User.objects.filter(email=email, username=username).exists(): + warning_message = _( + 'An account with email {email} exists but the provided username {username} ' + 'is different. Enrolling anyway with {email}.' + ).format(email=email, username=username) + + warnings.append({ + 'username': username, 'email': email, 'response': warning_message + }) + log.warning('email %s already exist', email) + else: + log.info( + "user already exists with username '%s' and email '%s'", + username, + email + ) + + # enroll a user if it is not already enrolled. + if not is_user_enrolled_in_course(user, course_id): + # Enroll user to the course and add manual enrollment audit trail + create_manual_course_enrollment( + user=user, + course_id=course_id, + mode=course_mode, + enrolled_by=request.user, + reason='Enrolling via csv upload', + state_transition=UNENROLLED_TO_ENROLLED, + ) + enroll_email(course_id=course_id, + student_email=email, + auto_enroll=True, + email_students=notify_by_email, + email_params=email_params) + else: + # update the course mode if already enrolled + existing_enrollment = CourseEnrollment.get_enrollment(user, course_id) + if existing_enrollment.mode != course_mode: + existing_enrollment.change_mode(mode=course_mode) + if cohort: + try: + add_user_to_cohort(cohort, user) + except ValueError: + # user already in this cohort; ignore + pass + elif is_email_retired(email): + # We are either attempting to enroll a retired user or create a new user with an email which is + # already associated with a retired account. Simply block these attempts. row_errors.append({ 'username': username, 'email': email, - 'response': _('Cohort name not found: {cohort}. ' - 'Ignoring cohort assignment for ' - 'all users.').format(cohort=cohort_name) + 'response': _('Invalid email {email_address}.').format(email_address=email), }) - cohorts_cache[cohort_name] = cohort - - # Validate course mode. - if not course_mode: - course_mode = default_course_mode - - if (course_mode is not None - and course_mode not in valid_course_modes): - # If `default is None` and the user is already enrolled, - # `CourseEnrollment.change_mode()` will not update the mode, - # hence two error messages. - if default_course_mode is None: - err_msg = _( - 'Invalid course mode: {mode}. Falling back to the ' - 'default mode, or keeping the current mode in case the ' - 'user is already enrolled.' - ).format(mode=course_mode) - else: - err_msg = _( - 'Invalid course mode: {mode}. Failling back to ' - '{default_mode}, or resetting to {default_mode} in case ' - 'the user is already enrolled.' - ).format(mode=course_mode, default_mode=default_course_mode) - row_errors.append({ - 'username': username, - 'email': email, - 'response': err_msg, - }) - course_mode = default_course_mode - - email_params = get_email_params(course, True, secure=request.is_secure()) - try: - validate_email(email) # Raises ValidationError if invalid - except ValidationError: - row_errors.append({ - 'username': username, - 'email': email, - 'response': _('Invalid email {email_address}.').format(email_address=email) - }) - else: - if User.objects.filter(email=email).exists(): - # Email address already exists. assume it is the correct user - # and just register the user in the course and send an enrollment email. - user = User.objects.get(email=email) - - # see if it is an exact match with email and username - # if it's not an exact match then just display a warning message, but continue onwards - if not User.objects.filter(email=email, username=username).exists(): - warning_message = _( - 'An account with email {email} exists but the provided username {username} ' - 'is different. Enrolling anyway with {email}.' - ).format(email=email, username=username) - - warnings.append({ - 'username': username, 'email': email, 'response': warning_message - }) - log.warning('email %s already exist', email) + log.warning('Email address %s is associated with a retired user, so course enrollment was ' + # lint-amnesty, pylint: disable=logging-not-lazy + 'blocked.', email) else: - log.info( - "user already exists with username '%s' and email '%s'", + # This email does not yet exist, so we need to create a new account + # If username already exists in the database, then create_and_enroll_user + # will raise an IntegrityError exception. + password = generate_unique_password(generated_passwords) + errors = create_and_enroll_user( + email, username, - email + name, + country, + password, + course_id, + course_mode, + request.user, + email_params, + email_user=notify_by_email, ) + row_errors.extend(errors) + if cohort: + try: + add_user_to_cohort(cohort, email) + except ValueError: + # user already in this cohort; ignore + # NOTE: Checking this here may be unnecessary if we can prove that a + # new user will never be + # automatically assigned to a cohort from the above. + pass + except ValidationError: + row_errors.append({ + 'username': username, + 'email': email, + 'response': _('Invalid email {email_address}.').format(email_address=email), + }) - # enroll a user if it is not already enrolled. - if not is_user_enrolled_in_course(user, course_id): - # Enroll user to the course and add manual enrollment audit trail - create_manual_course_enrollment( - user=user, - course_id=course_id, - mode=course_mode, - enrolled_by=request.user, - reason='Enrolling via csv upload', - state_transition=UNENROLLED_TO_ENROLLED, - ) - enroll_email(course_id=course_id, - student_email=email, - auto_enroll=True, - email_students=notify_by_email, - email_params=email_params) - else: - # update the course mode if already enrolled - existing_enrollment = CourseEnrollment.get_enrollment(user, course_id) - if existing_enrollment.mode != course_mode: - existing_enrollment.change_mode(mode=course_mode) - if cohort: - try: - add_user_to_cohort(cohort, user) - except ValueError: - # user already in this cohort; ignore - pass - elif is_email_retired(email): - # We are either attempting to enroll a retired user or create a new user with an email which is - # already associated with a retired account. Simply block these attempts. - row_errors.append({ - 'username': username, - 'email': email, - 'response': _('Invalid email {email_address}.').format(email_address=email), - }) - log.warning('Email address %s is associated with a retired user, so course enrollment was ' + # lint-amnesty, pylint: disable=logging-not-lazy - 'blocked.', email) - else: - # This email does not yet exist, so we need to create a new account - # If username already exists in the database, then create_and_enroll_user - # will raise an IntegrityError exception. - password = generate_unique_password(generated_passwords) - errors = create_and_enroll_user( - email, - username, - name, - country, - password, - course_id, - course_mode, - request.user, - email_params, - email_user=notify_by_email, - ) - row_errors.extend(errors) - if cohort: - try: - add_user_to_cohort(cohort, email) - except ValueError: - # user already in this cohort; ignore - # NOTE: Checking this here may be unnecessary if we can prove that a new user will never be - # automatically assigned to a cohort from the above. - pass - except ValidationError: - row_errors.append({ - 'username': username, - 'email': email, - 'response': _('Invalid email {email_address}.').format(email_address=email), - }) - - else: - general_errors.append({ - 'username': '', 'email': '', 'response': _('File is not attached.') - }) + else: + general_errors.append({ + 'username': '', 'email': '', 'response': _('File is not attached.') + }) - results = { - 'row_errors': row_errors, - 'general_errors': general_errors, - 'warnings': warnings - } - return JsonResponse(results) + results = { + 'row_errors': row_errors, + 'general_errors': general_errors, + 'warnings': warnings + } + return JsonResponse(results) def generate_random_string(length): diff --git a/lms/djangoapps/instructor/views/api_urls.py b/lms/djangoapps/instructor/views/api_urls.py index fc9d5a342773..08b68b0760d4 100644 --- a/lms/djangoapps/instructor/views/api_urls.py +++ b/lms/djangoapps/instructor/views/api_urls.py @@ -22,7 +22,7 @@ urlpatterns = [ path('students_update_enrollment', api.students_update_enrollment, name='students_update_enrollment'), - path('register_and_enroll_students', api.register_and_enroll_students, name='register_and_enroll_students'), + path('register_and_enroll_students', api.RegisterAndEnrollStudents.as_view(), name='register_and_enroll_students'), path('list_course_role_members', api.ListCourseRoleMembersView.as_view(), name='list_course_role_members'), path('modify_access', api.modify_access, name='modify_access'), path('bulk_beta_modify_access', api.bulk_beta_modify_access, name='bulk_beta_modify_access'),