diff --git a/msystems/apps.py b/msystems/apps.py index 3ce8a7a..f7cf183 100644 --- a/msystems/apps.py +++ b/msystems/apps.py @@ -94,6 +94,12 @@ class MsystemsConfig(AppConfig): default_auto_field = 'django.db.models.BigAutoField' name = 'msystems' + ##### DO NOT CHANGE THIS #### + ADMIN = 'Admin' + INSPECTOR = 'Inspector' + EMPLOYER = 'Employer' + ##### ------------------ #### + saml_config = None base_login_redirect = None diff --git a/msystems/migrations/0002_add_roles.py b/msystems/migrations/0002_add_roles.py new file mode 100644 index 0000000..53e5cb0 --- /dev/null +++ b/msystems/migrations/0002_add_roles.py @@ -0,0 +1,45 @@ +# Generated by Django 3.2.19 on 2023-06-22 14:36 + +from django.db import migrations + +ROLE_NAME_INSPECTOR = "Inspector" +ROLE_NAME_EMPLOYER = "Employer" + + +def _get_role(role_name, role_model): + return role_model.objects.filter(name=role_name).first() + + +def _create_role(role_name, role_model): + role = _get_role(role_name, role_model) + if not role: + role = role_model(name=role_name, is_blocked=False, is_system=0) + role.save() + + +def _delete_role(role_name, role_model): + role = _get_role(role_name, role_model) + if role: + role.delete() + + +def on_migration(apps, schema_editor): + role_model = apps.get_model("core", "role") + _create_role(ROLE_NAME_INSPECTOR, role_model) + _create_role(ROLE_NAME_EMPLOYER, role_model) + + +def on_migration_reverse(apps, schema_editor): + role_model = apps.get_model("core", "role") + _delete_role(ROLE_NAME_INSPECTOR, role_model) + _delete_role(ROLE_NAME_EMPLOYER, role_model) + + +class Migration(migrations.Migration): + dependencies = [ + ('msystems', '0001_initial'), + ] + + operations = [ + migrations.RunPython(on_migration, on_migration_reverse) + ] diff --git a/msystems/services.py b/msystems/services.py index 939d982..75fb38b 100644 --- a/msystems/services.py +++ b/msystems/services.py @@ -7,9 +7,10 @@ from django.db.models import Q from secrets import token_hex -from core.models import User, InteractiveUser +from core.models import User, InteractiveUser, Role, UserRole from core.services.userServices import create_or_update_user_districts from location.models import Location +from msystems.apps import MsystemsConfig from policyholder.models import PolicyHolder, PolicyHolderUser logger = logging.getLogger(__name__) @@ -29,6 +30,7 @@ def login(self, username: str, user_data: dict): try: logger.debug("Successful SAML login, username=%s, user_data=%s", username, str(user_data)) user = self._get_or_create_user(username, user_data) + self._update_user_roles(user, user_data) self._update_user_legal_entities(user, user_data) return user except BaseException as e: @@ -68,13 +70,9 @@ def _update_user(self, user: User, user_data: dict) -> None: data_first_name = user_data.get('FirstName')[0] data_last_name = user_data.get('LastName')[0] - # For now only first and last name can be updated with saml - if user.i_user.other_names != data_first_name \ - or user.i_user.last_name != data_last_name: - user.i_user.save_history() - user.i_user.other_names = data_first_name - user.i_user.last_name = data_last_name - user.i_user.save() + # Update first and last name if they are different + if user.i_user.other_names != data_first_name or user.i_user.last_name != data_last_name: + self._update_user_name(user.i_user, data_first_name, data_last_name) def _update_user_legal_entities(self, user: User, user_data: dict) -> None: legal_entities = self._parse_legal_entities(user_data.get('OrganizationAdministrator')) @@ -83,6 +81,18 @@ def _update_user_legal_entities(self, user: User, user_data: dict) -> None: self._delete_old_user_policyholders(user, policyholders) self._add_new_user_policyholders(user, policyholders) + def _update_user_roles(self, user, user_data): + msystem_roles_list = user_data.get('Role') + + self._delete_old_user_roles(user, msystem_roles_list) + self._add_new_user_roles(user, msystem_roles_list) + + def _update_user_name(self, i_user, first_name, last_name): + i_user.save_history() + i_user.other_names = first_name + i_user.last_name = last_name + i_user.save() + def _parse_legal_entities(self, legal_entities) -> map: # The format of EU is "Name Tax_Number", splitting by the last space return map(lambda s: s.rsplit(' ', 1), legal_entities) @@ -114,9 +124,38 @@ def _delete_old_user_policyholders(self, user: User, policyholders: List[PolicyH for phu in PolicyHolderUser.objects.filter(~Q(policy_holder__in=policyholders), user=user, is_deleted=False): phu.delete(username=user.username) + def _delete_old_user_roles(self, user: User, roles: List[str]): + for user_role in UserRole.objects.filter(~Q(role__name__in=roles), user=user.i_user, validity_to__isnull=True): + user_role.delete_history() + def _add_new_user_policyholders(self, user: User, policyholders: List[PolicyHolder]): current_policyholders = (PolicyHolder.objects.filter(policyholderuser__user=user, policyholderuser__is_deleted=False, is_deleted=False)) for ph in policyholders: if ph not in current_policyholders: PolicyHolderUser(user=user, policy_holder=ph).save(username=user.username) + + def _add_new_user_roles(self, user: User, roles: List[str]): + current_user_roles = UserRole.objects.filter(user=user.i_user, validity_to__isnull=True) + for role in roles: + parsed_role = self._parse_msystem_role_to_imis_role(role) + if not current_user_roles.filter(role=parsed_role).exists(): + UserRole(user=user.i_user, role=parsed_role).save() + + def _update_roles(self, i_user, imis_role_ids): + self._remove_previous_user_roles(i_user) + self._connect_role_with_user(i_user, imis_role_ids) + + def _connect_role_with_user(self, i_user, role_names): + for role_name in role_names: + role = Role.objects.filter(name=role_name).first() + UserRole.objects.create(user=i_user, role=role) + + def _remove_previous_user_roles(self, i_user): + roles = UserRole.objects.filter(user=i_user) + if roles.exists(): + for role in roles: + role.delete_history() + + def _parse_msystem_role_to_imis_role(self, msystem_role): + return Role.objects.filter(name=msystem_role).first() diff --git a/msystems/tests/saml_user_service_tests.py b/msystems/tests/saml_user_service_tests.py index c37d5fd..46b88ed 100644 --- a/msystems/tests/saml_user_service_tests.py +++ b/msystems/tests/saml_user_service_tests.py @@ -1,10 +1,11 @@ from django.test import TestCase from copy import deepcopy from location.models import Location +from msystems.apps import MsystemsConfig from msystems.services import SamlUserService from msystems.tests.data import example_username, example_user_data, example_user_data_multiple_ph -from core.models import User, InteractiveUser +from core.models import User, InteractiveUser, UserRole, Role from policyholder.models import PolicyHolder @@ -149,3 +150,68 @@ def test_add_policyholder(self): self.assertEqual(2, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user, policyholderuser__is_deleted=False).count()) + + + def test_login_user_roles(self): + role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER) + + self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists()) + + self.service.login(username=example_username, + user_data=example_user_data) + + user_qs = InteractiveUser.objects.filter( + login_name=example_username, validity_to__isnull=True) + + self.assertTrue(user_qs.exists()) + self.assertEquals( + UserRole.objects.filter(user=user_qs.first()).first().role, role_employer_qs.first() + ) + + + def test_multiple_login_roles_updated(self): + role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER) + role_inspector_qs = Role.objects.filter(name=MsystemsConfig.INSPECTOR) + + self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists()) + self.assertFalse(UserRole.objects.filter(role=role_inspector_qs.first()).exists()) + + self.service.login(username=example_username, user_data=example_user_data) + + user_qs = InteractiveUser.objects.filter( + login_name=example_username, validity_to__isnull=True + ) + + self.assertEquals( + UserRole.objects.filter(user=user_qs.first()).first().role, role_employer_qs.first() + ) + + example_user_data_updated = deepcopy(example_user_data) + example_user_data_updated['Role'] = ["Inspector"] + + self.service.login(username=example_username, user_data=example_user_data_updated) + user_qs = InteractiveUser.objects.filter(login_name=example_username, validity_to__isnull=True) + active_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=True) + deleted_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=False) + + self.assertEquals(active_role_qs.count(), 1) + self.assertEquals(deleted_role_qs.count(), 2) # due to delete_history() it creates two instances + self.assertEquals( + active_role_qs.first().role, + role_inspector_qs.first() + ) + + def test_multiple_logins_no_role_update(self): + role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER) + + self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists()) + + self.service.login(username=example_username, user_data=example_user_data) + + self.service.login(username=example_username, user_data=example_user_data) + + user_qs = InteractiveUser.objects.filter(login_name=example_username, validity_to__isnull=True) + user_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=True) + + self.assertEquals(user_role_qs.count(), 1) + self.assertEquals(user_role_qs.first().role, role_employer_qs.first())