Skip to content

Commit

Permalink
OM-9 Added economic unit handling (#5)
Browse files Browse the repository at this point in the history
* OM-9 Added economic unit handling

* OM-9 Minor fixes
  • Loading branch information
malinowskikam authored Oct 26, 2023
1 parent 73f63ea commit 2997ec0
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 79 deletions.
97 changes: 80 additions & 17 deletions msystems/services.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,49 @@
import logging
from typing import List

from core import datetime

from django.db import transaction
from django.db.models import Q
from secrets import token_hex

from core.models import User, InteractiveUser
from core.services.userServices import create_or_update_user_districts
from location.models import Location
from policyholder.models import PolicyHolder, PolicyHolderUser

logger = logging.getLogger(__name__)


class SamlUserService:

location = None

def __init__(self):
if not self.location:
self.location = Location.objects \
.prefetch_related('parent', 'parent__parent', 'parent__parent__parent') \
.get(code='MV01', validity_to__isnull=True)

def login(self, username: str, user_data: dict):
with transaction.atomic():
user = User.objects.prefetch_related(
'i_user').filter(username=username).first()
if not user:
user = self._create_user(username, user_data)
else:
self._update_user(user, user_data)
self._update_user_legal_entities(user, user_data)

return user
try:
user = self._get_or_create_user(username, user_data)
self._update_user_legal_entities(user, user_data)
return user
except BaseException as e:
# Extra logging for the development, should be removed for any real data usage
# as it will put personal information in logs
logger.debug("Successful SAML login handling failed, username=%s, user_data=%s", username,
str(user_data), exc_info=e)
raise

def _get_or_create_user(self, username: str, user_data: dict):
user = User.objects.prefetch_related('i_user').filter(username=username).first()
if not user:
user = self._create_user(username, user_data)
else:
self._update_user(user, user_data)
return user

def _create_user(self, username: str, user_data: dict) -> User:
i_user = InteractiveUser(
Expand All @@ -31,13 +54,12 @@ def _create_user(self, username: str, user_data: dict) -> User:
audit_user_id=0,
is_associated=False,
private_key=token_hex(128),
password="locked" # this is password hash, it means no password will match
password="locked" # this is password hash, it means no password will match
)
i_user.save()

district = Location.objects.get(code='MD01', validity_to__isnull=True)
create_or_update_user_districts(i_user, [district.id], 0)


create_or_update_user_districts(i_user, [self.location.parent.parent.id], 0)

core_user = User(username=username)
core_user.i_user = i_user
core_user.save()
Expand All @@ -56,5 +78,46 @@ def _update_user(self, user: User, user_data: dict) -> None:
user.i_user.save()

def _update_user_legal_entities(self, user: User, user_data: dict) -> None:
# TODO Handling of legal entities
pass
legal_entities = self._parse_legal_entities(user_data.get('OrganizationAdministrator'))
policyholders = [self._get_or_create_policy_holder(user, line[1], line[0]) for line in legal_entities]

self._delete_old_user_policyholders(user, policyholders)
self._add_new_user_policyholders(user, policyholders)

def _parse_legal_entities(self, legal_entities) -> map:
# The format of EU is "Name Tax_Number", splitting by the last space
return map(lambda s: s.rsplit(' ', 1), legal_entities)

def _get_or_create_policy_holder(self, user: User, code: str, name: str) -> PolicyHolder:
policyholder = PolicyHolder.objects.filter(code=code, is_deleted=False).first()
if not policyholder:
policyholder = self._create_policyholder(user, code, name)
else:
self._update_policyholder(user, policyholder, name)
return policyholder

def _create_policyholder(self, user: User, code: str, name: str) -> PolicyHolder:
policyholder = PolicyHolder(
code=code,
trade_name=name,
locations=self.location,
date_valid_from=datetime.datetime.now()
)
policyholder.save(username=user.username)
return policyholder

def _update_policyholder(self, user: User, policyholder: PolicyHolder, name: str):
if policyholder.trade_name != name:
policyholder.trade_name = name
policyholder.save(username=user.username)

def _delete_old_user_policyholders(self, user: User, policyholders: List[PolicyHolder] ):
for phu in PolicyHolderUser.objects.filter(~Q(policy_holder__in=policyholders), user=user, is_deleted=False):
phu.delete(username=user.username)

def _add_new_user_policyholders(self, user: User, policyholders: List[PolicyHolder]):
current_policyholders = (PolicyHolder.objects.filter(policyholderuser__user=user,
policyholderuser__is_deleted=False, is_deleted=False))
for ph in policyholders:
if ph not in current_policyholders:
PolicyHolderUser(user=user, policy_holder=ph).save(username=user.username)
13 changes: 11 additions & 2 deletions msystems/tests/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@
'LastName': ['Test_Last_Name'],
'BirthDate': ['1970-01-01'],
'OrganizationAdministrator': [
'Test Ogrganisation 1 2345234523452',
'Test Ogrganisation 2 1234123412341'
'Test Organisation 1 2345234523452',
]
}

example_user_data_multiple_ph = {
'FirstName': ['Test_First_Name'],
'LastName': ['Test_Last_Name'],
'BirthDate': ['1970-01-01'],
'OrganizationAdministrator': [
'Test Organisation 1 2345234523452',
'Test Organisation 2 1234123412341'
]
}
119 changes: 96 additions & 23 deletions msystems/tests/saml_user_service_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from location.models import Location

from msystems.services import SamlUserService
from msystems.tests.data import example_username, example_user_data
from msystems.tests.data import example_username, example_user_data, example_user_data_multiple_ph
from core.models import User, InteractiveUser
from policyholder.models import PolicyHolder


class SamlUserServiceTestCase(TestCase):
Expand All @@ -30,36 +31,33 @@ def test_login(self):
login_name=example_username, validity_to__isnull=True).exists())

def test_multiple_logina_data_updated(self):
self.service.login(username=example_username,
user_data=example_user_data)
self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(InteractiveUser.objects
.filter(login_name=example_username, last_name=example_user_data['LastName'][0], validity_to__isnull=True)
.filter(login_name=example_username, last_name=example_user_data['LastName'][0],
validity_to__isnull=True)
.exists())

example_user_data_updated = deepcopy(example_user_data)
example_user_data_updated['LastName'][0] = "Test_Last_Name_Updated"

self.service.login(username=example_username,
user_data=example_user_data_updated)
self.service.login(username=example_username, user_data=example_user_data_updated)

self.assertTrue(InteractiveUser.objects
.filter(login_name=example_username, last_name=example_user_data['LastName'][0], validity_to__isnull=False)
.exists())
self.assertTrue(InteractiveUser.objects
.filter(login_name=example_username, last_name=example_user_data_updated['LastName'][0], validity_to__isnull=True)
.exists())
self.assertTrue(
InteractiveUser.objects.filter(login_name=example_username, last_name=example_user_data['LastName'][0],
validity_to__isnull=False).exists())
self.assertTrue(InteractiveUser.objects.filter(login_name=example_username,
last_name=example_user_data_updated['LastName'][0],
validity_to__isnull=True).exists())

def test_multiple_logins_no_data_update(self):
self.service.login(username=example_username,
user_data=example_user_data)
self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(InteractiveUser.objects
.filter(login_name=example_username, last_name=example_user_data['LastName'][0], validity_to__isnull=True)
.exists())
self.assertTrue(
InteractiveUser.objects.filter(login_name=example_username, last_name=example_user_data['LastName'][0],
validity_to__isnull=True).exists())

self.service.login(username=example_username,
user_data=example_user_data)
self.service.login(username=example_username, user_data=example_user_data)

self.assertFalse(InteractiveUser.objects
.filter(login_name=example_username, validity_to__isnull=False)
Expand All @@ -69,10 +67,85 @@ def test_multiple_logins_no_data_update(self):
.exists())

def test_user_district(self):
self.service.login(username=example_username,
user_data=example_user_data)
self.service.login(username=example_username, user_data=example_user_data)

i_user = InteractiveUser.objects.get(login_name=example_username)
district = Location.objects.get(code='MD01')
self.assertEqual(
[ud.location for ud in i_user.userdistrict_set.all()], [district])
self.assertEqual([ud.location for ud in i_user.userdistrict_set.all()], [district])

def test_create_policyholder(self):
user = self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())

def test_update_policyholder(self):
user = self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(
PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())

example_user_data_updated = deepcopy(example_user_data)
example_user_data_updated['OrganizationAdministrator'][0] = "Test New Organisation 1 2345234523999"

user = self.service.login(username=example_username, user_data=example_user_data_updated)

self.assertTrue(
PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=True).exists())
self.assertTrue(
PolicyHolder.objects.filter(code='2345234523999', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())
self.assertEqual(2, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user).count())

def test_update_policyholder_name(self):
user = self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(
PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())

example_user_data_updated = deepcopy(example_user_data)
example_user_data_updated['OrganizationAdministrator'][0] = "Test New Name Organisation 1 2345234523452"

user = self.service.login(username=example_username, user_data=example_user_data_updated)

self.assertTrue(PolicyHolder.objects.filter(code='2345234523452', trade_name='Test New Name Organisation 1',
is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())
self.assertEqual(1, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user).count())

def test_revoke_policyholder(self):
user = self.service.login(username=example_username, user_data=example_user_data_multiple_ph)

self.assertTrue(
PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())
self.assertEqual(2, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user).count())

example_user_data_updated = deepcopy(example_user_data_multiple_ph)
example_user_data_updated['OrganizationAdministrator'].pop()

user = self.service.login(username=example_username, user_data=example_user_data_updated)

self.assertEqual(1, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).count())
self.assertEqual(1, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=True).count())

def test_add_policyholder(self):
user = self.service.login(username=example_username, user_data=example_user_data)

self.assertTrue(
PolicyHolder.objects.filter(code='2345234523452', is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).exists())
self.assertEqual(1, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user).count())

example_user_data_updated = deepcopy(example_user_data)
example_user_data_updated['OrganizationAdministrator'].append("Test New Organisation 2345234523999")

user = self.service.login(username=example_username, user_data=example_user_data_updated)

self.assertEqual(2, PolicyHolder.objects.filter(is_deleted=False, policyholderuser__user=user,
policyholderuser__is_deleted=False).count())
Loading

0 comments on commit 2997ec0

Please sign in to comment.