Skip to content

Commit

Permalink
feat: handle instructor permission events
Browse files Browse the repository at this point in the history
  • Loading branch information
zacharis278 committed Feb 27, 2024
1 parent 980719c commit 096ba43
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 0 deletions.
51 changes: 51 additions & 0 deletions edx_exams/apps/core/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
from django.dispatch import receiver
from openedx_events.event_bus import get_producer
from openedx_events.learning.signals import (
COURSE_ACCESS_ROLE_ADDED,
COURSE_ACCESS_ROLE_REMOVED,
EXAM_ATTEMPT_ERRORED,
EXAM_ATTEMPT_REJECTED,
EXAM_ATTEMPT_RESET,
EXAM_ATTEMPT_SUBMITTED,
EXAM_ATTEMPT_VERIFIED
)

from edx_exams.apps.core.models import CourseStaffRole, User

topic_name = getattr(settings, 'EXAM_ATTEMPT_EVENTS_KAFKA_TOPIC_NAME', '')

# list of roles that grant access to instructor features for exams
COURSE_STAFF_ROLES = ['staff', 'instructor', 'limited_staff']


@receiver(EXAM_ATTEMPT_SUBMITTED)
def listen_for_exam_attempt_submitted(sender, signal, **kwargs): # pylint: disable=unused-argument
Expand Down Expand Up @@ -83,3 +90,47 @@ def listen_for_exam_attempt_reset(sender, signal, **kwargs): # pylint: disable=
event_data={'exam_attempt': kwargs['exam_attempt']},
event_metadata=kwargs['metadata'],
)


@receiver(COURSE_ACCESS_ROLE_ADDED)
def listen_for_course_access_role_added(sender, signal, **kwargs): # pylint: disable=unused-argument
"""
Recieve COURSE_ACCESS_ROLE_ADDED signal from the event bus
"""
event_data = kwargs['course_access_role_data']
user_data = event_data.user
course_key = event_data.course_key
role = event_data.role

if role not in COURSE_STAFF_ROLES:
return

user, created = User.objects.get_or_create( # pylint: disable=unused-variable
username=user_data.pii.username,
email=user_data.pii.email,
)
CourseStaffRole.objects.get_or_create(
user=user,
course_id=course_key,
role=role,
)


@receiver(COURSE_ACCESS_ROLE_REMOVED)
def listen_for_course_access_role_removed(sender, signal, **kwargs): # pylint: disable=unused-argument
"""
Recieve COURSE_ACCESS_ROLE_REMOVED signal from the event bus
"""
event_data = kwargs['course_access_role_data']
user_data = event_data.user
course_key = event_data.course_key
role = event_data.role

if role not in COURSE_STAFF_ROLES:
return

CourseStaffRole.objects.filter(
user__username=user_data.pii.username,
course_id=course_key,
role=role,
).delete()
13 changes: 13 additions & 0 deletions edx_exams/apps/core/test_utils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from edx_exams.apps.core.models import (
AssessmentControlResult,
CourseExamConfiguration,
CourseStaffRole,
Exam,
ExamAttempt,
ProctoringProvider
Expand Down Expand Up @@ -115,3 +116,15 @@ class Meta:
incident_time = datetime.datetime.now() - datetime.timedelta(hours=1)
severity = 1
reason_code = '1'


class CourseStaffRoleFactory(DjangoModelFactory):
"""
Factory to create course staff roles
"""
class Meta:
model = CourseStaffRole

user = factory.SubFactory(UserFactory)
course_id = 'course-v1:edX+Test+Test_Course'
role = 'staff'
157 changes: 157 additions & 0 deletions edx_exams/apps/core/tests/test_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Test event handlers
"""
import uuid
from datetime import datetime, timezone

import ddt
from django.test import TestCase
from openedx_events.data import EventsMetadata
from openedx_events.learning.data import CourseAccessRoleData, UserData, UserPersonalData
from openedx_events.learning.signals import COURSE_ACCESS_ROLE_ADDED, COURSE_ACCESS_ROLE_REMOVED

from edx_exams.apps.core.models import CourseStaffRole, User
from edx_exams.apps.core.signals.handlers import (
listen_for_course_access_role_added,
listen_for_course_access_role_removed
)
from edx_exams.apps.core.test_utils.factories import CourseStaffRoleFactory, UserFactory


@ddt.ddt
class TestCourseRoleEvents(TestCase):
"""
Test course role events
"""
def setUp(self):
super().setUp()
self.course_id = 'course-v1:edx+test+2020'
self.existing_user = UserFactory(
username='test_user_exists', email='[email protected]'
)
self.existing_user_with_staff_role = UserFactory(
username='test_user_staff', email='[email protected]'
)
self.existing_user_with_instructor_role = UserFactory(
username='test_user_instructor', email='[email protected]'
)

CourseStaffRoleFactory(
user=self.existing_user_with_staff_role,
course_id=self.course_id,
role='staff',
)
CourseStaffRoleFactory(
user=self.existing_user_with_instructor_role,
course_id=self.course_id,
role='instructor',
)

@staticmethod
def _get_event_data(course_id, username, role):
""" create event data object """
return CourseAccessRoleData(
org_key='edx',
course_key=course_id,
role=role,
user=UserData(
pii=UserPersonalData(
username=username,
email=f'{username}@example.com',
),
id=123,
is_active=True,
),
)

@staticmethod
def _get_event_metadata(event_signal):
""" create metadata object for event """
return EventsMetadata(
event_type=event_signal.event_type,
id=uuid.uuid4(),
minorversion=0,
source='openedx/lms/web',
sourcehost='lms.test',
time=datetime.now(timezone.utc),
)

@ddt.data(
('test_user_1', 'staff', True),
('test_user_2', 'limited_staff', True),
('test_user_3', 'instructor', True),
('test_user_exists', 'staff', True),
('test_user_exists', 'other', False),
('test_user_staff', 'staff', True), # test duplicate event
('test_user_staff', 'instructor', True), # test multiple roles
)
@ddt.unpack
def test_course_access_role_added(self, username, role, expect_staff_access):
"""
Test CourseStaffRole is created on receiving COURSE_ACCESS_ROLE_ADDED event
with a role that grants staff access to exams
"""
role_event_data = self._get_event_data(self.course_id, username, role)
event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_ADDED)
event_kwargs = {
'course_access_role_data': role_event_data,
'metadata': event_metadata,
}
listen_for_course_access_role_added(None, COURSE_ACCESS_ROLE_ADDED, **event_kwargs)

user = User.objects.get(username=username)
self.assertEqual(user.has_course_staff_permission(self.course_id), expect_staff_access)

@ddt.data(
('test_user_staff', 'staff', False),
('test_user_instructor', 'instructor', False),
('test_user_staff', 'limited_staff', True),
('test_user_staff', 'other', True),
('test_user_dne', 'other', False),
)
@ddt.unpack
def test_course_access_role_removed(self, username, role, expect_staff_access):
"""
Test CourseStaffRole is deleted on receiving COURSE_ACCESS_ROLE_REMOVED event
"""
role_event_data = self._get_event_data(self.course_id, username, role)
event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_REMOVED)
event_kwargs = {
'course_access_role_data': role_event_data,
'metadata': event_metadata,
}
listen_for_course_access_role_removed(None, COURSE_ACCESS_ROLE_REMOVED, **event_kwargs)

if username == 'test_user_dne':
# this user should not be created (and therefore has no permissions)
self.assertFalse(User.objects.filter(username=username).exists())
else:
user = User.objects.get(username=username)
self.assertEqual(user.has_course_staff_permission(self.course_id), expect_staff_access)

def test_course_access_role_remove_single_role(self):
"""
Test correct role is removed for user with multiple roles
"""
CourseStaffRoleFactory(
user=self.existing_user_with_staff_role,
course_id=self.course_id,
role='instructor',
)

role_event_data = self._get_event_data(self.course_id, 'test_user_staff', 'staff')
event_metadata = self._get_event_metadata(COURSE_ACCESS_ROLE_REMOVED)
event_kwargs = {
'course_access_role_data': role_event_data,
'metadata': event_metadata,
}
listen_for_course_access_role_removed(None, COURSE_ACCESS_ROLE_REMOVED, **event_kwargs)

roles = [
staff_role.role for staff_role in
CourseStaffRole.objects.filter(user=self.existing_user_with_staff_role)
]
self.assertEqual(
roles,
['instructor']
)

0 comments on commit 096ba43

Please sign in to comment.