From a341a6eaa05c27352ee82447b098afac0a1b7b65 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 21 Jun 2024 09:28:13 -0700 Subject: [PATCH] Add view to resolve SignedAgreement Accessor audit results Split the audit explanation into its own template, and include it in the template for both the audit view and the resolve view. --- primed/cdsa/tests/test_views.py | 782 ++++++++++++++++++ primed/cdsa/urls.py | 9 +- primed/cdsa/views.py | 98 +++ .../cdsa/signedagreement_accessor_audit.html | 30 +- ...ignedagreement_accessor_audit_resolve.html | 54 ++ .../snippets/accessor_audit_explanation.html | 29 + 6 files changed, 971 insertions(+), 31 deletions(-) create mode 100644 primed/templates/cdsa/signedagreement_accessor_audit_resolve.html create mode 100644 primed/templates/cdsa/snippets/accessor_audit_explanation.html diff --git a/primed/cdsa/tests/test_views.py b/primed/cdsa/tests/test_views.py index 766e2c1d..5bece4fa 100644 --- a/primed/cdsa/tests/test_views.py +++ b/primed/cdsa/tests/test_views.py @@ -5,6 +5,7 @@ import responses from anvil_consortium_manager.models import ( AnVILProjectManagerAccess, + GroupAccountMembership, GroupGroupMembership, ManagedGroup, Workspace, @@ -7151,6 +7152,787 @@ def test_context_error_table_has_access(self): ) +class SignedAgreementAccessorAuditResolveTest(AnVILAPIMockTestMixin, TestCase): + """Tests for the SignedAgreementAccessorAuditResolve view.""" + + def setUp(self): + """Set up test class.""" + super().setUp() + self.factory = RequestFactory() + # Create a user with both view and edit permission. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get(codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME) + ) + self.user.user_permissions.add( + Permission.objects.get(codename=AnVILProjectManagerAccess.STAFF_EDIT_PERMISSION_CODENAME) + ) + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse( + "cdsa:audit:signed_agreements:accessors:resolve", + args=args, + ) + + def get_view(self): + """Return the view being tested.""" + return views.SignedAgreementAccessorAuditResolve.as_view() + + def test_view_redirect_not_logged_in(self): + "View redirects to login view when user is not logged in." + # Need a client for redirects. + response = self.client.get(self.get_url(1, "foo")) + self.assertRedirects( + response, + resolve_url(settings.LOGIN_URL) + "?next=" + self.get_url(1, "foo"), + ) + + def test_status_code_with_user_permission_staff_edit(self): + """Returns successful response code if the user has staff edit permission.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + self.client.force_login(self.user) + response = self.client.get(self.get_url(member_agreement.signed_agreement.cc_id, account.email)) + self.assertEqual(response.status_code, 200) + + def test_status_code_with_user_permission_staff_view(self): + """Returns 403 response code if the user has staff view permission.""" + user_view = User.objects.create_user(username="test-view", password="test-view") + user_view.user_permissions.add( + Permission.objects.get(codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME) + ) + self.client.force_login(self.user) + request = self.factory.get(self.get_url(1, "foo")) + request.user = user_view + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_status_code_with_user_permission_view(self): + """Returns forbidden response code if the user has view permission.""" + user = User.objects.create_user(username="test-none", password="test-none") + user.user_permissions.add(Permission.objects.get(codename=AnVILProjectManagerAccess.VIEW_PERMISSION_CODENAME)) + request = self.factory.get(self.get_url(1, "foo")) + request.user = user + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_access_without_user_permission(self): + """Raises permission denied if user has no permissions.""" + user_no_perms = User.objects.create_user(username="test-none", password="test-none") + request = self.factory.get(self.get_url(1, "foo")) + request.user = user_no_perms + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_member_agreement_does_not_exist(self): + """Raises a 404 error with an invalid object member_agreement_pk.""" + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + 1, + "foo", + ) + ) + self.assertEqual(response.status_code, 404) + + def test_email_is_user(self): + """View returns successful response code when the email is a user.""" + member_agreement = factories.MemberAgreementFactory.create() + user = UserFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + user.username, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_email_is_user_case_insensitive(self): + """View returns successful response code when the email is a user.""" + member_agreement = factories.MemberAgreementFactory.create() + UserFactory.create(username="foo@bar.com") + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + "FOO@BAR.com", + ) + ) + self.assertEqual(response.status_code, 200) + + def test_email_is_account(self): + """View returns successful response code when the email is an account.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_email_is_account_case_insensitive(self): + """View returns successful response code when the email is an account.""" + member_agreement = factories.MemberAgreementFactory.create() + AccountFactory.create(email="foo@bar.com") + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + "FOO@BAR.com", + ) + ) + self.assertEqual(response.status_code, 200) + + def test_email_is_group(self): + """View returns successful response code when the email is a group.""" + member_agreement = factories.MemberAgreementFactory.create() + group = ManagedGroupFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + group.email, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_email_is_group_case_insensitive(self): + """View returns successful response code when the email is a group.""" + member_agreement = factories.MemberAgreementFactory.create() + ManagedGroupFactory.create(email="foo@bar.com") + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + "FOO@BAR.com", + ) + ) + self.assertEqual(response.status_code, 200) + + def test_get_email_not_found(self): + """get request raises a 404 error with an non-existent email.""" + member_agreement = factories.MemberAgreementFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + "foo@bar.com", + ) + ) + self.assertEqual(response.status_code, 404) + + def test_get_context_audit_result(self): + """The data_access_audit exists in the context.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + self.assertIsInstance( + response.context_data["audit_result"], + accessor_audit.AccessorAuditResult, + ) + + def test_get_context_signed_agreement_member(self): + """The signed_agreement exists in the context.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("signed_agreement", response.context_data) + self.assertEqual(response.context_data["signed_agreement"], member_agreement.signed_agreement) + + def test_get_context_signed_agreement_data_affiliate(self): + """The signed_agreement exists in the context.""" + data_affiliate_agreement = factories.DataAffiliateAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + data_affiliate_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("signed_agreement", response.context_data) + self.assertEqual(response.context_data["signed_agreement"], data_affiliate_agreement.signed_agreement) + + def test_get_context_signed_agreement_non_data_affiliate(self): + """The signed_agreement exists in the context.""" + non_data_affiliate_agreement = factories.NonDataAffiliateAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + non_data_affiliate_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("signed_agreement", response.context_data) + self.assertEqual(response.context_data["signed_agreement"], non_data_affiliate_agreement.signed_agreement) + + def test_get_context_email(self): + """email exists in the context.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("email", response.context_data) + self.assertEqual(response.context_data["email"], account.email) + + def test_get_verified_access(self): + """Get request with verified access.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.VerifiedAccess) + self.assertEqual(audit_result.signed_agreement, member_agreement.signed_agreement) + self.assertEqual(audit_result.member, account) + self.assertEqual(audit_result.user, account.user) + self.assertEqual(audit_result.note, accessor_audit.SignedAgreementAccessorAudit.ACCESSOR_IN_ACCESS_GROUP) + self.assertIsNone(audit_result.action) + + def test_get_verified_no_access(self): + """verified_table shows a record when audit has verified no access.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.VerifiedNoAccess) + self.assertEqual(audit_result.signed_agreement, member_agreement.signed_agreement) + self.assertEqual(audit_result.member, account) + self.assertEqual(audit_result.user, account.user) + self.assertEqual(audit_result.note, accessor_audit.SignedAgreementAccessorAudit.NOT_ACCESSOR) + self.assertIsNone(audit_result.action) + + def test_get_grant_access(self): + """Get request with grant access.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.GrantAccess) + self.assertEqual(audit_result.signed_agreement, member_agreement.signed_agreement) + self.assertEqual(audit_result.member, account) + self.assertEqual(audit_result.user, account.user) + self.assertEqual(audit_result.note, accessor_audit.SignedAgreementAccessorAudit.ACCESSOR_LINKED_ACCOUNT) + self.assertIsNotNone(audit_result.action) + + def test_get_remove_access(self): + """Get request with remove access.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.RemoveAccess) + self.assertEqual(audit_result.signed_agreement, member_agreement.signed_agreement) + self.assertEqual(audit_result.member, account) + self.assertEqual(audit_result.user, account.user) + self.assertEqual(audit_result.note, accessor_audit.SignedAgreementAccessorAudit.NOT_ACCESSOR) + self.assertIsNotNone(audit_result.action) + + def test_post_email_not_found(self): + """post request raises a 404 error with an non-existent email.""" + member_agreement = factories.MemberAgreementFactory.create() + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + "foo@bar.com", + ), + {}, + ) + self.assertEqual(response.status_code, 404) + + def test_post_verified_access_user(self): + """post with VerifiedAccess audit result with a user email.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + date_created = timezone.now() - timedelta(weeks=3) + with freeze_time(date_created): + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.user.username, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + # Membership hasn't changed. + self.assertEqual(GroupAccountMembership.objects.count(), 1) + membership.refresh_from_db() + self.assertEqual(membership.created, date_created) + self.assertEqual(membership.group, member_agreement.signed_agreement.anvil_access_group) + self.assertEqual(membership.account, account) + + def test_post_verified_access_account(self): + """post with VerifiedAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + date_created = timezone.now() - timedelta(weeks=3) + with freeze_time(date_created): + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + # Membership hasn't changed. + self.assertEqual(GroupAccountMembership.objects.count(), 1) + membership.refresh_from_db() + self.assertEqual(membership.created, date_created) + self.assertEqual(membership.group, member_agreement.signed_agreement.anvil_access_group) + self.assertEqual(membership.account, account) + + def test_post_verified_no_access_user_email(self): + """post with VerifiedNoAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create(verified=True) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.user.username, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + self.assertEqual(GroupAccountMembership.objects.count(), 0) + + def test_post_verified_no_access_account_email(self): + """post with VerifiedNoAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + self.assertEqual(GroupAccountMembership.objects.count(), 0) + + def test_post_verified_no_access_group_email(self): + """post with VerifiedNoAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create() + group = ManagedGroupFactory.create() + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + group.email, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + self.assertEqual(GroupGroupMembership.objects.count(), 0) + + def test_post_grant_access_user_email(self): + """post with GrantAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.user.username, + ), + {}, + ) + # The GroupGroup membership was created. + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + self.assertEqual(GroupAccountMembership.objects.count(), 1) + membership = GroupAccountMembership.objects.get( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.assertEqual(membership.role, membership.MEMBER) + + def test_post_grant_access_account_email(self): + """post with GrantAccess audit result.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + # The GroupGroup membership was created. + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + self.assertEqual(GroupAccountMembership.objects.count(), 1) + membership = GroupAccountMembership.objects.get( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.assertEqual(membership.role, membership.MEMBER) + + def test_post_grant_access_htmx(self): + """Context with GrantAccess.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + **header, + ) + # The membership was created. + self.assertEqual(response.content.decode(), views.SignedAgreementAccessorAuditResolve.htmx_success) + membership = GroupAccountMembership.objects.get( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + self.assertEqual(membership.role, membership.MEMBER) + + def test_post_remove_access_user_email(self): + """post request with remove access for an user.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.user.username, + ), + {}, + ) + # The GroupGroup membership was created. + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + # Make sure the membership has been deleted. + with self.assertRaises(GroupAccountMembership.DoesNotExist): + membership.refresh_from_db() + + def test_post_remove_access_account_email(self): + """post request with remove access for an user.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create() + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + # Make sure the membership has been deleted. + with self.assertRaises(GroupAccountMembership.DoesNotExist): + membership.refresh_from_db() + + def test_post_remove_access_group_email(self): + """post request with remove access for an user.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + group = ManagedGroupFactory.create() + membership = GroupGroupMembershipFactory.create( + parent_group=member_agreement.signed_agreement.anvil_access_group, + child_group=group, + ) + # Add API response + api_url = self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{group.email}" + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + group.email, + ), + {}, + ) + self.assertRedirects(response, member_agreement.signed_agreement.get_absolute_url()) + # Make sure the membership has been deleted. + with self.assertRaises(GroupGroupMembership.DoesNotExist): + membership.refresh_from_db() + + def test_anvil_api_error_grant(self): + """AnVIL API errors are properly handled.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # No group membership was created. + self.assertEqual(GroupAccountMembership.objects.count(), 0) + # Audit result is still GrantAccess. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.GrantAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_api_error_grant_htmx(self): + """AnVIL API errors are properly handled with htmx.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create(verified=True) + member_agreement.signed_agreement.accessors.add(account.user) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + # Check the response. + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + **header, + ) + self.assertEqual(response.content.decode(), views.SignedAgreementAccessorAuditResolve.htmx_error) + # No membership was created. + self.assertEqual(GroupAccountMembership.objects.count(), 0) + # No messages were added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) + + def test_anvil_api_error_remove(self): + """AnVIL API errors are properly handled.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create() + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + # Add API response + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # The group-group membership still exists. + membership.refresh_from_db() + # Audit result is still RemoveAccess. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, accessor_audit.RemoveAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_api_error_remove_htmx(self): + """AnVIL API errors are properly handled.""" + member_agreement = factories.MemberAgreementFactory.create(signed_agreement__cc_id=1234) + account = AccountFactory.create() + membership = GroupAccountMembershipFactory.create( + group=member_agreement.signed_agreement.anvil_access_group, + account=account, + ) + # Add API response + api_url = ( + self.api_client.sam_entry_point + f"/api/groups/v1/TEST_PRIMED_CDSA_ACCESS_1234/member/{account.email}" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + member_agreement.signed_agreement.cc_id, + account.email, + ), + {}, + **header, + ) + self.assertEqual(response.content.decode(), views.SignedAgreementAccessorAuditResolve.htmx_error) + # The group-group membership still exists. + membership.refresh_from_db() + # No messages was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) + + class CDSAWorkspaceAuditTest(TestCase): """Tests for the SignedAgreementAudit view.""" diff --git a/primed/cdsa/urls.py b/primed/cdsa/urls.py index d3ece8ec..ad993ffc 100644 --- a/primed/cdsa/urls.py +++ b/primed/cdsa/urls.py @@ -138,7 +138,7 @@ [ path("", views.SignedAgreementAudit.as_view(), name="all"), path( - "/resolve/", + "resolve//", views.SignedAgreementAuditResolve.as_view(), name="resolve", ), @@ -149,6 +149,11 @@ signed_agreement_accessor_audit_patterns = ( [ path("", views.SignedAgreementAccessorAudit.as_view(), name="all"), + path( + "resolve///", + views.SignedAgreementAccessorAuditResolve.as_view(), + name="resolve", + ), ], "accessors", ) @@ -164,7 +169,7 @@ [ path("", views.CDSAWorkspaceAudit.as_view(), name="all"), path( - "//resolve/", + "resolve///", views.CDSAWorkspaceAuditResolve.as_view(), name="resolve", ), diff --git a/primed/cdsa/views.py b/primed/cdsa/views.py index 432eb8bb..269c3a0a 100644 --- a/primed/cdsa/views.py +++ b/primed/cdsa/views.py @@ -7,6 +7,7 @@ AnVILProjectManagerAccess, ) from anvil_consortium_manager.models import ( + Account, GroupAccountMembership, GroupGroupMembership, ManagedGroup, @@ -788,6 +789,103 @@ def get_context_data(self, **kwargs): return context +class SignedAgreementAccessorAuditResolve(AnVILConsortiumManagerStaffEditRequired, FormView): + form_class = Form + template_name = "cdsa/signedagreement_accessor_audit_resolve.html" + htmx_success = """ Handled!""" + htmx_error = """ Error!""" + + def get_signed_agreement(self): + """Look up the SignedAgreement by cc_id.""" + try: + obj = models.SignedAgreement.objects.get(cc_id=self.kwargs.get("cc_id")) + except models.SignedAgreement.DoesNotExist: + raise Http404("No SignedAgreements found matching the query") + return obj + + def get_email(self): + return self.kwargs.get("email") + + def get_audit_result(self): + audit = accessor_audit.SignedAgreementAccessorAudit( + queryset=models.SignedAgreement.objects.filter(pk=self.signed_agreement.pk) + ) + # No way to include a queryset of members at this point - need to call the sub method directly. + audit.audit_agreement_and_object(self.signed_agreement, self.email) + # Set to completed, because we are just running this one specific check. + audit.completed = True + return audit.get_all_results()[0] + + def get(self, request, *args, **kwargs): + self.signed_agreement = self.get_signed_agreement() + self.email = self.get_email() + try: + self.audit_result = self.get_audit_result() + except ValueError as e: + raise Http404(str(e)) + return super().get(request, *args, **kwargs) + + def post(self, request, *args, **kwargs): + self.signed_agreement = self.get_signed_agreement() + self.email = self.get_email() + try: + self.audit_result = self.get_audit_result() + except ValueError as e: + raise Http404(str(e)) + return super().post(request, *args, **kwargs) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context["signed_agreement"] = self.signed_agreement + context["email"] = self.email + context["audit_result"] = self.audit_result + return context + + def get_success_url(self): + return self.signed_agreement.get_absolute_url() + + def form_valid(self, form): + # Add or remove the user from the access group. + try: + with transaction.atomic(): + if isinstance(self.audit_result, accessor_audit.GrantAccess): + # Only accounts should be added to the access group, so we shouldn't need to check type. + membership = GroupAccountMembership( + group=self.signed_agreement.anvil_access_group, + account=self.audit_result.member, + role=GroupAccountMembership.MEMBER, + ) + membership.full_clean() + membership.save() + membership.anvil_create() + elif isinstance(self.audit_result, accessor_audit.RemoveAccess): + if isinstance(self.audit_result.member, Account): + membership = GroupAccountMembership.objects.get( + group=self.signed_agreement.anvil_access_group, + account=self.audit_result.member, + ) + elif isinstance(self.audit_result.member, ManagedGroup): + membership = GroupGroupMembership.objects.get( + parent_group=self.signed_agreement.anvil_access_group, + child_group=self.audit_result.member, + ) + membership.delete() + membership.anvil_delete() + else: + pass + except AnVILAPIError as e: + if self.request.htmx: + return HttpResponse(self.htmx_error) + else: + messages.error(self.request, "AnVIL API Error: " + str(e)) + return super().form_invalid(form) + # Otherwise, the audit resolution succeeded. + if self.request.htmx: + return HttpResponse(self.htmx_success) + else: + return super().form_valid(form) + + class RecordsIndex(TemplateView): """Index page for records.""" diff --git a/primed/templates/cdsa/signedagreement_accessor_audit.html b/primed/templates/cdsa/signedagreement_accessor_audit.html index 709c799c..952b730b 100644 --- a/primed/templates/cdsa/signedagreement_accessor_audit.html +++ b/primed/templates/cdsa/signedagreement_accessor_audit.html @@ -10,35 +10,7 @@

Signed Agreement accessor audit

Audit results

-
-

- To be added to a SignedAgreement access group, a user must have linked their AnVIL account and be named as an accessor on that agreement. - Access to shared workspaces containing SAG data is controlled by adding users' AnVIL accounts to the AnVIL access group associated with the agreement. -

-

The audit result categories are explained below. -

    - -
  • Verified includes the following:
  • -
      -
    • A named accessor who has not linked their AnVIL account.
    • -
    • A named accessor who has linked their AnVIL account and is in the AnVIL access group for this application.
    • -
    • A named accessor who has not linked their AnVIL account and is not in the AnVIL access group.
    • -
    - -
  • Needs action includes the following:
  • -
      -
    • A named accessor who has linked their AnVIL account and is not in the AnVIL access group for this application.
    • -
    • An account in the AnVIL access group for this application that is not linked to a collaborator.
    • -
    - -
  • Errors
  • -
      -
    • A different group is in the AnVIL access group for this application.
    • -
    -
-

-

Any errors should be reported!

-
+{% include "cdsa/snippets/accessor_audit_explanation.html" %} {% include "__audit_tables.html" with verified_table=verified_table needs_action_table=needs_action_table errors_table=errors_table %} diff --git a/primed/templates/cdsa/signedagreement_accessor_audit_resolve.html b/primed/templates/cdsa/signedagreement_accessor_audit_resolve.html new file mode 100644 index 00000000..6a6c8049 --- /dev/null +++ b/primed/templates/cdsa/signedagreement_accessor_audit_resolve.html @@ -0,0 +1,54 @@ +{% extends "anvil_consortium_manager/base.html" %} +{% load django_tables2 %} +{% load crispy_forms_tags %} + +{% block title %}Resolve dbGaP collaborator audit{% endblock %} + + +{% block content %} + +

Resolve Signed Agreement accessor audit

+ +
+ +
+ +

Audit results

+ +{% include "cdsa/snippets/accessor_audit_explanation.html" %} + +
+
+
Result
+

{{ audit_result }}

+ {% if audit_result.action %} +
+ + {% csrf_token %} + {{ form|crispy }} + + +
+ {% else %} + + {% endif %} +
+
+ +{% endblock content %} diff --git a/primed/templates/cdsa/snippets/accessor_audit_explanation.html b/primed/templates/cdsa/snippets/accessor_audit_explanation.html new file mode 100644 index 00000000..231269be --- /dev/null +++ b/primed/templates/cdsa/snippets/accessor_audit_explanation.html @@ -0,0 +1,29 @@ +
+

+ To be added to a SignedAgreement access group, a user must have linked their AnVIL account and be named as an accessor on that agreement. + Access to shared workspaces containing SAG data is controlled by adding users' AnVIL accounts to the AnVIL access group associated with the agreement. +

+

The audit result categories are explained below. +

    + +
  • Verified includes the following:
  • +
      +
    • A named accessor who has not linked their AnVIL account.
    • +
    • A named accessor who has linked their AnVIL account and is in the AnVIL access group for this application.
    • +
    • A named accessor who has not linked their AnVIL account and is not in the AnVIL access group.
    • +
    + +
  • Needs action includes the following:
  • +
      +
    • A named accessor who has linked their AnVIL account and is not in the AnVIL access group for this application.
    • +
    • An account in the AnVIL access group for this application that is not linked to a collaborator.
    • +
    + +
  • Errors
  • +
      +
    • A different group is in the AnVIL access group for this application.
    • +
    +
+

+

Any errors should be reported!

+