diff --git a/src/eherkenning/backends.py b/src/eherkenning/backends.py index dad3997527..4af05c57be 100644 --- a/src/eherkenning/backends.py +++ b/src/eherkenning/backends.py @@ -4,6 +4,8 @@ from digid_eherkenning.exceptions import eHerkenningError from digid_eherkenning.utils import get_client_ip +from open_inwoner.kvk.branches import KVK_BRANCH_SESSION_VARIABLE + UserModel = get_user_model() @@ -12,6 +14,12 @@ class eHerkenningBackend(_eHerkenningBackend): Custom backend to identify users based on the KvK number instead of RSIN """ + def get_company_branch_number(self, attributes): + company_branch_number = attributes.get( + "urn:etoegang:1.9:ServiceRestriction:Vestigingsnr", None + ) + return company_branch_number + def get_or_create_user(self, request, saml_response, saml_attributes): kvk = self.get_kvk_number(saml_attributes) if kvk == "": @@ -26,6 +34,10 @@ def get_or_create_user(self, request, saml_response, saml_attributes): user = UserModel.eherkenning_objects.eherkenning_create(kvk) created = True + if vestigingsnummer := self.get_company_branch_number(saml_attributes): + self.request.session[KVK_BRANCH_SESSION_VARIABLE] = vestigingsnummer + self.request.session.save() + success_message = self.error_messages["login_success"] % { "user": str(user), "user_info": " (new account)" if created else "", diff --git a/src/open_inwoner/accounts/backends.py b/src/open_inwoner/accounts/backends.py index c1bb8e2fc6..7fdca450af 100644 --- a/src/open_inwoner/accounts/backends.py +++ b/src/open_inwoner/accounts/backends.py @@ -5,16 +5,20 @@ from django.contrib.auth import get_user_model from django.contrib.auth.backends import ModelBackend from django.contrib.auth.hashers import check_password +from django.contrib.auth.models import AbstractUser from django.urls import reverse, reverse_lazy from axes.backends import AxesBackend from digid_eherkenning.oidc.backends import BaseBackend from mozilla_django_oidc_db.backends import OIDCAuthenticationBackend from mozilla_django_oidc_db.config import dynamic_setting +from mozilla_django_oidc_db.typing import JSONObject from oath import accept_totp from open_inwoner.configurations.models import SiteConfiguration +from open_inwoner.kvk.branches import KVK_BRANCH_SESSION_VARIABLE from open_inwoner.utils.hash import generate_email_from_string +from open_inwoner.utils.views import LogMixin from .choices import LoginTypeChoices from .models import OpenIDDigiDConfig, OpenIDEHerkenningConfig @@ -147,7 +151,7 @@ def filter_users_by_claims(self, claims): return self.UserModel.objects.filter(**{"oidc_id__iexact": unique_id}) -class DigiDEHerkenningOIDCBackend(BaseBackend): +class DigiDEHerkenningOIDCBackend(LogMixin, BaseBackend): OIP_UNIQUE_ID_USER_FIELDNAME = dynamic_setting[Literal["bsn", "kvk"]]() OIP_LOGIN_TYPE = dynamic_setting[LoginTypeChoices]() @@ -158,6 +162,26 @@ def _check_candidate_backend(self) -> bool: OpenIDEHerkenningConfig, ) + def _store_vestigingsnummer_in_session(self, claims: JSONObject): + """Get company vestigingsnummer from OIDC claims & store in session""" + + eherkenning_config = self.config_class.get_solo() + + branch_number_claim = eherkenning_config.branch_number_claim[0] + if not (vestigingsnummer := claims.get(branch_number_claim)): + return + + self.request.session[KVK_BRANCH_SESSION_VARIABLE] = vestigingsnummer + self.request.session.save() + + identifier_claim = eherkenning_config.identifier_type_claim[0] + kvk_or_rsin = claims.get(identifier_claim) + + self.log_system_action( + f"Vestigingsnummer {vestigingsnummer} retrieved from IdP for " + f"eHerkenning user (KVK/RSIN: {kvk_or_rsin})" + ) + def filter_users_by_claims(self, claims): """Return all users matching the specified subject.""" unique_id = self._extract_username(claims) @@ -169,7 +193,11 @@ def filter_users_by_claims(self, claims): ) def create_user(self, claims): - """Return object for a newly created user account.""" + """ + Return object for a newly created user account. + + Get vestigingsnummer from OIDC claims & store in session + """ unique_id = self._extract_username(claims) @@ -185,4 +213,12 @@ def create_user(self, claims): } ) + if self.config_class is OpenIDEHerkenningConfig: + self._store_vestigingsnummer_in_session(claims) + return user + + def update_user(self, user: AbstractUser, claims: JSONObject): + if self.config_class is OpenIDEHerkenningConfig: + self._store_vestigingsnummer_in_session(claims) + return super().update_user(user, claims) diff --git a/src/open_inwoner/accounts/tests/test_oidc_views.py b/src/open_inwoner/accounts/tests/test_oidc_views.py index 3453c6a4d9..5ee704b635 100644 --- a/src/open_inwoner/accounts/tests/test_oidc_views.py +++ b/src/open_inwoner/accounts/tests/test_oidc_views.py @@ -1828,3 +1828,66 @@ def test_redirect_after_login_no_registration_and_no_branch_selection( profile_response = self.app.get(profile_response.url) self.assertEqual(profile_response.status_code, 200) + + @patch("open_inwoner.kvk.client.KvKClient.get_all_company_branches") + @patch("open_inwoner.utils.context_processors.SiteConfiguration") + @patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.get_userinfo") + @patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.store_tokens") + @patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.verify_token") + @patch("mozilla_django_oidc_db.backends.OIDCAuthenticationBackend.get_token") + @patch( + "open_inwoner.accounts.models.OpenIDEHerkenningConfig.get_solo", + return_value=OpenIDEHerkenningConfig( + id=1, + enabled=True, + legal_subject_claim=["kvk"], + oidc_op_authorization_endpoint="http://idp.local/auth", + ), + ) + def test_redirect_after_login_branch_already_selected( + self, + mock_get_solo, + mock_get_token, + mock_verify_token, + mock_store_tokens, + mock_get_userinfo, + mock_siteconfig, + mock_kvk, + ): + """ + KVK branch selection should be skipped if KVK_BRANCH_SESSION_VARIABLE is present in session + """ + user = eHerkenningUserFactory.create(kvk="12345678", rsin="123456789") + mock_get_userinfo.return_value = { + "sub": "some_username", + "kvk": "12345678", + "urn:etoegang:1.9:ServiceRestriction:Vestigingsnr": "123456789000", + } + mock_siteconfig.return_value = SiteConfiguration(id=1, eherkenning_enabled=True) + mock_kvk.return_value = [ + {"kvkNummer": "12345678"}, + {"kvkNummer": "87654321"}, + ] + + self.assertEqual(User.objects.count(), 1) + + redirect_url = reverse("profile:detail") + + callback_response = perform_oidc_login( + self.app, "eherkenning", redirect_url=redirect_url + ) + + user = User.objects.get() + + self.assertEqual(user.pk, int(self.app.session.get("_auth_user_id"))) + self.assertEqual(user.kvk, "12345678") + self.assertEqual( + self.app.session.get(KVK_BRANCH_SESSION_VARIABLE), "123456789000" + ) + + self.assertRedirects( + callback_response, reverse("profile:detail"), fetch_redirect_response=False + ) + + response = self.app.get(callback_response.url) + self.assertEqual(response.status_code, 200)