Skip to content

Commit

Permalink
Merge pull request #550 from UW-GAC/feature/link-existing-account
Browse files Browse the repository at this point in the history
Allow users to link an existing account
  • Loading branch information
amstilp authored Dec 9, 2024
2 parents b2d83db + f4ec4a2 commit 8ed7d23
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change log

## Devel

* Allow a user to link an Account that is not or has never been linked to another user.

## 0.26.1 (2024-12-02)

* Bugfix: `run_anvil_audit` now raises a `CommandException` if an API call fails.
Expand Down
2 changes: 1 addition & 1 deletion anvil_consortium_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.26.1"
__version__ = "0.27.0.dev0"
188 changes: 187 additions & 1 deletion anvil_consortium_manager/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2341,14 +2341,52 @@ def test_blank_email(self):
def test_account_exists_with_email_but_not_linked_to_user(self):
"""An Account with this email exists but is not linked to a user."""
email = "[email protected]"
# Create an account with this email.
factories.AccountFactory.create(email=email)
api_url = self.get_api_url(email)
self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email))
timestamp_lower_limit = timezone.now()
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.post(self.get_url(), {"email": email})
self.assertEqual(response.status_code, 302)
# A new UserEmailEntry is created.
self.assertEqual(models.UserEmailEntry.objects.count(), 1)
# The new UserEmailentry is linked to the logged-in user.
new_object = models.UserEmailEntry.objects.latest("pk")
self.assertEqual(new_object.email, email)
self.assertEqual(new_object.user, self.user)
self.assertIsNotNone(new_object.date_verification_email_sent)
self.assertGreaterEqual(new_object.date_verification_email_sent, timestamp_lower_limit)
self.assertLessEqual(new_object.date_verification_email_sent, timezone.now())
self.assertIsNone(new_object.date_verified)
# No account is linked.
with self.assertRaises(ObjectDoesNotExist):
new_object.verified_account
# History is added.
self.assertEqual(new_object.history.count(), 1)
self.assertEqual(new_object.history.latest().history_type, "+")
# One message has been sent.
self.assertEqual(len(mail.outbox), 1)
# The subject is correct.
self.assertEqual(mail.outbox[0].subject, "Verify your AnVIL account email")

def test_account_exists_previously_linked_to_user(self):
"""An Account with this email exists but is not linked to a user."""
email = "[email protected]"
# Create an account with this email, and unlink it.
account = factories.AccountFactory.create(email=email, verified=True)
account.unlink_user()
# No API call should be made, so do not add a mocked response.
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.post(self.get_url(), {"email": email}, follow=True)
self.assertRedirects(response, "/test_home/")
# No new UserEmailEntry is created.
self.assertEqual(models.UserEmailEntry.objects.count(), 0)
self.assertEqual(models.UserEmailEntry.objects.count(), 1)
self.assertEqual(
models.UserEmailEntry.objects.latest("pk"), account.accountuserarchive_set.first().verified_email_entry
)
# No email is sent.
self.assertEqual(len(mail.outbox), 0)
# A message is added.
Expand Down Expand Up @@ -2731,6 +2769,154 @@ def test_token_does_not_match(self):
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_link_invalid)

def test_account_exists_in_app_never_linked_to_user(self):
"""The email already has an Account in the app, but it was never verified by a user."""
email = "[email protected]"
# Create an unverified account.
account = factories.AccountFactory.create(email=email)
# Create an email entry and a token for this user.
email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email)
token = account_verification_token.make_token(email_entry)
# Set up the API call.
api_url = self.get_api_url(email)
self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email))
timestamp_threshold = timezone.now()
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.get(self.get_url(email_entry.uuid, token), follow=True)
self.assertRedirects(response, "/test_home/")
# No new accounts are created.
self.assertEqual(models.Account.objects.count(), 1)
self.assertIn(account, models.Account.objects.all())
account.refresh_from_db()
self.assertEqual(account.email, email)
self.assertEqual(account.user, self.user)
self.assertFalse(account.is_service_account)
self.assertEqual(account.verified_email_entry, email_entry)
self.assertEqual(account.status, models.Account.ACTIVE_STATUS)
# The UserEmailEntry is linked to this account.
email_entry.refresh_from_db()
self.assertEqual(email_entry.verified_account, account)
self.assertIsNotNone(email_entry.date_verified)
self.assertGreaterEqual(email_entry.date_verified, timestamp_threshold)
self.assertLessEqual(email_entry.date_verified, timezone.now())
# A message is added.
messages = [m.message for m in get_messages(response.wsgi_request)]
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_success)

def test_account_exists_in_app_unlinked_from_user(self):
email = "[email protected]"
# Create an account that had previously been verified and then unlinked from the original user.
account = factories.AccountFactory.create(email=email, verified=True)
account.unlink_user()
# Create an email entry and a token for this user.
email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email)
token = account_verification_token.make_token(email_entry)
# No API calls are made, so do not add a mocked response.
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.get(self.get_url(email_entry.uuid, token), follow=True)
self.assertRedirects(response, "/test_home/")
# No new accounts are created.
self.assertEqual(models.Account.objects.count(), 1)
self.assertIn(account, models.Account.objects.all())
account.refresh_from_db()
# The existing account has not been changed.
self.assertIsNone(account.user)
self.assertIsNone(account.verified_email_entry)
# A message is added.
messages = [m.message for m in get_messages(response.wsgi_request)]
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_account_already_exists)

def test_account_exists_in_app_is_service_account(self):
email = "[email protected]"
# Create an account that had previously been verified and then unlinked from the original user.
account = factories.AccountFactory.create(email=email, is_service_account=True)
# Create an email entry and a token for this user.
email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email)
token = account_verification_token.make_token(email_entry)
# No API calls are made, so do not add a mocked response.
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.get(self.get_url(email_entry.uuid, token), follow=True)
self.assertRedirects(response, "/test_home/")
# No new accounts are created.
self.assertEqual(models.Account.objects.count(), 1)
self.assertIn(account, models.Account.objects.all())
account.refresh_from_db()
# The existing account has not been changed.
self.assertIsNone(account.user)
self.assertIsNone(account.verified_email_entry)
# A message is added.
messages = [m.message for m in get_messages(response.wsgi_request)]
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_service_account)

def test_account_exists_in_app_deactivated_never_linked_to_user(self):
"""The email already has a deactivated Account in the app, but it was never verified by a user."""
email = "[email protected]"
# Create an unverified account.
account = factories.AccountFactory.create(email=email)
account.deactivate()
# Create an email entry and a token for this user.
email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email)
token = account_verification_token.make_token(email_entry)
# Set up the API call.
api_url = self.get_api_url(email)
self.anvil_response_mock.add(responses.GET, api_url, status=200, json=self.get_api_json_response(email))
timestamp_threshold = timezone.now()
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.get(self.get_url(email_entry.uuid, token), follow=True)
self.assertRedirects(response, "/test_home/")
# No new accounts are created.
self.assertEqual(models.Account.objects.count(), 1)
self.assertIn(account, models.Account.objects.all())
account.refresh_from_db()
self.assertEqual(account.email, email)
self.assertEqual(account.user, self.user)
self.assertFalse(account.is_service_account)
self.assertEqual(account.verified_email_entry, email_entry)
self.assertEqual(account.status, models.Account.INACTIVE_STATUS)
# The UserEmailEntry is linked to this account.
email_entry.refresh_from_db()
self.assertEqual(email_entry.verified_account, account)
self.assertIsNotNone(email_entry.date_verified)
self.assertGreaterEqual(email_entry.date_verified, timestamp_threshold)
self.assertLessEqual(email_entry.date_verified, timezone.now())
# A message is added.
messages = [m.message for m in get_messages(response.wsgi_request)]
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_success)

def test_account_exists_in_app_deactivated_unlinked_from_user(self):
email = "[email protected]"
# Create an account that had previously been verified and then unlinked from the original user.
account = factories.AccountFactory.create(email=email, verified=True)
account.unlink_user()
account.deactivate()
# Create an email entry and a token for this user.
email_entry = factories.UserEmailEntryFactory.create(user=self.user, email=email)
token = account_verification_token.make_token(email_entry)
# No API calls are made, so do not add a mocked response.
# Need a client because messages are added.
self.client.force_login(self.user)
response = self.client.get(self.get_url(email_entry.uuid, token), follow=True)
self.assertRedirects(response, "/test_home/")
# No new accounts are created.
self.assertEqual(models.Account.objects.count(), 1)
self.assertIn(account, models.Account.objects.all())
account.refresh_from_db()
# The existing account has not been changed.
self.assertIsNone(account.user)
self.assertIsNone(account.verified_email_entry)
# A message is added.
messages = [m.message for m in get_messages(response.wsgi_request)]
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), views.AccountLinkVerify.message_account_already_exists)

def test_different_user_verified_this_email(self):
"""The email has already been verified by a different user."""
email = "[email protected]"
Expand Down
42 changes: 32 additions & 10 deletions anvil_consortium_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,16 @@ def form_valid(self, form):

# Check if this email has an account already linked to a different user.
# Don't need to check the user, because a user who has already linked their account shouldn't get here.
if models.Account.objects.filter(email=email).count():
if models.Account.objects.filter(email=email, user__isnull=False).count():
# The user already has a linked account, so redirect with a message.
messages.add_message(self.request, messages.ERROR, self.message_account_already_exists)
return HttpResponseRedirect(self.get_redirect_url())

if models.AccountUserArchive.objects.filter(account__email=email).exists():
# The Account was already linked to a previous user, so redirect with a message.
messages.add_message(self.request, messages.ERROR, self.message_account_already_exists)
return HttpResponseRedirect(self.get_redirect_url())

# Check if it exists on AnVIL.
try:
anvil_account_exists = email_entry.anvil_account_exists()
Expand All @@ -314,6 +319,7 @@ class AccountLinkVerify(auth.AnVILConsortiumManagerAccountLinkRequired, Redirect
message_link_invalid = "AnVIL account verification link is invalid."
message_account_already_exists = "An AnVIL Account with this email already exists in this app."
message_account_does_not_exist = "This account does not exist on AnVIL."
message_service_account = "Account is already marked as a service account."
message_success = get_account_adapter().account_link_verify_message

def get_redirect_url(self, *args, **kwargs):
Expand All @@ -335,23 +341,39 @@ def get(self, request, *args, **kwargs):
return super().get(request, *args, **kwargs)

# Check if the email is already linked to an account.
if models.Account.objects.filter(email=email_entry.email).count():
if models.Account.objects.filter(email=email_entry.email, user__isnull=False).count():
messages.add_message(self.request, messages.ERROR, self.message_account_already_exists)
return super().get(request, *args, **kwargs)

# Check that the token maches.
# Check if any user was previously linked to this account.
if models.AccountUserArchive.objects.filter(account__email=email_entry.email).exists():
messages.add_message(self.request, messages.ERROR, self.message_account_already_exists)
return super().get(request, *args, **kwargs)

# Check if the account is a service account.
if models.Account.objects.filter(email=email_entry.email, is_service_account=True).count():
messages.add_message(self.request, messages.ERROR, self.message_service_account)
return super().get(request, *args, **kwargs)

# Check that the token matches.
if not account_verification_token.check_token(email_entry, token):
messages.add_message(self.request, messages.ERROR, self.message_link_invalid)
return super().get(request, *args, **kwargs)

# Create an account for this user from this email.
account = models.Account(
user=self.request.user,
email=email_entry.email,
status=models.Account.ACTIVE_STATUS,
is_service_account=False,
verified_email_entry=email_entry,
)
try:
account = models.Account.objects.get(email=email_entry.email)
account.verified_email_entry = email_entry
account.user = request.user
except models.Account.DoesNotExist:
account = models.Account(
user=self.request.user,
email=email_entry.email,
status=models.Account.ACTIVE_STATUS,
is_service_account=False,
verified_email_entry=email_entry,
)

# Make sure an AnVIL account still exists.
try:
anvil_account_exists = account.anvil_exists()
Expand Down

0 comments on commit 8ed7d23

Please sign in to comment.