diff --git a/CHANGELOG.md b/CHANGELOG.md index d0354182..70f11e4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Change log +## Devel + +* Allow a user to link an Account that is not or has never been linked to another user. + ## 0.26.1 (2024-12-02) * Bugfix: `run_anvil_audit` now raises a `CommandException` if an API call fails. diff --git a/anvil_consortium_manager/__init__.py b/anvil_consortium_manager/__init__.py index 025f4c5d..ea14a2ed 100644 --- a/anvil_consortium_manager/__init__.py +++ b/anvil_consortium_manager/__init__.py @@ -1 +1 @@ -__version__ = "0.26.1" +__version__ = "0.27.0.dev0" diff --git a/anvil_consortium_manager/tests/test_views.py b/anvil_consortium_manager/tests/test_views.py index 6920468f..a95791ec 100644 --- a/anvil_consortium_manager/tests/test_views.py +++ b/anvil_consortium_manager/tests/test_views.py @@ -2341,14 +2341,52 @@ def test_blank_email(self): def test_account_exists_with_email_but_not_linked_to_user(self): """An Account with this email exists but is not linked to a user.""" email = "test@example.com" + # Create an account with this email. factories.AccountFactory.create(email=email) + api_url = self.get_api_url(email) + self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email)) + timestamp_lower_limit = timezone.now() + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.post(self.get_url(), {"email": email}) + self.assertEqual(response.status_code, 302) + # A new UserEmailEntry is created. + self.assertEqual(models.UserEmailEntry.objects.count(), 1) + # The new UserEmailentry is linked to the logged-in user. + new_object = models.UserEmailEntry.objects.latest("pk") + self.assertEqual(new_object.email, email) + self.assertEqual(new_object.user, self.user) + self.assertIsNotNone(new_object.date_verification_email_sent) + self.assertGreaterEqual(new_object.date_verification_email_sent, timestamp_lower_limit) + self.assertLessEqual(new_object.date_verification_email_sent, timezone.now()) + self.assertIsNone(new_object.date_verified) + # No account is linked. + with self.assertRaises(ObjectDoesNotExist): + new_object.verified_account + # History is added. + self.assertEqual(new_object.history.count(), 1) + self.assertEqual(new_object.history.latest().history_type, "+") + # One message has been sent. + self.assertEqual(len(mail.outbox), 1) + # The subject is correct. + self.assertEqual(mail.outbox[0].subject, "Verify your AnVIL account email") + + def test_account_exists_previously_linked_to_user(self): + """An Account with this email exists but is not linked to a user.""" + email = "test@example.com" + # Create an account with this email, and unlink it. + account = factories.AccountFactory.create(email=email, verified=True) + account.unlink_user() # No API call should be made, so do not add a mocked response. # Need a client because messages are added. self.client.force_login(self.user) response = self.client.post(self.get_url(), {"email": email}, follow=True) self.assertRedirects(response, "/test_home/") # No new UserEmailEntry is created. - self.assertEqual(models.UserEmailEntry.objects.count(), 0) + self.assertEqual(models.UserEmailEntry.objects.count(), 1) + self.assertEqual( + models.UserEmailEntry.objects.latest("pk"), account.accountuserarchive_set.first().verified_email_entry + ) # No email is sent. self.assertEqual(len(mail.outbox), 0) # A message is added. @@ -2731,6 +2769,154 @@ def test_token_does_not_match(self): self.assertEqual(len(messages), 1) self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_link_invalid) + def test_account_exists_in_app_never_linked_to_user(self): + """The email already has an Account in the app, but it was never verified by a user.""" + email = "test@example.com" + # Create an unverified account. + account = factories.AccountFactory.create(email=email) + # Create an email entry and a token for this user. + email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email) + token = account_verification_token.make_token(email_entry) + # Set up the API call. + api_url = self.get_api_url(email) + self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email)) + timestamp_threshold = timezone.now() + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.get(self.get_url(email_entry.uuid, token), follow=True) + self.assertRedirects(response, "/test_home/") + # No new accounts are created. + self.assertEqual(models.Account.objects.count(), 1) + self.assertIn(account, models.Account.objects.all()) + account.refresh_from_db() + self.assertEqual(account.email, email) + self.assertEqual(account.user, self.user) + self.assertFalse(account.is_service_account) + self.assertEqual(account.verified_email_entry, email_entry) + self.assertEqual(account.status, models.Account.ACTIVE_STATUS) + # The UserEmailEntry is linked to this account. + email_entry.refresh_from_db() + self.assertEqual(email_entry.verified_account, account) + self.assertIsNotNone(email_entry.date_verified) + self.assertGreaterEqual(email_entry.date_verified, timestamp_threshold) + self.assertLessEqual(email_entry.date_verified, timezone.now()) + # A message is added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_success) + + def test_account_exists_in_app_unlinked_from_user(self): + email = "test@example.com" + # Create an account that had previously been verified and then unlinked from the original user. + account = factories.AccountFactory.create(email=email, verified=True) + account.unlink_user() + # Create an email entry and a token for this user. + email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email) + token = account_verification_token.make_token(email_entry) + # No API calls are made, so do not add a mocked response. + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.get(self.get_url(email_entry.uuid, token), follow=True) + self.assertRedirects(response, "/test_home/") + # No new accounts are created. + self.assertEqual(models.Account.objects.count(), 1) + self.assertIn(account, models.Account.objects.all()) + account.refresh_from_db() + # The existing account has not been changed. + self.assertIsNone(account.user) + self.assertIsNone(account.verified_email_entry) + # A message is added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_account_already_exists) + + def test_account_exists_in_app_is_service_account(self): + email = "test@example.com" + # Create an account that had previously been verified and then unlinked from the original user. + account = factories.AccountFactory.create(email=email, is_service_account=True) + # Create an email entry and a token for this user. + email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email) + token = account_verification_token.make_token(email_entry) + # No API calls are made, so do not add a mocked response. + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.get(self.get_url(email_entry.uuid, token), follow=True) + self.assertRedirects(response, "/test_home/") + # No new accounts are created. + self.assertEqual(models.Account.objects.count(), 1) + self.assertIn(account, models.Account.objects.all()) + account.refresh_from_db() + # The existing account has not been changed. + self.assertIsNone(account.user) + self.assertIsNone(account.verified_email_entry) + # A message is added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_service_account) + + def test_account_exists_in_app_deactivated_never_linked_to_user(self): + """The email already has a deactivated Account in the app, but it was never verified by a user.""" + email = "test@example.com" + # Create an unverified account. + account = factories.AccountFactory.create(email=email) + account.deactivate() + # Create an email entry and a token for this user. + email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email) + token = account_verification_token.make_token(email_entry) + # Set up the API call. + api_url = self.get_api_url(email) + self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email)) + timestamp_threshold = timezone.now() + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.get(self.get_url(email_entry.uuid, token), follow=True) + self.assertRedirects(response, "/test_home/") + # No new accounts are created. + self.assertEqual(models.Account.objects.count(), 1) + self.assertIn(account, models.Account.objects.all()) + account.refresh_from_db() + self.assertEqual(account.email, email) + self.assertEqual(account.user, self.user) + self.assertFalse(account.is_service_account) + self.assertEqual(account.verified_email_entry, email_entry) + self.assertEqual(account.status, models.Account.INACTIVE_STATUS) + # The UserEmailEntry is linked to this account. + email_entry.refresh_from_db() + self.assertEqual(email_entry.verified_account, account) + self.assertIsNotNone(email_entry.date_verified) + self.assertGreaterEqual(email_entry.date_verified, timestamp_threshold) + self.assertLessEqual(email_entry.date_verified, timezone.now()) + # A message is added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_success) + + def test_account_exists_in_app_deactivated_unlinked_from_user(self): + email = "test@example.com" + # Create an account that had previously been verified and then unlinked from the original user. + account = factories.AccountFactory.create(email=email, verified=True) + account.unlink_user() + account.deactivate() + # Create an email entry and a token for this user. + email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email) + token = account_verification_token.make_token(email_entry) + # No API calls are made, so do not add a mocked response. + # Need a client because messages are added. + self.client.force_login(self.user) + response = self.client.get(self.get_url(email_entry.uuid, token), follow=True) + self.assertRedirects(response, "/test_home/") + # No new accounts are created. + self.assertEqual(models.Account.objects.count(), 1) + self.assertIn(account, models.Account.objects.all()) + account.refresh_from_db() + # The existing account has not been changed. + self.assertIsNone(account.user) + self.assertIsNone(account.verified_email_entry) + # A message is added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_account_already_exists) + def test_different_user_verified_this_email(self): """The email has already been verified by a different user.""" email = "test@example.com" diff --git a/anvil_consortium_manager/views.py b/anvil_consortium_manager/views.py index 7910df36..1b785b59 100644 --- a/anvil_consortium_manager/views.py +++ b/anvil_consortium_manager/views.py @@ -283,11 +283,16 @@ def form_valid(self, form): # Check if this email has an account already linked to a different user. # Don't need to check the user, because a user who has already linked their account shouldn't get here. - if models.Account.objects.filter(email=email).count(): + if models.Account.objects.filter(email=email, user__isnull=False).count(): # The user already has a linked account, so redirect with a message. messages.add_message(self.request, messages.ERROR, self.message_account_already_exists) return HttpResponseRedirect(self.get_redirect_url()) + if models.AccountUserArchive.objects.filter(account__email=email).exists(): + # The Account was already linked to a previous user, so redirect with a message. + messages.add_message(self.request, messages.ERROR, self.message_account_already_exists) + return HttpResponseRedirect(self.get_redirect_url()) + # Check if it exists on AnVIL. try: anvil_account_exists = email_entry.anvil_account_exists() @@ -314,6 +319,7 @@ class AccountLinkVerify(auth.AnVILConsortiumManagerAccountLinkRequired, Redirect message_link_invalid = "AnVIL account verification link is invalid." message_account_already_exists = "An AnVIL Account with this email already exists in this app." message_account_does_not_exist = "This account does not exist on AnVIL." + message_service_account = "Account is already marked as a service account." message_success = get_account_adapter().account_link_verify_message def get_redirect_url(self, *args, **kwargs): @@ -335,23 +341,39 @@ def get(self, request, *args, **kwargs): return super().get(request, *args, **kwargs) # Check if the email is already linked to an account. - if models.Account.objects.filter(email=email_entry.email).count(): + if models.Account.objects.filter(email=email_entry.email, user__isnull=False).count(): messages.add_message(self.request, messages.ERROR, self.message_account_already_exists) return super().get(request, *args, **kwargs) - # Check that the token maches. + # Check if any user was previously linked to this account. + if models.AccountUserArchive.objects.filter(account__email=email_entry.email).exists(): + messages.add_message(self.request, messages.ERROR, self.message_account_already_exists) + return super().get(request, *args, **kwargs) + + # Check if the account is a service account. + if models.Account.objects.filter(email=email_entry.email, is_service_account=True).count(): + messages.add_message(self.request, messages.ERROR, self.message_service_account) + return super().get(request, *args, **kwargs) + + # Check that the token matches. if not account_verification_token.check_token(email_entry, token): messages.add_message(self.request, messages.ERROR, self.message_link_invalid) return super().get(request, *args, **kwargs) # Create an account for this user from this email. - account = models.Account( - user=self.request.user, - email=email_entry.email, - status=models.Account.ACTIVE_STATUS, - is_service_account=False, - verified_email_entry=email_entry, - ) + try: + account = models.Account.objects.get(email=email_entry.email) + account.verified_email_entry = email_entry + account.user = request.user + except models.Account.DoesNotExist: + account = models.Account( + user=self.request.user, + email=email_entry.email, + status=models.Account.ACTIVE_STATUS, + is_service_account=False, + verified_email_entry=email_entry, + ) + # Make sure an AnVIL account still exists. try: anvil_account_exists = account.anvil_exists()