Skip to content

Commit

Permalink
Merge pull request #1526 from maykinmedia/task/2932-eherkenning-login…
Browse files Browse the repository at this point in the history
…-choice

[#2932] Skip KVK branch selection if vestigingsnummer already selected
  • Loading branch information
alextreme authored Dec 16, 2024
2 parents 89451c6 + fcb4c6b commit 1645f79
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/eherkenning/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from digid_eherkenning.exceptions import eHerkenningError
from digid_eherkenning.utils import get_client_ip

from open_inwoner.kvk.branches import KVK_BRANCH_SESSION_VARIABLE

UserModel = get_user_model()


Expand All @@ -12,6 +14,12 @@ class eHerkenningBackend(_eHerkenningBackend):
Custom backend to identify users based on the KvK number instead of RSIN
"""

def get_company_branch_number(self, attributes):
company_branch_number = attributes.get(
"urn:etoegang:1.9:ServiceRestriction:Vestigingsnr", None
)
return company_branch_number

def get_or_create_user(self, request, saml_response, saml_attributes):
kvk = self.get_kvk_number(saml_attributes)
if kvk == "":
Expand All @@ -26,6 +34,10 @@ def get_or_create_user(self, request, saml_response, saml_attributes):
user = UserModel.eherkenning_objects.eherkenning_create(kvk)
created = True

if vestigingsnummer := self.get_company_branch_number(saml_attributes):
self.request.session[KVK_BRANCH_SESSION_VARIABLE] = vestigingsnummer
self.request.session.save()

success_message = self.error_messages["login_success"] % {
"user": str(user),
"user_info": " (new account)" if created else "",
Expand Down
40 changes: 38 additions & 2 deletions src/open_inwoner/accounts/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.hashers import check_password
from django.contrib.auth.models import AbstractUser
from django.urls import reverse, reverse_lazy

from axes.backends import AxesBackend
from digid_eherkenning.oidc.backends import BaseBackend
from mozilla_django_oidc_db.backends import OIDCAuthenticationBackend
from mozilla_django_oidc_db.config import dynamic_setting
from mozilla_django_oidc_db.typing import JSONObject
from oath import accept_totp

from open_inwoner.configurations.models import SiteConfiguration
from open_inwoner.kvk.branches import KVK_BRANCH_SESSION_VARIABLE
from open_inwoner.utils.hash import generate_email_from_string
from open_inwoner.utils.views import LogMixin

from .choices import LoginTypeChoices
from .models import OpenIDDigiDConfig, OpenIDEHerkenningConfig
Expand Down Expand Up @@ -147,7 +151,7 @@ def filter_users_by_claims(self, claims):
return self.UserModel.objects.filter(**{"oidc_id__iexact": unique_id})


class DigiDEHerkenningOIDCBackend(BaseBackend):
class DigiDEHerkenningOIDCBackend(LogMixin, BaseBackend):
OIP_UNIQUE_ID_USER_FIELDNAME = dynamic_setting[Literal["bsn", "kvk"]]()
OIP_LOGIN_TYPE = dynamic_setting[LoginTypeChoices]()

Expand All @@ -158,6 +162,26 @@ def _check_candidate_backend(self) -> bool:
OpenIDEHerkenningConfig,
)

def _store_vestigingsnummer_in_session(self, claims: JSONObject):
"""Get company vestigingsnummer from OIDC claims & store in session"""

eherkenning_config = self.config_class.get_solo()

branch_number_claim = eherkenning_config.branch_number_claim[0]
if not (vestigingsnummer := claims.get(branch_number_claim)):
return

self.request.session[KVK_BRANCH_SESSION_VARIABLE] = vestigingsnummer
self.request.session.save()

identifier_claim = eherkenning_config.identifier_type_claim[0]
kvk_or_rsin = claims.get(identifier_claim)

self.log_system_action(
f"Vestigingsnummer {vestigingsnummer} retrieved from IdP for "
f"eHerkenning user (KVK/RSIN: {kvk_or_rsin})"
)

def filter_users_by_claims(self, claims):
"""Return all users matching the specified subject."""
unique_id = self._extract_username(claims)
Expand All @@ -169,7 +193,11 @@ def filter_users_by_claims(self, claims):
)

def create_user(self, claims):
"""Return object for a newly created user account."""
"""
Return object for a newly created user account.
Get vestigingsnummer from OIDC claims & store in session
"""

unique_id = self._extract_username(claims)

Expand All @@ -185,4 +213,12 @@ def create_user(self, claims):
}
)

if self.config_class is OpenIDEHerkenningConfig:
self._store_vestigingsnummer_in_session(claims)

return user

def update_user(self, user: AbstractUser, claims: JSONObject):
if self.config_class is OpenIDEHerkenningConfig:
self._store_vestigingsnummer_in_session(claims)
return super().update_user(user, claims)
63 changes: 63 additions & 0 deletions src/open_inwoner/accounts/tests/test_oidc_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1920,3 +1920,66 @@ def test_redirect_after_login_no_registration_and_no_branch_selection(
profile_response = self.app.get(profile_response.url)

self.assertEqual(profile_response.status_code, 200)

@patch("open_inwoner.kvk.client.KvKClient.get_all_company_branches")
@patch("open_inwoner.utils.context_processors.SiteConfiguration")
@patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.get_userinfo")
@patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.store_tokens")
@patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.verify_token")
@patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.get_token")
@patch(
"open_inwoner.accounts.models.OpenIDEHerkenningConfig.get_solo",
return_value=OpenIDEHerkenningConfig(
id=1,
enabled=True,
legal_subject_claim=["kvk"],
oidc_op_authorization_endpoint="http://idp.local/auth",
),
)
def test_redirect_after_login_branch_already_selected(
self,
mock_get_solo,
mock_get_token,
mock_verify_token,
mock_store_tokens,
mock_get_userinfo,
mock_siteconfig,
mock_kvk,
):
"""
KVK branch selection should be skipped if KVK_BRANCH_SESSION_VARIABLE is present in session
"""
user = eHerkenningUserFactory.create(kvk="12345678", rsin="123456789")
mock_get_userinfo.return_value = {
"sub": "some_username",
"kvk": "12345678",
"urn:etoegang:1.9:ServiceRestriction:Vestigingsnr": "123456789000",
}
mock_siteconfig.return_value = SiteConfiguration(id=1, eherkenning_enabled=True)
mock_kvk.return_value = [
{"kvkNummer": "12345678"},
{"kvkNummer": "87654321"},
]

self.assertEqual(User.objects.count(), 1)

redirect_url = reverse("profile:detail")

callback_response = perform_oidc_login(
self.app, "eherkenning", redirect_url=redirect_url
)

user = User.objects.get()

self.assertEqual(user.pk, int(self.app.session.get("_auth_user_id")))
self.assertEqual(user.kvk, "12345678")
self.assertEqual(
self.app.session.get(KVK_BRANCH_SESSION_VARIABLE), "123456789000"
)

self.assertRedirects(
callback_response, reverse("profile:detail"), fetch_redirect_response=False
)

response = self.app.get(callback_response.url)
self.assertEqual(response.status_code, 200)

0 comments on commit 1645f79

Please sign in to comment.