Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OM-47: assign msmystem roles to imis user #7

Merged
merged 11 commits into from
Oct 31, 2023
6 changes: 6 additions & 0 deletions msystems/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class MsystemsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'msystems'

##### DO NOT CHANGE THIS ####
ADMIN = 'Admin'
INSPECTOR = 'Inspector'
EMPLOYER = 'Employer'
##### ------------------ ####

saml_config = None
base_login_redirect = None

Expand Down
45 changes: 45 additions & 0 deletions msystems/migrations/0002_add_roles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Generated by Django 3.2.19 on 2023-06-22 14:36

from django.db import migrations

ROLE_NAME_INSPECTOR = "Inspector"
ROLE_NAME_EMPLOYER = "Employer"


def _get_role(role_name, role_model):
return role_model.objects.filter(name=role_name).first()


def _create_role(role_name, role_model):
role = _get_role(role_name, role_model)
if not role:
role = role_model(name=role_name, is_blocked=False, is_system=0)
role.save()


def _delete_role(role_name, role_model):
role = _get_role(role_name, role_model)
if role:
role.delete()


def on_migration(apps, schema_editor):
role_model = apps.get_model("core", "role")
_create_role(ROLE_NAME_INSPECTOR, role_model)
_create_role(ROLE_NAME_EMPLOYER, role_model)


def on_migration_reverse(apps, schema_editor):
role_model = apps.get_model("core", "role")
_delete_role(ROLE_NAME_INSPECTOR, role_model)
_delete_role(ROLE_NAME_EMPLOYER, role_model)


class Migration(migrations.Migration):
dependencies = [
('msystems', '0001_initial'),
]

operations = [
migrations.RunPython(on_migration, on_migration_reverse)
]
55 changes: 47 additions & 8 deletions msystems/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from django.db.models import Q
from secrets import token_hex

from core.models import User, InteractiveUser
from core.models import User, InteractiveUser, Role, UserRole
from core.services.userServices import create_or_update_user_districts
from location.models import Location
from msystems.apps import MsystemsConfig
from policyholder.models import PolicyHolder, PolicyHolderUser

logger = logging.getLogger(__name__)
Expand All @@ -29,6 +30,7 @@ def login(self, username: str, user_data: dict):
try:
logger.debug("Successful SAML login, username=%s, user_data=%s", username, str(user_data))
user = self._get_or_create_user(username, user_data)
self._update_user_roles(user, user_data)
self._update_user_legal_entities(user, user_data)
return user
except BaseException as e:
Expand Down Expand Up @@ -68,13 +70,9 @@ def _update_user(self, user: User, user_data: dict) -> None:
data_first_name = user_data.get('FirstName')[0]
data_last_name = user_data.get('LastName')[0]

# For now only first and last name can be updated with saml
if user.i_user.other_names != data_first_name \
or user.i_user.last_name != data_last_name:
user.i_user.save_history()
user.i_user.other_names = data_first_name
user.i_user.last_name = data_last_name
user.i_user.save()
# Update first and last name if they are different
if user.i_user.other_names != data_first_name or user.i_user.last_name != data_last_name:
self._update_user_name(user.i_user, data_first_name, data_last_name)

def _update_user_legal_entities(self, user: User, user_data: dict) -> None:
legal_entities = self._parse_legal_entities(user_data.get('OrganizationAdministrator'))
Expand All @@ -83,6 +81,18 @@ def _update_user_legal_entities(self, user: User, user_data: dict) -> None:
self._delete_old_user_policyholders(user, policyholders)
self._add_new_user_policyholders(user, policyholders)

def _update_user_roles(self, user, user_data):
msystem_roles_list = user_data.get('Role')

self._delete_old_user_roles(user, msystem_roles_list)
self._add_new_user_roles(user, msystem_roles_list)

def _update_user_name(self, i_user, first_name, last_name):
i_user.save_history()
i_user.other_names = first_name
i_user.last_name = last_name
i_user.save()

def _parse_legal_entities(self, legal_entities) -> map:
# The format of EU is "Name Tax_Number", splitting by the last space
return map(lambda s: s.rsplit(' ', 1), legal_entities)
Expand Down Expand Up @@ -114,9 +124,38 @@ def _delete_old_user_policyholders(self, user: User, policyholders: List[PolicyH
for phu in PolicyHolderUser.objects.filter(~Q(policy_holder__in=policyholders), user=user, is_deleted=False):
phu.delete(username=user.username)

def _delete_old_user_roles(self, user: User, roles: List[str]):
for user_role in UserRole.objects.filter(~Q(role__name__in=roles), user=user.i_user, validity_to__isnull=True):
user_role.delete_history()

def _add_new_user_policyholders(self, user: User, policyholders: List[PolicyHolder]):
current_policyholders = (PolicyHolder.objects.filter(policyholderuser__user=user,
policyholderuser__is_deleted=False, is_deleted=False))
for ph in policyholders:
if ph not in current_policyholders:
PolicyHolderUser(user=user, policy_holder=ph).save(username=user.username)

def _add_new_user_roles(self, user: User, roles: List[str]):
current_user_roles = UserRole.objects.filter(user=user.i_user, validity_to__isnull=True)
for role in roles:
parsed_role = self._parse_msystem_role_to_imis_role(role)
if not current_user_roles.filter(role=parsed_role).exists():
UserRole(user=user.i_user, role=parsed_role).save()

def _update_roles(self, i_user, imis_role_ids):
self._remove_previous_user_roles(i_user)
self._connect_role_with_user(i_user, imis_role_ids)

def _connect_role_with_user(self, i_user, role_names):
for role_name in role_names:
role = Role.objects.filter(name=role_name).first()
UserRole.objects.create(user=i_user, role=role)

def _remove_previous_user_roles(self, i_user):
roles = UserRole.objects.filter(user=i_user)
if roles.exists():
for role in roles:
role.delete_history()

def _parse_msystem_role_to_imis_role(self, msystem_role):
return Role.objects.filter(name=msystem_role).first()
68 changes: 67 additions & 1 deletion msystems/tests/saml_user_service_tests.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from django.test import TestCase
from copy import deepcopy
from location.models import Location
from msystems.apps import MsystemsConfig

from msystems.services import SamlUserService
from msystems.tests.data import example_username, example_user_data, example_user_data_multiple_ph
from core.models import User, InteractiveUser
from core.models import User, InteractiveUser, UserRole, Role
from policyholder.models import PolicyHolder


Expand Down Expand Up @@ -149,3 +150,68 @@ def test_add_policyholder(self):

self.assertEqual(2, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).count())


def test_login_user_roles(self):
role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER)

self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists())

self.service.login(username=example_username,
user_data=example_user_data)

user_qs = InteractiveUser.objects.filter(
login_name=example_username, validity_to__isnull=True)

self.assertTrue(user_qs.exists())
self.assertEquals(
UserRole.objects.filter(user=user_qs.first()).first().role, role_employer_qs.first()
)


def test_multiple_login_roles_updated(self):
role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER)
role_inspector_qs = Role.objects.filter(name=MsystemsConfig.INSPECTOR)

self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists())
self.assertFalse(UserRole.objects.filter(role=role_inspector_qs.first()).exists())

self.service.login(username=example_username, user_data=example_user_data)

user_qs = InteractiveUser.objects.filter(
login_name=example_username, validity_to__isnull=True
)

self.assertEquals(
UserRole.objects.filter(user=user_qs.first()).first().role, role_employer_qs.first()
)

example_user_data_updated = deepcopy(example_user_data)
example_user_data_updated['Role'] = ["Inspector"]

self.service.login(username=example_username, user_data=example_user_data_updated)
user_qs = InteractiveUser.objects.filter(login_name=example_username, validity_to__isnull=True)
active_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=True)
deleted_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=False)

self.assertEquals(active_role_qs.count(), 1)
self.assertEquals(deleted_role_qs.count(), 2) # due to delete_history() it creates two instances
self.assertEquals(
active_role_qs.first().role,
role_inspector_qs.first()
)

def test_multiple_logins_no_role_update(self):
role_employer_qs = Role.objects.filter(name=MsystemsConfig.EMPLOYER)

self.assertFalse(UserRole.objects.filter(role=role_employer_qs.first()).exists())

self.service.login(username=example_username, user_data=example_user_data)

self.service.login(username=example_username, user_data=example_user_data)

user_qs = InteractiveUser.objects.filter(login_name=example_username, validity_to__isnull=True)
user_role_qs = UserRole.objects.filter(user=user_qs.first(), validity_to__isnull=True)

self.assertEquals(user_role_qs.count(), 1)
self.assertEquals(user_role_qs.first().role, role_employer_qs.first())
Loading