diff --git a/edx_exams/apps/core/signals/handlers.py b/edx_exams/apps/core/signals/handlers.py index f8573180..edf7a8c6 100644 --- a/edx_exams/apps/core/signals/handlers.py +++ b/edx_exams/apps/core/signals/handlers.py @@ -5,6 +5,8 @@ from django.dispatch import receiver from openedx_events.event_bus import get_producer from openedx_events.learning.signals import ( + COURSE_ACCESS_ROLE_ADDED, + COURSE_ACCESS_ROLE_REMOVED, EXAM_ATTEMPT_ERRORED, EXAM_ATTEMPT_REJECTED, EXAM_ATTEMPT_RESET, @@ -12,8 +14,13 @@ EXAM_ATTEMPT_VERIFIED ) +from edx_exams.apps.core.models import CourseStaffRole, User + topic_name = getattr(settings, 'EXAM_ATTEMPT_EVENTS_KAFKA_TOPIC_NAME', '') +# list of roles that grant access to instructor features for exams +COURSE_STAFF_ROLES = ['staff', 'instructor', 'limited_staff'] + @receiver(EXAM_ATTEMPT_SUBMITTED) def listen_for_exam_attempt_submitted(sender, signal, **kwargs): # pylint: disable=unused-argument @@ -83,3 +90,47 @@ def listen_for_exam_attempt_reset(sender, signal, **kwargs): # pylint: disable= event_data={'exam_attempt': kwargs['exam_attempt']}, event_metadata=kwargs['metadata'], ) + + +@receiver(COURSE_ACCESS_ROLE_ADDED) +def listen_for_course_access_role_added(sender, signal, **kwargs): # pylint: disable=unused-argument + """ + Recieve COURSE_ACCESS_ROLE_ADDED signal from the event bus + """ + event_data = kwargs['course_access_role_data'] + user_data = event_data.user + course_key = event_data.course_key + role = event_data.role + + if role not in COURSE_STAFF_ROLES: + return + + user, created = User.objects.get_or_create( # pylint: disable=unused-variable + username=user_data.pii.username, + email=user_data.pii.email, + ) + CourseStaffRole.objects.get_or_create( + user=user, + course_id=course_key, + role=role, + ) + + +@receiver(COURSE_ACCESS_ROLE_REMOVED) +def listen_for_course_access_role_removed(sender, signal, **kwargs): # pylint: disable=unused-argument + """ + Recieve COURSE_ACCESS_ROLE_REMOVED signal from the event bus + """ + event_data = kwargs['course_access_role_data'] + user_data = event_data.user + course_key = event_data.course_key + role = event_data.role + + if role not in COURSE_STAFF_ROLES: + return + + CourseStaffRole.objects.filter( + user__username=user_data.pii.username, + course_id=course_key, + role=role, + ).delete() diff --git a/edx_exams/apps/core/test_utils/factories.py b/edx_exams/apps/core/test_utils/factories.py index a95352c5..b7fbf7c4 100644 --- a/edx_exams/apps/core/test_utils/factories.py +++ b/edx_exams/apps/core/test_utils/factories.py @@ -11,6 +11,7 @@ from edx_exams.apps.core.models import ( AssessmentControlResult, CourseExamConfiguration, + CourseStaffRole, Exam, ExamAttempt, ProctoringProvider @@ -115,3 +116,15 @@ class Meta: incident_time = datetime.datetime.now() - datetime.timedelta(hours=1) severity = 1 reason_code = '1' + + +class CourseStaffRoleFactory(DjangoModelFactory): + """ + Factory to create course staff roles + """ + class Meta: + model = CourseStaffRole + + user = factory.SubFactory(UserFactory) + course_id = 'course-v1:edX+Test+Test_Course' + role = 'staff' diff --git a/edx_exams/apps/core/tests/test_handlers.py b/edx_exams/apps/core/tests/test_handlers.py new file mode 100644 index 00000000..9d81adb5 --- /dev/null +++ b/edx_exams/apps/core/tests/test_handlers.py @@ -0,0 +1,157 @@ +""" +Test event handlers +""" +import uuid +from datetime import datetime, timezone + +import ddt +from django.test import TestCase +from openedx_events.data import EventsMetadata +from openedx_events.learning.data import CourseAccessRoleData, UserData, UserPersonalData +from openedx_events.learning.signals import COURSE_ACCESS_ROLE_ADDED, COURSE_ACCESS_ROLE_REMOVED + +from edx_exams.apps.core.models import CourseStaffRole, User +from edx_exams.apps.core.signals.handlers import ( + listen_for_course_access_role_added, + listen_for_course_access_role_removed +) +from edx_exams.apps.core.test_utils.factories import CourseStaffRoleFactory, UserFactory + + +@ddt.ddt +class TestCourseRoleEvents(TestCase): + """ + Test course role events + """ + def setUp(self): + super().setUp() + self.course_id = 'course-v1:edx+test+2020' + self.existing_user = UserFactory( + username='test_user_exists', email='test_user_exists@example.com' + ) + self.existing_user_with_staff_role = UserFactory( + username='test_user_staff', email='test_user_staff@example.com' + ) + self.existing_user_with_instructor_role = UserFactory( + username='test_user_instructor', email='test_user_instructor@example.com' + ) + + CourseStaffRoleFactory( + user=self.existing_user_with_staff_role, + course_id=self.course_id, + role='staff', + ) + CourseStaffRoleFactory( + user=self.existing_user_with_instructor_role, + course_id=self.course_id, + role='instructor', + ) + + @staticmethod + def _get_event_data(course_id, username, role): + """ create event data object """ + return CourseAccessRoleData( + org_key='edx', + course_key=course_id, + role=role, + user=UserData( + pii=UserPersonalData( + username=username, + email=f'{username}@example.com', + ), + id=123, + is_active=True, + ), + ) + + @staticmethod + def _get_event_metadata(event_signal): + """ create metadata object for event """ + return EventsMetadata( + event_type=event_signal.event_type, + id=uuid.uuid4(), + minorversion=0, + source='openedx/lms/web', + sourcehost='lms.test', + time=datetime.now(timezone.utc), + ) + + @ddt.data( + ('test_user_1', 'staff', True), + ('test_user_2', 'limited_staff', True), + ('test_user_3', 'instructor', True), + ('test_user_exists', 'staff', True), + ('test_user_exists', 'other', False), + ('test_user_staff', 'staff', True), # test duplicate event + ('test_user_staff', 'instructor', True), # test multiple roles + ) + @ddt.unpack + def test_course_access_role_added(self, username, role, expect_staff_access): + """ + Test CourseStaffRole is created on receiving COURSE_ACCESS_ROLE_ADDED event + with a role that grants staff access to exams + """ + role_event_data = self._get_event_data(self.course_id, username, role) + event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_ADDED) + event_kwargs = { + 'course_access_role_data': role_event_data, + 'metadata': event_metadata, + } + listen_for_course_access_role_added(None, COURSE_ACCESS_ROLE_ADDED, **event_kwargs) + + user = User.objects.get(username=username) + self.assertEqual(user.has_course_staff_permission(self.course_id), expect_staff_access) + + @ddt.data( + ('test_user_staff', 'staff', False), + ('test_user_instructor', 'instructor', False), + ('test_user_staff', 'limited_staff', True), + ('test_user_staff', 'other', True), + ('test_user_dne', 'other', False), + ) + @ddt.unpack + def test_course_access_role_removed(self, username, role, expect_staff_access): + """ + Test CourseStaffRole is deleted on receiving COURSE_ACCESS_ROLE_REMOVED event + """ + role_event_data = self._get_event_data(self.course_id, username, role) + event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_REMOVED) + event_kwargs = { + 'course_access_role_data': role_event_data, + 'metadata': event_metadata, + } + listen_for_course_access_role_removed(None, COURSE_ACCESS_ROLE_REMOVED, **event_kwargs) + + if username == 'test_user_dne': + # this user should not be created (and therefore has no permissions) + self.assertFalse(User.objects.filter(username=username).exists()) + else: + user = User.objects.get(username=username) + self.assertEqual(user.has_course_staff_permission(self.course_id), expect_staff_access) + + def test_course_access_role_remove_single_role(self): + """ + Test correct role is removed for user with multiple roles + """ + CourseStaffRoleFactory( + user=self.existing_user_with_staff_role, + course_id=self.course_id, + role='instructor', + ) + + role_event_data = self._get_event_data(self.course_id, 'test_user_staff', 'staff') + event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_REMOVED) + event_kwargs = { + 'course_access_role_data': role_event_data, + 'metadata': event_metadata, + } + listen_for_course_access_role_removed(None, COURSE_ACCESS_ROLE_REMOVED, **event_kwargs) + + roles = [ + staff_role.role for staff_role in + CourseStaffRole.objects.filter(user=self.existing_user_with_staff_role) + ] + self.assertEqual( + roles, + ['instructor'] + )