From a108f9df9176c4b69b43c7fcf6a126d145e8a230 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Mon, 26 Feb 2024 09:17:02 -0800 Subject: [PATCH] Add view to resolve a collab analysis workspace audit Add a view that runs an audit on a specific email and CollaborativeAnalysisWorkspace and (if a post request) handles that audit appopriately. The view can also handle htmx requests. Modify the workspace audit classes for CollaborativeAnalsyisWorkspaces as necessary to achieve this: add a str method that can be displayed on the detail page; set the "action" text for different classes with a default, and change the action_url to be the same for all the different types of audit result classes. Add a get_all_results method to the CollaborativeAnalysisWorkspaceAccessAudit class. --- primed/collaborative_analysis/audit.py | 184 ++- .../tests/test_audit.py | 72 +- .../tests/test_views.py | 1232 ++++++++++++++++- primed/collaborative_analysis/urls.py | 5 + primed/collaborative_analysis/views.py | 166 ++- primed/dbgap/views.py | 6 - .../collaborative_analysis/audit_resolve.html | 44 + 7 files changed, 1594 insertions(+), 115 deletions(-) create mode 100644 primed/templates/collaborative_analysis/audit_resolve.html diff --git a/primed/collaborative_analysis/audit.py b/primed/collaborative_analysis/audit.py index c9cc3492..8545570a 100644 --- a/primed/collaborative_analysis/audit.py +++ b/primed/collaborative_analysis/audit.py @@ -24,14 +24,17 @@ class AccessAuditResult: member: Union[Account, ManagedGroup] note: str has_access: bool + action: str = None def get_action_url(self): - """The URL that handles the action needed.""" - return None - - def get_action(self): - """An indicator of what action needs to be taken.""" - return None + return reverse( + "collaborative_analysis:audit:resolve", + args=[ + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + self.member.email, + ], + ) def get_table_dictionary(self): """Return a dictionary that can be used to populate an instance of `SignedAgreementAccessAuditTable`.""" @@ -40,7 +43,7 @@ def get_table_dictionary(self): "member": self.member, "has_access": self.has_access, "note": self.note, - "action": self.get_action(), + "action": self.action, "action_url": self.get_action_url(), } return row @@ -52,6 +55,9 @@ class VerifiedAccess(AccessAuditResult): has_access: bool = True + def __str__(self): + return f"Verified access: {self.note}" + @dataclass class VerifiedNoAccess(AccessAuditResult): @@ -59,33 +65,19 @@ class VerifiedNoAccess(AccessAuditResult): has_access: bool = False + def __str__(self): + return f"Verified no access: {self.note}" + @dataclass class GrantAccess(AccessAuditResult): """Audit results class for when an account should be granted access.""" has_access: bool = False + action: str = "Grant access" - def get_action(self): - return "Grant access" - - def get_action_url(self): - if isinstance(self.member, Account): - return reverse( - "anvil_consortium_manager:managed_groups:member_accounts:new_by_account", - args=[ - self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, - self.member.uuid, - ], - ) - else: - return reverse( - "anvil_consortium_manager:managed_groups:member_groups:new_by_child", - args=[ - self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, - self.member.name, - ], - ) + def __str__(self): + return f"Grant access: {self.note}" @dataclass @@ -93,27 +85,10 @@ class RemoveAccess(AccessAuditResult): """Audit results class for when access for an account should be removed.""" has_access: bool = True + action: str = "Remove access" - def get_action(self): - return "Remove access" - - def get_action_url(self): - if isinstance(self.member, Account): - return reverse( - "anvil_consortium_manager:managed_groups:member_accounts:delete", - args=[ - self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, - self.member.uuid, - ], - ) - else: - return reverse( - "anvil_consortium_manager:managed_groups:member_groups:delete", - args=[ - self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, - self.member.name, - ], - ) + def __str__(self): + return f"Remove access: {self.note}" class AccessAuditResultsTable(tables.Table): @@ -281,63 +256,83 @@ def _audit_workspace_and_account(self, collaborative_analysis_workspace, account # - analyst is in some but not all relevant source auth domains, and is not in the workspace auth domain. # - analyst is in none of the relevant source auth domains, and is not in the workspace auth domain. # - an account is in the workspace auth domain, but is not in the analyst group. - - # Check whether access is allowed. Start by assuming yes; set to false if the account should not have access. - access_allowed = True + # Get all groups for the account. account_groups = account.get_all_groups() - # Loop over all source workspaces. - for ( - source_workspace - ) in collaborative_analysis_workspace.source_workspaces.all(): - # Loop over all auth domains for that source workspace. - for source_auth_domain in source_workspace.authorization_domains.all(): - # If the user is not in the auth domain, they are not allowed to have access to the collab workspace. - # If so, break out of the loop - it is not necessary to check membership of the remaining auth domains. - # Note that this only breaks out of the inner loop. - # It would be more efficient to break out of the outer loop as well. - if source_auth_domain not in account_groups: - access_allowed = False - break + # Check whether the account is in the analyst group. + in_analyst_group = ( + collaborative_analysis_workspace.analyst_group in account_groups + ) # Check whether the account is in the auth domain of the collab workspace. in_auth_domain = ( collaborative_analysis_workspace.workspace.authorization_domains.first() in account_groups ) - # Determine the audit result. - print(access_allowed) - print(in_auth_domain) - if access_allowed and in_auth_domain: - self.verified.append( - VerifiedAccess( - collaborative_analysis_workspace=collaborative_analysis_workspace, - member=account, - note=self.IN_SOURCE_AUTH_DOMAINS, + if in_analyst_group: + # Check whether access is allowed. Start by assuming yes, and then + # set to false if the account should not have access. + access_allowed = True + # Loop over all source workspaces. + for ( + source_workspace + ) in collaborative_analysis_workspace.source_workspaces.all(): + # Loop over all auth domains for that source workspace. + for source_auth_domain in source_workspace.authorization_domains.all(): + # If the user is not in the auth domain, they are not allowed to have access to the workspace. + # If so, break out of the loop - not necessary to check membership of the remaining auth domains. + # Note that this only breaks out of the inner loop. + # It would be more efficient to break out of the outer loop as well. + if source_auth_domain not in account_groups: + access_allowed = False + break + if access_allowed and in_auth_domain: + self.verified.append( + VerifiedAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.IN_SOURCE_AUTH_DOMAINS, + ) ) - ) - elif access_allowed and not in_auth_domain: - self.needs_action.append( - GrantAccess( - collaborative_analysis_workspace=collaborative_analysis_workspace, - member=account, - note=self.IN_SOURCE_AUTH_DOMAINS, + elif access_allowed and not in_auth_domain: + self.needs_action.append( + GrantAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.IN_SOURCE_AUTH_DOMAINS, + ) ) - ) - elif not access_allowed and in_auth_domain: - self.needs_action.append( - RemoveAccess( - collaborative_analysis_workspace=collaborative_analysis_workspace, - member=account, - note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + elif not access_allowed and in_auth_domain: + self.needs_action.append( + RemoveAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + ) + else: + self.verified.append( + VerifiedNoAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + ) ) - ) else: - self.verified.append( - VerifiedNoAccess( - collaborative_analysis_workspace=collaborative_analysis_workspace, - member=account, - note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + if in_auth_domain: + self.needs_action.append( + RemoveAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_ANALYST_GROUP, + ) + ) + else: + self.verified.append( + VerifiedNoAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_ANALYST_GROUP, + ) ) - ) def run_audit(self): """Run the audit on the set of workspaces.""" @@ -345,6 +340,9 @@ def run_audit(self): self._audit_workspace(workspace) self.completed = True + def get_all_results(self): + return self.verified + self.needs_action + self.errors + def get_verified_table(self): """Return a table of verified results.""" return self.results_table_class( diff --git a/primed/collaborative_analysis/tests/test_audit.py b/primed/collaborative_analysis/tests/test_audit.py index 62baf77f..75d5eeea 100644 --- a/primed/collaborative_analysis/tests/test_audit.py +++ b/primed/collaborative_analysis/tests/test_audit.py @@ -26,7 +26,15 @@ def test_account_verified_access(self): instance = audit.VerifiedAccess( collaborative_analysis_workspace=workspace, member=account, note="test" ) - self.assertIsNone(instance.get_action_url()) + expected_url = reverse( + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ], + ) + self.assertEqual(instance.get_action_url(), expected_url) def test_account_verified_no_access(self): workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() @@ -34,7 +42,15 @@ def test_account_verified_no_access(self): instance = audit.VerifiedNoAccess( collaborative_analysis_workspace=workspace, member=account, note="test" ) - self.assertIsNone(instance.get_action_url()) + expected_url = reverse( + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ], + ) + self.assertEqual(instance.get_action_url(), expected_url) def test_account_grant_access(self): workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() @@ -43,8 +59,12 @@ def test_account_grant_access(self): collaborative_analysis_workspace=workspace, member=account, note="test" ) expected_url = reverse( - "anvil_consortium_manager:managed_groups:member_accounts:new_by_account", - args=[workspace.workspace.authorization_domains.first().name, account.uuid], + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ], ) self.assertEqual(instance.get_action_url(), expected_url) @@ -55,8 +75,12 @@ def test_account_remove_access(self): collaborative_analysis_workspace=workspace, member=account, note="test" ) expected_url = reverse( - "anvil_consortium_manager:managed_groups:member_accounts:delete", - args=[workspace.workspace.authorization_domains.first().name, account.uuid], + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ], ) self.assertEqual(instance.get_action_url(), expected_url) @@ -66,7 +90,15 @@ def test_group_verified_access(self): instance = audit.VerifiedAccess( collaborative_analysis_workspace=workspace, member=group, note="test" ) - self.assertIsNone(instance.get_action_url()) + expected_url = reverse( + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ], + ) + self.assertEqual(instance.get_action_url(), expected_url) def test_group_verified_no_access(self): workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() @@ -74,7 +106,15 @@ def test_group_verified_no_access(self): instance = audit.VerifiedNoAccess( collaborative_analysis_workspace=workspace, member=group, note="test" ) - self.assertIsNone(instance.get_action_url()) + expected_url = reverse( + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ], + ) + self.assertEqual(instance.get_action_url(), expected_url) def test_group_grant_access(self): workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() @@ -83,8 +123,12 @@ def test_group_grant_access(self): collaborative_analysis_workspace=workspace, member=group, note="test" ) expected_url = reverse( - "anvil_consortium_manager:managed_groups:member_groups:new_by_child", - args=[workspace.workspace.authorization_domains.first().name, group.name], + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ], ) self.assertEqual(instance.get_action_url(), expected_url) @@ -95,8 +139,12 @@ def test_group_remove_access(self): collaborative_analysis_workspace=workspace, member=group, note="test" ) expected_url = reverse( - "anvil_consortium_manager:managed_groups:member_groups:delete", - args=[workspace.workspace.authorization_domains.first().name, group.name], + "collaborative_analysis:audit:resolve", + args=[ + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ], ) self.assertEqual(instance.get_action_url(), expected_url) diff --git a/primed/collaborative_analysis/tests/test_views.py b/primed/collaborative_analysis/tests/test_views.py index 0a7b9102..563192e3 100644 --- a/primed/collaborative_analysis/tests/test_views.py +++ b/primed/collaborative_analysis/tests/test_views.py @@ -1,7 +1,15 @@ """Tests for views related to the `collaborative_analysis` app.""" +from datetime import timedelta + import responses -from anvil_consortium_manager.models import AnVILProjectManagerAccess, Workspace +from anvil_consortium_manager.models import ( + AnVILProjectManagerAccess, + GroupAccountMembership, + GroupGroupMembership, + Workspace, +) +from anvil_consortium_manager.tests.api_factories import ErrorResponseFactory from anvil_consortium_manager.tests.factories import ( AccountFactory, BillingProjectFactory, @@ -13,11 +21,14 @@ from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission +from django.contrib.messages import get_messages from django.core.exceptions import PermissionDenied from django.http import Http404 from django.shortcuts import resolve_url from django.test import RequestFactory, TestCase from django.urls import reverse +from django.utils import timezone +from freezegun import freeze_time from primed.cdsa.tests.factories import CDSAWorkspaceFactory from primed.dbgap.tests.factories import dbGaPWorkspaceFactory @@ -1019,3 +1030,1222 @@ def test_context_error_table_group_in_auth_domain(self): audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + +class CollaborativeAnalysisAuditResolveTest(AnVILAPIMockTestMixin, TestCase): + """Tests for the CollaborativeWorkspaceAuditResolve view.""" + + def setUp(self): + """Set up test class.""" + super().setUp() + self.factory = RequestFactory() + # Create a user with both view and edit permission. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_EDIT_PERMISSION_CODENAME + ) + ) + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse( + "collaborative_analysis:audit:resolve", + args=args, + ) + + def get_view(self): + """Return the view being tested.""" + return views.CollaborativeAnalysisAuditResolve.as_view() + + def test_view_redirect_not_logged_in(self): + "View redirects to login view when user is not logged in." + # Need a client for redirects. + response = self.client.get(self.get_url("foo", "bar", "test@example.com")) + self.assertRedirects( + response, + resolve_url(settings.LOGIN_URL) + + "?next=" + + self.get_url("foo", "bar", "test@example.com"), + ) + + def test_status_code_account_with_user_permission_view(self): + """Returns successful response code if the user has view permission.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, + collab_workspace.workspace.name, + member.email, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_status_code_group_with_user_permission_view(self): + """Returns successful response code if the user has view permission.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member = ManagedGroupFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, + collab_workspace.workspace.name, + member.email, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_access_without_user_permission(self): + """Raises permission denied if user has no permissions.""" + user_no_perms = User.objects.create_user( + username="test-none", password="test-none" + ) + request = self.factory.get(self.get_url("foo", "bar", "test@example.com")) + request.user = user_no_perms + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_invalid_billing_project_name(self): + """Raises a 404 error with an invalid object billing project name.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url("foo", collab_workspace.workspace.name, account.email) + ) + self.assertEqual(response.status_code, 404) + + def test_invalid_workspace_name(self): + """Raises a 404 error with an invalid object dbgap_application_pk.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, "foo", account.email + ) + ) + self.assertEqual(response.status_code, 404) + + def test_invalid_email(self): + """Raises a 404 error with an invalid object dbgap_application_pk.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, + collab_workspace.workspace.billing_project.name, + "test@example.com", + ) + ) + self.assertEqual(response.status_code, 404) + + def test_context_data_access_audit_account(self): + """The data_access_audit exists in the context.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, + collab_workspace.workspace.name, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + self.assertIsInstance( + response.context_data["audit_result"], + audit.AccessAuditResult, + ) + + def test_context_data_access_audit_group(self): + """The data_access_audit exists in the context.""" + collab_workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + collab_workspace.workspace.billing_project.name, + collab_workspace.workspace.name, + group.email, + ) + ) + self.assertIn("audit_result", response.context_data) + self.assertIsInstance( + response.context_data["audit_result"], + audit.AccessAuditResult, + ) + + def test_get_verified_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.VerifiedAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + account, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + + def test_get_verified_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.VerifiedAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + group, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.DCC_ACCESS, + ) + + def test_get_verified_no_access_account(self): + """Get request with verified no access for an account.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.VerifiedNoAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + account, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_ANALYST_GROUP, + ) + + def test_get_verified_no_access_group(self): + """Get request with verified no access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.VerifiedNoAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual(audit_result.member, group) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.NON_DCC_GROUP, + ) + + def test_get_grant_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.GrantAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + account, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + + def test_get_grant_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.GrantAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + group, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.DCC_ACCESS, + ) + + def test_get_remove_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.RemoveAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + account, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_ANALYST_GROUP, + ) + + def test_get_remove_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + # Auth domain membership. + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ) + ) + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.RemoveAccess) + self.assertEqual(audit_result.collaborative_analysis_workspace, workspace) + self.assertEqual( + audit_result.member, + group, + ) + self.assertEqual( + audit_result.note, + audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS, + ) + + def test_post_verified_access_account(self): + """Post request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + date_created = timezone.now() - timedelta(weeks=5) + with freeze_time(date_created): + membership = GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + membership.refresh_from_db() + self.assertEqual(membership.created, date_created) + + def test_post_verified_access_group(self): + """Post request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + date_created = timezone.now() - timedelta(weeks=5) + with freeze_time(date_created): + membership = GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + membership.refresh_from_db() + self.assertEqual(membership.created, date_created) + + def test_post_verified_no_access_account(self): + """Get request with verified no access for an account.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + # No membership was created. + self.assertEqual(GroupAccountMembership.objects.count(), 0) + + def test_post_verified_no_access_group(self): + """Get request with verified no access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + # No membership was created. + self.assertEqual(GroupGroupMembership.objects.count(), 0) + + def test_post_grant_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + # A membership was created. + membership = GroupAccountMembership.objects.get( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + self.assertEqual(membership.role, GroupAccountMembership.MEMBER) + + def test_post_grant_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/PRIMED_CC_WRITERS@firecloud.org" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + # A membership was created. + membership = GroupGroupMembership.objects.get( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + self.assertEqual(membership.role, GroupGroupMembership.MEMBER) + + def test_post_remove_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + membership = GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + with self.assertRaises(GroupAccountMembership.DoesNotExist): + membership.refresh_from_db() + + def test_post_remove_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="test-group") + # Auth domain membership. + membership = GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test-group@firecloud.org" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertRedirects(response, workspace.get_absolute_url()) + with self.assertRaises(GroupGroupMembership.DoesNotExist): + membership.refresh_from_db() + + def test_post_grant_access_account_htmx(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_success, + ) + # A membership was created. + membership = GroupAccountMembership.objects.get( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + self.assertEqual(membership.role, GroupAccountMembership.MEMBER) + + def test_post_grant_access_group_htmx(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/PRIMED_CC_WRITERS@firecloud.org" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=204, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_success, + ) + # A membership was created. + membership = GroupGroupMembership.objects.get( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + self.assertEqual(membership.role, GroupGroupMembership.MEMBER) + + def test_post_remove_access_account_htmx(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + membership = GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_success, + ) + with self.assertRaises(GroupAccountMembership.DoesNotExist): + membership.refresh_from_db() + + def test_post_remove_access_group_htmx(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="test-group") + # Auth domain membership. + membership = GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test-group@firecloud.org" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=204, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_success, + ) + with self.assertRaises(GroupGroupMembership.DoesNotExist): + membership.refresh_from_db() + + def test_anvil_error_grant_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # No new membership was created. + self.assertEqual(GroupAccountMembership.objects.count(), 1) + # Audit result is the same. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.GrantAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_error_grant_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/PRIMED_CC_WRITERS@firecloud.org" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # No membership was created. + self.assertEqual(GroupGroupMembership.objects.count(), 0) + # Audit result is the same. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.GrantAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_error_remove_access_account(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + membership = GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # The membership still exists + membership.refresh_from_db() + # Audit result is the same. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.RemoveAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_error_remove_access_group(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="test-group") + # Auth domain membership. + membership = GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test-group@firecloud.org" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + ) + self.assertEqual(response.status_code, 200) + # Membership still exists. + membership.refresh_from_db() + # Audit result is the same. + self.assertIn("audit_result", response.context_data) + audit_result = response.context_data["audit_result"] + self.assertIsInstance(audit_result, audit.RemoveAccess) + # A message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 1) + self.assertIn("AnVIL API Error", str(messages[0])) + + def test_anvil_error_grant_access_account_htmx(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, + account=account, + ) + # Auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_error, + ) + # No new membership was created. + self.assertEqual(GroupAccountMembership.objects.count(), 1) + # No message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) + + def test_anvil_error_grant_access_group_htmx(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # Auth domain membership. + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/PRIMED_CC_WRITERS@firecloud.org" + ) + self.anvil_response_mock.add( + responses.PUT, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_error, + ) + # No new membership was created. + self.assertEqual(GroupGroupMembership.objects.count(), 0) + # No message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) + + def test_anvil_error_remove_access_account_htmx(self): + """Get request with verified access.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + account = AccountFactory.create(email="test@example.com") + # Analyst group membership. + # GroupAccountMembershipFactory.create( + # group=workspace.analyst_group, + # account=account, + # ) + # Auth domain membership. + membership = GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), + account=account, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test@example.com" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + account.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_error, + ) + # Membership still exists. + membership.refresh_from_db() + # No message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) + + def test_anvil_error_remove_access_group_htmx(self): + """Get request with verified access for a group.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="TEST_COLLAB" + ) + group = ManagedGroupFactory.create(name="test-group") + # Auth domain membership. + membership = GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + # Add API response + # Note that the auth domain group is created automatically by the factory using the workspace name. + api_url = ( + self.api_client.sam_entry_point + + "/api/groups/v1/auth_TEST_COLLAB/member/test-group@firecloud.org" + ) + self.anvil_response_mock.add( + responses.DELETE, + api_url, + status=500, + json=ErrorResponseFactory().response, + ) + self.client.force_login(self.user) + header = {"HTTP_HX-Request": "true"} + response = self.client.post( + self.get_url( + workspace.workspace.billing_project.name, + workspace.workspace.name, + group.email, + ), + {}, + **header + ) + self.assertEqual( + response.content.decode(), + views.CollaborativeAnalysisAuditResolve.htmx_error, + ) + # Membership still exists. + membership.refresh_from_db() + # No message was added. + messages = [m.message for m in get_messages(response.wsgi_request)] + self.assertEqual(len(messages), 0) diff --git a/primed/collaborative_analysis/urls.py b/primed/collaborative_analysis/urls.py index 4ec383ce..c07f3945 100644 --- a/primed/collaborative_analysis/urls.py +++ b/primed/collaborative_analysis/urls.py @@ -17,6 +17,11 @@ views.WorkspaceAudit.as_view(), name="workspaces", ), + path( + "resolve////", + views.CollaborativeAnalysisAuditResolve.as_view(), + name="resolve", + ), ], "audit", ) diff --git a/primed/collaborative_analysis/views.py b/primed/collaborative_analysis/views.py index a678e81b..3c58849b 100644 --- a/primed/collaborative_analysis/views.py +++ b/primed/collaborative_analysis/views.py @@ -1,7 +1,20 @@ -from anvil_consortium_manager.auth import AnVILConsortiumManagerStaffViewRequired -from django.http import Http404 +from anvil_consortium_manager.anvil_api import AnVILAPIError +from anvil_consortium_manager.auth import ( + AnVILConsortiumManagerStaffEditRequired, + AnVILConsortiumManagerStaffViewRequired, +) +from anvil_consortium_manager.models import ( + Account, + GroupAccountMembership, + GroupGroupMembership, + ManagedGroup, +) +from django.contrib import messages +from django.db import transaction +from django.forms.forms import Form +from django.http import Http404, HttpResponse from django.utils.translation import gettext_lazy as _ -from django.views.generic import DetailView, TemplateView +from django.views.generic import DetailView, FormView, TemplateView from . import audit, models @@ -67,3 +80,150 @@ def get_context_data(self, **kwargs): context["needs_action_table"] = data_access_audit.get_needs_action_table() context["data_access_audit"] = data_access_audit return context + + +class CollaborativeAnalysisAuditResolve( + AnVILConsortiumManagerStaffEditRequired, FormView +): + + form_class = Form + template_name = "collaborative_analysis/audit_resolve.html" + htmx_success = """ Handled!""" + htmx_error = """ Error!""" + + def get_collaborative_analysis_workspace(self): + """Look up the CollaborativeAnalysisWorkspace by billing project and name.""" + # Filter the queryset based on kwargs. + billing_project_slug = self.kwargs.get("billing_project_slug", None) + workspace_slug = self.kwargs.get("workspace_slug", None) + queryset = models.CollaborativeAnalysisWorkspace.objects.filter( + workspace__billing_project__name=billing_project_slug, + workspace__name=workspace_slug, + ) + try: + # Get the single item from the filtered queryset + obj = queryset.get() + except queryset.model.DoesNotExist: + raise Http404( + _("No %(verbose_name)s found matching the query") + % {"verbose_name": queryset.model._meta.verbose_name} + ) + return obj + + def get_member(self): + """Look up the member (account or group) by email.""" + email = self.kwargs.get( + "member_email", + ) + # Check for an account first. + try: + return Account.objects.get(email=email) + except Account.DoesNotExist: + pass + # Then check for a managed group. + try: + return ManagedGroup.objects.get(email=email) + except ManagedGroup.DoesNotExist: + raise Http404( + _("No %(verbose_name)s found matching the query") + % {"verbose_name": "Account or ManagedGroup"} + ) + + def get_audit_result(self): + instance = audit.CollaborativeAnalysisWorkspaceAccessAudit( + queryset=models.CollaborativeAnalysisWorkspace.objects.filter( + pk=self.collaborative_analysis_workspace.pk + ) + ) + # No way to include a queryset of members at this point - need to call the sub method directly. + if isinstance(self.member, Account): + instance._audit_workspace_and_account( + self.collaborative_analysis_workspace, self.member + ) + else: + instance._audit_workspace_and_group( + self.collaborative_analysis_workspace, self.member + ) + return instance.get_all_results()[0] + + def get(self, request, *args, **kwargs): + self.collaborative_analysis_workspace = ( + self.get_collaborative_analysis_workspace() + ) + self.member = self.get_member() + self.audit_result = self.get_audit_result() + return super().get(request, *args, **kwargs) + + def post(self, request, *args, **kwargs): + self.collaborative_analysis_workspace = ( + self.get_collaborative_analysis_workspace() + ) + self.member = self.get_member() + self.audit_result = self.get_audit_result() + return super().post(request, *args, **kwargs) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context[ + "collaborative_analysis_workspace" + ] = self.collaborative_analysis_workspace + context["member"] = self.member + context["audit_result"] = self.audit_result + return context + + def get_success_url(self): + return self.collaborative_analysis_workspace.get_absolute_url() + + def form_valid(self, form): + auth_domain = ( + self.collaborative_analysis_workspace.workspace.authorization_domains.first() + ) + # Handle the result. + try: + with transaction.atomic(): + if isinstance(self.audit_result, audit.GrantAccess): + # Add to workspace auth domain. + if isinstance(self.member, Account): + membership = GroupAccountMembership( + group=auth_domain, + account=self.member, + role=GroupAccountMembership.MEMBER, + ) + elif isinstance(self.member, ManagedGroup): + membership = GroupGroupMembership( + parent_group=auth_domain, + child_group=self.member, + role=GroupGroupMembership.MEMBER, + ) + membership.anvil_create() + membership.full_clean() + membership.save() + elif isinstance(self.audit_result, audit.RemoveAccess): + # Remove from CDSA group. + if isinstance(self.member, Account): + membership = GroupAccountMembership.objects.get( + group=auth_domain, + account=self.member, + role=GroupAccountMembership.MEMBER, + ) + elif isinstance(self.member, ManagedGroup): + membership = GroupGroupMembership.objects.get( + parent_group=auth_domain, + child_group=self.member, + role=GroupGroupMembership.MEMBER, + ) + membership.anvil_delete() + membership.delete() + else: + pass + except AnVILAPIError as e: + if self.request.htmx: + return HttpResponse(self.htmx_error) + else: + messages.error(self.request, "AnVIL API Error: " + str(e)) + return super().form_invalid(form) + # Otherwise, the audit resolution succeeded. + if self.request.htmx: + return HttpResponse(self.htmx_success) + else: + return super().form_valid(form) diff --git a/primed/dbgap/views.py b/primed/dbgap/views.py index dff7ce51..00c75b88 100644 --- a/primed/dbgap/views.py +++ b/primed/dbgap/views.py @@ -771,12 +771,6 @@ def form_valid(self, form): else: return super().form_valid(form) - # def form_invalid(self, form): - # if self.request.htmx: - # return HttpResponse(self.htmx_error) - # else: - # return super().form_invalid(form) - class dbGaPRecordsIndex(TemplateView): """Index page for dbGaP records.""" diff --git a/primed/templates/collaborative_analysis/audit_resolve.html b/primed/templates/collaborative_analysis/audit_resolve.html new file mode 100644 index 00000000..bdadad0b --- /dev/null +++ b/primed/templates/collaborative_analysis/audit_resolve.html @@ -0,0 +1,44 @@ +{% extends "anvil_consortium_manager/base.html" %} +{% load django_tables2 %} +{% load crispy_forms_tags %} + +{% block title %}Resolve collaborative analysis access audit{% endblock %} + + +{% block content %} + +

Resolve collaborative analysis

+ +
+ +
+ +

Audit results

+ +
+ TODO: ADD BOX TEXT + ADD +
+ +
+
+
Result
+

{{ audit_result }}

+ {% if audit_result.action %} +
+ + {% csrf_token %} + {{ form|crispy }} + + +
+ {% else %} + + {% endif %} +
+
+ +{% endblock content %}