From 7bbcace7c576c0ec18e79a514b8b76d28c4e8cf2 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Thu, 1 Feb 2024 16:17:09 -0800 Subject: [PATCH 01/12] Use only one class for auditing dbGaP access Instead of having an abstract base class and subclassing that to audit a dbGaPApplication or a dbGaPWorkspace, instead just use the super class itself. Update dbGaPAccessAudit to be a non-abstract class and to take a queryset of dbGaPApplications and dbGaPWorkspaces to audit. Remove the dbGaPApplicationAccessAudit and dbGaPWorkspaceAccessAudit classes. Note that views and the rest of the app is still broken. --- primed/dbgap/audit.py | 64 ++-- primed/dbgap/tests/test_audit.py | 555 ++++--------------------------- 2 files changed, 88 insertions(+), 531 deletions(-) diff --git a/primed/dbgap/audit.py b/primed/dbgap/audit.py index 2f384079..f21c5f75 100644 --- a/primed/dbgap/audit.py +++ b/primed/dbgap/audit.py @@ -1,4 +1,3 @@ -from abc import ABC from dataclasses import dataclass import django_tables2 as tables @@ -139,7 +138,7 @@ def render_action(self, record, value): ) -class dbGaPAccessAudit(ABC): +class dbGaPAccessAudit: # Access verified. APPROVED_DAR = "Approved DAR." @@ -159,12 +158,29 @@ class dbGaPAccessAudit(ABC): results_table_class = dbGaPAccessAuditTable - def __init__(self): + def __init__(self, dbgap_application_queryset=None, dbgap_workspace_queryset=None): self.completed = False # Set up lists to hold audit results. self.verified = None self.needs_action = None self.errors = None + self.dbgap_application_queryset = dbgap_application_queryset + self.dbgap_workspace_queryset = dbgap_workspace_queryset + if not self.dbgap_application_queryset: + self.dbgap_application_queryset = dbGaPApplication.objects.all() + if not self.dbgap_workspace_queryset: + self.dbgap_workspace_queryset = dbGaPWorkspace.objects.all() + # Check types of workspaces. + + def run_audit(self): + self.verified = [] + self.needs_action = [] + self.errors = [] + + for dbgap_application in self.dbgap_application_queryset: + for dbgap_workspace in self.dbgap_workspace_queryset: + self.audit_application_and_workspace(dbgap_application, dbgap_workspace) + self.completed = True def audit_application_and_workspace(self, dbgap_application, dbgap_workspace): """Audit access for a specific dbGaP application and a specific workspace.""" @@ -319,45 +335,3 @@ def get_needs_action_table(self): def get_errors_table(self): """Return a table of audit errors.""" return self.results_table_class([x.get_table_dictionary() for x in self.errors]) - - -class dbGaPApplicationAccessAudit(dbGaPAccessAudit): - def __init__(self, dbgap_application): - super().__init__() - self.dbgap_application = dbgap_application - - def run_audit(self): - """Audit all workspaces against access provided by this dbGaPApplication.""" - self.verified = [] - self.needs_action = [] - self.errors = [] - - # Get a list of all dbGaP workspaces. - dbgap_workspaces = dbGaPWorkspace.objects.all() - # Loop through workspaces and verify access. - for dbgap_workspace in dbgap_workspaces: - self.audit_application_and_workspace( - self.dbgap_application, dbgap_workspace - ) - self.completed = True - - -class dbGaPWorkspaceAccessAudit(dbGaPAccessAudit): - def __init__(self, dbgap_workspace): - super().__init__() - self.dbgap_workspace = dbgap_workspace - - def run_audit(self): - """Audit this workspace against access provided by all dbGaPApplications.""" - self.verified = [] - self.needs_action = [] - self.errors = [] - - # Get a list of all dbGaP applications. - dbgap_applications = dbGaPApplication.objects.all() - # Loop through workspaces and verify access. - for dbgap_application in dbgap_applications: - self.audit_application_and_workspace( - dbgap_application, self.dbgap_workspace - ) - self.completed = True diff --git a/primed/dbgap/tests/test_audit.py b/primed/dbgap/tests/test_audit.py index c8cb3866..4d44ce9a 100644 --- a/primed/dbgap/tests/test_audit.py +++ b/primed/dbgap/tests/test_audit.py @@ -119,22 +119,30 @@ def test_post_init(self): ) -class dbGaPApplicationAccessAuditTest(TestCase): - """Tests for the dbGaPApplicationAccessAudit class.""" +class dbGaPAccessAuditTest(TestCase): + """Tests for the dbGaPAccessAudit class.""" def test_completed(self): """completed is updated properly.""" - dbgap_application = factories.dbGaPApplicationFactory.create() - dbgap_audit = audit.dbGaPApplicationAccessAudit(dbgap_application) + dbgap_audit = audit.dbGaPAccessAudit() self.assertFalse(dbgap_audit.completed) dbgap_audit.run_audit() self.assertTrue(dbgap_audit.completed) - def test_no_workspaces_no_snapshots(self): - """run_audit with no existing workspaces and no snapshots.""" + def test_one_application_no_workspace(self): + """run_audit with one application and no existing workspaces.""" + factories.dbGaPApplicationFactory.create() + dbgap_audit = audit.dbGaPAccessAudit() + dbgap_audit.run_audit() + self.assertEqual(len(dbgap_audit.verified), 0) + self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.errors), 0) + + def test_one_application_one_workspace_no_snapshots(self): + """run_audit with one application, one workspaces and no snapshots.""" dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dbgap_application = factories.dbGaPApplicationFactory.create() - dbgap_audit = audit.dbGaPApplicationAccessAudit(dbgap_application) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 1) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -146,23 +154,12 @@ def test_no_workspaces_no_snapshots(self): self.assertIsNone(record.data_access_request) self.assertEqual(record.note, audit.dbGaPAccessAudit.NO_SNAPSHOTS) - def test_one_workspaces_no_snapshots(self): - """run_audit with no existing workspaces and no snapshots.""" - dbgap_application = factories.dbGaPApplicationFactory.create() - dbgap_audit = audit.dbGaPApplicationAccessAudit(dbgap_application) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - - def test_snapshot_has_no_dars(self): + def test_one_application_one_workspace_snapshot_has_no_dars(self): """run_audit with no dars.""" # Create a workspace and a snapshot. dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dbgap_snapshot = factories.dbGaPDataAccessSnapshotFactory.create() - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dbgap_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 1) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -174,8 +171,8 @@ def test_snapshot_has_no_dars(self): self.assertIsNone(record.data_access_request) self.assertEqual(record.note, audit.dbGaPAccessAudit.NO_DAR) - def test_one_verified_access(self): - """run_audit with one workspace that has verified access.""" + def test_verified_access(self): + """run_audit with one application and one workspace that has verified access.""" # Create a workspace and matching DAR. dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( @@ -186,9 +183,7 @@ def test_one_verified_access(self): parent_group=dbgap_workspace.workspace.authorization_domains.first(), child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, ) - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 1) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -201,8 +196,8 @@ def test_one_verified_access(self): ) self.assertEqual(record.data_access_request, dar) - def test_two_verified_access(self): - """run_audit with two workspaces that have verified access.""" + def test_two_workspaces(self): + """run_audit with one application and two workspaces with different access.""" dbgap_snapshot = factories.dbGaPDataAccessSnapshotFactory.create() # Create two workspaces and matching DARs. dbgap_workspace_1 = factories.dbGaPWorkspaceFactory.create() @@ -213,35 +208,33 @@ def test_two_verified_access(self): dar_2 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( dbgap_workspace=dbgap_workspace_2, dbgap_data_access_snapshot=dbgap_snapshot ) - # Add the anvil group to the auth groups for the workspaces. + # Add the anvil group to the auth groups for the first workspace. GroupGroupMembershipFactory( parent_group=dbgap_workspace_1.workspace.authorization_domains.first(), child_group=dar_1.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, ) - GroupGroupMembershipFactory( - parent_group=dbgap_workspace_2.workspace.authorization_domains.first(), - child_group=dar_2.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dbgap_snapshot.dbgap_application - ) + # GroupGroupMembershipFactory( + # parent_group=dbgap_workspace_2.workspace.authorization_domains.first(), + # child_group=dar_2.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, + # ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 2) - self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.verified), 1) + self.assertEqual(len(dbgap_audit.needs_action), 1) self.assertEqual(len(dbgap_audit.errors), 0) record = dbgap_audit.verified[0] self.assertIsInstance(record, audit.VerifiedAccess) self.assertEqual(record.workspace, dbgap_workspace_1) self.assertEqual(record.data_access_request, dar_1) self.assertEqual(record.dbgap_application, dbgap_snapshot.dbgap_application) - record = dbgap_audit.verified[1] - self.assertIsInstance(record, audit.VerifiedAccess) + record = dbgap_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) self.assertEqual(record.workspace, dbgap_workspace_2) self.assertEqual(record.dbgap_application, dbgap_snapshot.dbgap_application) self.assertEqual(record.data_access_request, dar_2) - def test_one_verified_no_access_dar_not_approved(self): - """run_audit with one workspace that has verified no access.""" + def test_verified_no_access_dar_not_approved(self): + """run_audit with one application and one workspace that has verified no access.""" # Create a workspace and matching DAR. dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( @@ -249,9 +242,7 @@ def test_one_verified_no_access_dar_not_approved(self): dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, ) # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 1) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -263,9 +254,7 @@ def test_one_verified_no_access_dar_not_approved(self): record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.DAR_NOT_APPROVED - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.DAR_NOT_APPROVED) def test_grant_access_new_approved_dar(self): # Create a workspace and matching DAR. @@ -278,9 +267,7 @@ def test_grant_access_new_approved_dar(self): dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), ) # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 1) @@ -292,9 +279,7 @@ def test_grant_access_new_approved_dar(self): record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.NEW_APPROVED_DAR - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.NEW_APPROVED_DAR) def test_grant_access_new_workspace(self): # Create a workspace and matching DAR. @@ -307,9 +292,7 @@ def test_grant_access_new_workspace(self): dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), ) # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 1) @@ -321,7 +304,7 @@ def test_grant_access_new_workspace(self): record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar) - self.assertEqual(record.note, audit.dbGaPApplicationAccessAudit.NEW_WORKSPACE) + self.assertEqual(record.note, audit.dbGaPAccessAudit.NEW_WORKSPACE) def test_grant_access_updated_dar(self): # Create a workspace and matching DAR. @@ -345,9 +328,7 @@ def test_grant_access_updated_dar(self): dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), ) # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 1) @@ -359,9 +340,7 @@ def test_grant_access_updated_dar(self): record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.NEW_APPROVED_DAR - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.NEW_APPROVED_DAR) def test_remove_access_udpated_dar(self): # Create a workspace and matching DAR. @@ -387,9 +366,7 @@ def test_remove_access_udpated_dar(self): parent_group=dbgap_workspace.workspace.authorization_domains.first(), child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, ) - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 1) @@ -401,34 +378,35 @@ def test_remove_access_udpated_dar(self): record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.PREVIOUS_APPROVAL - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.PREVIOUS_APPROVAL) def test_error_remove_access_unknown_reason(self): """Access needs to be removed for an unknown reason.""" + dbgap_application = factories.dbGaPApplicationFactory.create() # Create a workspace and matching DAR. dbgap_workspace = factories.dbGaPWorkspaceFactory.create() # Create an old snapshot where the DAR was rejected and a new one where it is still rejected. old_dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( dbgap_workspace=dbgap_workspace, + dbgap_data_access_snapshot__dbgap_application=dbgap_application, dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), + dbgap_data_access_snapshot__is_most_recent=False, dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, ) dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( dbgap_dar_id=old_dar.dbgap_dar_id, dbgap_workspace=dbgap_workspace, + dbgap_data_access_snapshot__dbgap_application=dbgap_application, dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), + dbgap_data_access_snapshot__is_most_recent=True, dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, ) # Add the anvil group to the auth group for the workspace. GroupGroupMembershipFactory.create( parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application + child_group=dbgap_application.anvil_access_group, ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -436,13 +414,9 @@ def test_error_remove_access_unknown_reason(self): record = dbgap_audit.errors[0] self.assertIsInstance(record, audit.RemoveAccess) self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) + self.assertEqual(record.dbgap_application, dbgap_application) self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.ERROR_HAS_ACCESS) def test_error_remove_access_no_snapshot(self): """Access needs to be removed for an unknown reason when there is no snapshot.""" @@ -454,7 +428,7 @@ def test_error_remove_access_no_snapshot(self): parent_group=dbgap_workspace.workspace.authorization_domains.first(), child_group=dbgap_application.anvil_access_group, ) - dbgap_audit = audit.dbGaPApplicationAccessAudit(dbgap_application) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -464,9 +438,7 @@ def test_error_remove_access_no_snapshot(self): self.assertEqual(record.workspace, dbgap_workspace) self.assertEqual(record.dbgap_application, dbgap_application) self.assertIsNone(record.data_access_request) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) + self.assertEqual(record.note, audit.dbGaPAccessAudit.ERROR_HAS_ACCESS) def test_error_remove_access_snapshot_no_dar(self): """Group has access but there is no matching DAR.""" @@ -478,7 +450,7 @@ def test_error_remove_access_snapshot_no_dar(self): parent_group=dbgap_workspace.workspace.authorization_domains.first(), child_group=snapshot.dbgap_application.anvil_access_group, ) - dbgap_audit = audit.dbGaPApplicationAccessAudit(snapshot.dbgap_application) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() self.assertEqual(len(dbgap_audit.verified), 0) self.assertEqual(len(dbgap_audit.needs_action), 0) @@ -488,121 +460,10 @@ def test_error_remove_access_snapshot_no_dar(self): self.assertEqual(record.workspace, dbgap_workspace) self.assertEqual(record.dbgap_application, snapshot.dbgap_application) self.assertIsNone(record.data_access_request) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) - - def test_approved_dar_for_different_application(self): - """There is an approved dar for a different application, but not this one.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - # Create an approved DAR from an unrelated application. - factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - dbgap_audit = audit.dbGaPApplicationAccessAudit( - dar.dbgap_data_access_snapshot.dbgap_application - ) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedNoAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.DAR_NOT_APPROVED - ) - - -class dbGaPWorkspaceAccessAuditTest(TestCase): - """Tests for the dbGaPWorkspaceAccessAudit class.""" - - def test_completed(self): - """completed is updated properly.""" - workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(workspace) - self.assertFalse(dbgap_audit.completed) - dbgap_audit.run_audit() - self.assertTrue(dbgap_audit.completed) - - def test_no_applications_no_snapshots(self): - """run_audit with no existing workspaces and no snapshots.""" - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - - def test_one_application_no_snapshots(self): - """run_audit with no existing workspaces and no snapshots.""" - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_application = factories.dbGaPApplicationFactory.create() - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedNoAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual(record.dbgap_application, dbgap_application) - self.assertIsNone(record.data_access_request) - self.assertEqual(record.note, audit.dbGaPAccessAudit.NO_SNAPSHOTS) - - def test_snapshot_has_no_dars(self): - """run_audit with one snapshot that has no dars.""" - # Create a workspace and a snapshot. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_snapshot = factories.dbGaPDataAccessSnapshotFactory.create() - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedNoAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual(record.dbgap_application, dbgap_snapshot.dbgap_application) - self.assertIsNone(record.data_access_request) - self.assertEqual(record.note, audit.dbGaPAccessAudit.NO_DAR) - - def test_one_verified_access(self): - """run_audit with one workspace that has verified access.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace - ) - # Add the anvil group to the auth group for the workspace. - GroupGroupMembershipFactory( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) + self.assertEqual(record.note, audit.dbGaPAccessAudit.ERROR_HAS_ACCESS) - def test_two_verified_access(self): - """run_audit with two applications that have verified access.""" + def test_two_applications(self): + """run_audit with two applications and one workspace.""" dbgap_workspace = factories.dbGaPWorkspaceFactory.create() # Create two applications and matching DARs. dbgap_snapshot_1 = factories.dbGaPDataAccessSnapshotFactory.create() @@ -613,19 +474,19 @@ def test_two_verified_access(self): dar_2 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( dbgap_workspace=dbgap_workspace, dbgap_data_access_snapshot=dbgap_snapshot_2 ) - # Add the anvil group to the auth groups for the workspaces. + # Add the anvil group for the first applicatoin to the auth groups for the workspace. GroupGroupMembershipFactory( parent_group=dbgap_workspace.workspace.authorization_domains.first(), child_group=dbgap_snapshot_1.dbgap_application.anvil_access_group, ) - GroupGroupMembershipFactory( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dbgap_snapshot_2.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) + # GroupGroupMembershipFactory( + # parent_group=dbgap_workspace.workspace.authorization_domains.first(), + # child_group=dbgap_snapshot_2.dbgap_application.anvil_access_group, + # ) + dbgap_audit = audit.dbGaPAccessAudit() dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 2) - self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.verified), 1) + self.assertEqual(len(dbgap_audit.needs_action), 1) self.assertEqual(len(dbgap_audit.errors), 0) record = dbgap_audit.verified[0] self.assertIsInstance(record, audit.VerifiedAccess) @@ -634,291 +495,13 @@ def test_two_verified_access(self): record.dbgap_application, dar_1.dbgap_data_access_snapshot.dbgap_application ) self.assertEqual(record.data_access_request, dar_1) - record = dbgap_audit.verified[1] - self.assertIsInstance(record, audit.VerifiedAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar_2.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar_2) - - def test_one_verified_no_access_dar_not_approved(self): - """run_audit with one application that has verified no access.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedNoAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.DAR_NOT_APPROVED - ) - - def test_grant_access_new_approved_dar(self): - # Create a workspace and matching DAR. - # Workspace was created before the snapshot. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create( - created=timezone.now() - timedelta(weeks=3) - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), - ) - # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 1) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.needs_action[0] - self.assertIsInstance(record, audit.GrantAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.NEW_APPROVED_DAR - ) - - def test_grant_access_new_workspace(self): - # Create a workspace and matching DAR. - # Workspace was created after the snapshot. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create( - created=timezone.now() - timedelta(weeks=2) - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), - ) - # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 1) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.needs_action[0] - self.assertIsInstance(record, audit.GrantAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual(record.note, audit.dbGaPApplicationAccessAudit.NEW_WORKSPACE) - - def test_grant_access_updated_dar(self): - # Create a workspace and matching DAR. - # Workspace was created before the snapshot. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create( - created=timezone.now() - timedelta(weeks=4) - ) - dbgap_application = factories.dbGaPApplicationFactory.create() - # Create an old snapshot where the DAR was not approved. - old_dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__is_most_recent=False, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_dar_id=old_dar.dbgap_dar_id, - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), - ) - # Do not add the anvil group to the auth group for the workspace. - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 1) - self.assertEqual(len(dbgap_audit.errors), 0) record = dbgap_audit.needs_action[0] self.assertIsInstance(record, audit.GrantAccess) self.assertEqual(record.workspace, dbgap_workspace) self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.NEW_APPROVED_DAR - ) - - def test_remove_access_udpated_dar(self): - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_application = factories.dbGaPApplicationFactory.create() - # Create an old snapshot where the DAR was approved and a new one where it was closed. - old_dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__is_most_recent=False, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), - dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_dar_id=old_dar.dbgap_dar_id, - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), - dbgap_current_status=models.dbGaPDataAccessRequest.CLOSED, - ) - # Add the anvil group to the auth group for the workspace. - GroupGroupMembershipFactory.create( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 1) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.needs_action[0] - self.assertIsInstance(record, audit.RemoveAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.PREVIOUS_APPROVAL - ) - - def test_error_remove_access_unknown_reason(self): - """Access needs to be removed for an unknown reason.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_application = factories.dbGaPApplicationFactory.create() - # Create an old snapshot where the DAR was rejected and a new one where it is still rejected. - old_dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__is_most_recent=False, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=3), - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_dar_id=old_dar.dbgap_dar_id, - dbgap_workspace=dbgap_workspace, - dbgap_data_access_snapshot__dbgap_application=dbgap_application, - dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=2), - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - # Add the anvil group to the auth group for the workspace. - GroupGroupMembershipFactory.create( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 1) - record = dbgap_audit.errors[0] - self.assertIsInstance(record, audit.RemoveAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) - - def test_error_remove_access_no_snapshot(self): - """Access needs to be removed for an unknown reason when there is no snapshot.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_application = factories.dbGaPApplicationFactory.create() - # Add the anvil group to the auth group for the workspace. - GroupGroupMembershipFactory.create( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 1) - record = dbgap_audit.errors[0] - self.assertIsInstance(record, audit.RemoveAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual(record.dbgap_application, dbgap_application) - self.assertIsNone(record.data_access_request) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) - - def test_error_remove_access_snapshot_no_dar(self): - """Group has access but there is no matching DAR.""" - # Create a workspace but no matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - # Add the anvil group to the auth group for the workspace. - snapshot = factories.dbGaPDataAccessSnapshotFactory.create() - GroupGroupMembershipFactory.create( - parent_group=dbgap_workspace.workspace.authorization_domains.first(), - child_group=snapshot.dbgap_application.anvil_access_group, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 0) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 1) - record = dbgap_audit.errors[0] - self.assertIsInstance(record, audit.RemoveAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual(record.dbgap_application, snapshot.dbgap_application) - self.assertEqual(record.data_access_request, None) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS - ) - - def test_approved_dar_for_different_workspace(self): - """There is an approved dar for a different workspace, but not this one.""" - # Create a workspace and matching DAR. - dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - # Create an unrelated workspace. - other_dbgap_workspace = factories.dbGaPWorkspaceFactory.create() - dbgap_snapshot = factories.dbGaPDataAccessSnapshotFactory.create() - # Create an approved DAR for the other workspace. - factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_workspace=other_dbgap_workspace, - dbgap_data_access_snapshot=dbgap_snapshot, - ) - # Create a rejected DAR for this workspace. - dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( - dbgap_data_access_snapshot=dbgap_snapshot, - dbgap_workspace=dbgap_workspace, - dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, - ) - dbgap_audit = audit.dbGaPWorkspaceAccessAudit(dbgap_workspace) - dbgap_audit.run_audit() - self.assertEqual(len(dbgap_audit.verified), 1) - self.assertEqual(len(dbgap_audit.needs_action), 0) - self.assertEqual(len(dbgap_audit.errors), 0) - record = dbgap_audit.verified[0] - self.assertIsInstance(record, audit.VerifiedNoAccess) - self.assertEqual(record.workspace, dbgap_workspace) - self.assertEqual( - record.dbgap_application, dar.dbgap_data_access_snapshot.dbgap_application - ) - self.assertEqual(record.data_access_request, dar) - self.assertEqual( - record.note, audit.dbGaPApplicationAccessAudit.DAR_NOT_APPROVED + record.dbgap_application, dar_2.dbgap_data_access_snapshot.dbgap_application ) + self.assertEqual(record.data_access_request, dar_2) class dbGaPAccessAuditTableTest(TestCase): From 04b4788f9dbd8be2c5bb25596967479d7c60c059 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Thu, 1 Feb 2024 16:25:11 -0800 Subject: [PATCH 02/12] Switch to using dbGaPAccessAudit when auditing workspcaes In the view that audits access to a single dbGaPWorkspace, use the revamped dbGaPAccessAudit class instead of the dbGaPWorkspaceAccessAudit class that no longer exists. Update a couple tests where necessary. --- primed/dbgap/tests/test_views.py | 33 ++++++++++++++++---------------- primed/dbgap/views.py | 4 +++- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/primed/dbgap/tests/test_views.py b/primed/dbgap/tests/test_views.py index 24d6060a..9cebcd2b 100644 --- a/primed/dbgap/tests/test_views.py +++ b/primed/dbgap/tests/test_views.py @@ -4235,15 +4235,14 @@ def test_context_data_access_audit(self): ) ) self.assertIn("data_access_audit", response.context_data) + data_access_audit = response.context_data["data_access_audit"] self.assertIsInstance( - response.context_data["data_access_audit"], - audit.dbGaPWorkspaceAccessAudit, - ) - self.assertTrue(response.context_data["data_access_audit"].completed) - self.assertEqual( - response.context_data["data_access_audit"].dbgap_workspace, - self.dbgap_workspace, + data_access_audit, + audit.dbGaPAccessAudit, ) + self.assertTrue(data_access_audit.completed) + self.assertEqual(data_access_audit.dbgap_workspace_queryset.count(), 1) + self.assertIn(self.dbgap_workspace, data_access_audit.dbgap_workspace_queryset) def test_context_verified_table_access(self): """verified_table shows a record when audit has verified access.""" @@ -4276,7 +4275,7 @@ def test_context_verified_table_access(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.APPROVED_DAR, + audit.dbGaPAccessAudit.APPROVED_DAR, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4304,7 +4303,7 @@ def test_context_verified_table_no_snapshot(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.NO_SNAPSHOTS, + audit.dbGaPAccessAudit.NO_SNAPSHOTS, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4332,7 +4331,7 @@ def test_context_verified_table_no_dar(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.NO_DAR, + audit.dbGaPAccessAudit.NO_DAR, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4364,7 +4363,7 @@ def test_context_verified_table_other_dar(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.NO_DAR, + audit.dbGaPAccessAudit.NO_DAR, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4395,7 +4394,7 @@ def test_context_verified_table_dar_not_approved(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.DAR_NOT_APPROVED, + audit.dbGaPAccessAudit.DAR_NOT_APPROVED, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4425,7 +4424,7 @@ def test_context_needs_action_table_grant(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.NEW_APPROVED_DAR, + audit.dbGaPAccessAudit.NEW_APPROVED_DAR, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4472,7 +4471,7 @@ def test_context_needs_action_table_remove(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.PREVIOUS_APPROVAL, + audit.dbGaPAccessAudit.PREVIOUS_APPROVAL, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4509,7 +4508,7 @@ def test_context_error_table_has_access(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS, + audit.dbGaPAccessAudit.ERROR_HAS_ACCESS, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4542,7 +4541,7 @@ def test_context_errors_table_no_snapshot_has_access(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.ERROR_HAS_ACCESS, + audit.dbGaPAccessAudit.ERROR_HAS_ACCESS, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4575,7 +4574,7 @@ def test_context_errors_table_no_dar_has_access(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPWorkspaceAccessAudit.ERROR_HAS_ACCESS, + audit.dbGaPAccessAudit.ERROR_HAS_ACCESS, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) diff --git a/primed/dbgap/views.py b/primed/dbgap/views.py index d52bc72b..dd53d36e 100644 --- a/primed/dbgap/views.py +++ b/primed/dbgap/views.py @@ -629,7 +629,9 @@ def get_object(self, queryset=None): def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) # Run the audit. - data_access_audit = audit.dbGaPWorkspaceAccessAudit(self.object) + data_access_audit = audit.dbGaPAccessAudit( + dbgap_workspace_queryset=self.model.objects.filter(pk=self.object.pk) + ) data_access_audit.run_audit() context["verified_table"] = data_access_audit.get_verified_table() context["errors_table"] = data_access_audit.get_errors_table() From 7221c8c7c26cae52addc29de6a62dfbad25c806b Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Thu, 1 Feb 2024 16:34:53 -0800 Subject: [PATCH 03/12] Use revamped dbGaPAccessAudit class when auditing an application --- primed/dbgap/tests/test_views.py | 46 ++++++++++++++++++++++++-------- primed/dbgap/views.py | 4 ++- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/primed/dbgap/tests/test_views.py b/primed/dbgap/tests/test_views.py index 9cebcd2b..d151d18f 100644 --- a/primed/dbgap/tests/test_views.py +++ b/primed/dbgap/tests/test_views.py @@ -3938,14 +3938,24 @@ def test_context_data_access_audit(self): self.client.force_login(self.user) response = self.client.get(self.get_url(self.application.dbgap_project_id)) self.assertIn("data_access_audit", response.context_data) + data_access_audit = response.context_data["data_access_audit"] self.assertIsInstance( - response.context_data["data_access_audit"], - audit.dbGaPApplicationAccessAudit, + data_access_audit, + audit.dbGaPAccessAudit, ) - self.assertTrue(response.context_data["data_access_audit"].completed) - self.assertEqual( - response.context_data["data_access_audit"].dbgap_application, - self.application, + self.assertTrue(data_access_audit.completed) + self.assertEqual(data_access_audit.dbgap_application_queryset.count(), 1) + self.assertIn(self.application, data_access_audit.dbgap_application_queryset) + + def test_context_data_access_audit_does_not_include_other_applications(self): + """The data_access_audit does not include other applications.""" + other_application = factories.dbGaPApplicationFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url(self.application.dbgap_project_id)) + data_access_audit = response.context_data["data_access_audit"] + self.assertEqual(data_access_audit.dbgap_application_queryset.count(), 1) + self.assertNotIn( + other_application, data_access_audit.dbgap_application_queryset ) def test_context_verified_table_access(self): @@ -3975,7 +3985,7 @@ def test_context_verified_table_access(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.APPROVED_DAR, + audit.dbGaPAccessAudit.APPROVED_DAR, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -3996,7 +4006,7 @@ def test_context_verified_table_no_access(self): self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.NO_DAR, + audit.dbGaPAccessAudit.NO_DAR, ) self.assertIsNone(table.rows[0].get_cell_value("action")) @@ -4022,7 +4032,7 @@ def test_context_needs_action_table_grant(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.NEW_APPROVED_DAR, + audit.dbGaPAccessAudit.NEW_APPROVED_DAR, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4064,7 +4074,7 @@ def test_context_needs_action_table_remove(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.PREVIOUS_APPROVAL, + audit.dbGaPAccessAudit.PREVIOUS_APPROVAL, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4096,7 +4106,7 @@ def test_context_error_table_has_access(self): self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) self.assertEqual( table.rows[0].get_cell_value("note"), - audit.dbGaPApplicationAccessAudit.ERROR_HAS_ACCESS, + audit.dbGaPAccessAudit.ERROR_HAS_ACCESS, ) self.assertIsNotNone(table.rows[0].get_cell_value("action")) @@ -4244,6 +4254,20 @@ def test_context_data_access_audit(self): self.assertEqual(data_access_audit.dbgap_workspace_queryset.count(), 1) self.assertIn(self.dbgap_workspace, data_access_audit.dbgap_workspace_queryset) + def test_context_data_access_audit_does_not_include_other_applications(self): + """The data_access_audit does not include other workspaces.""" + other_workspace = factories.dbGaPWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.dbgap_workspace.workspace.billing_project.name, + self.dbgap_workspace.workspace.name, + ) + ) + data_access_audit = response.context_data["data_access_audit"] + self.assertEqual(data_access_audit.dbgap_workspace_queryset.count(), 1) + self.assertNotIn(other_workspace, data_access_audit.dbgap_workspace_queryset) + def test_context_verified_table_access(self): """verified_table shows a record when audit has verified access.""" # Add a verified workspace. diff --git a/primed/dbgap/views.py b/primed/dbgap/views.py index dd53d36e..12bf2925 100644 --- a/primed/dbgap/views.py +++ b/primed/dbgap/views.py @@ -587,7 +587,9 @@ def get_context_data(self, **kwargs): context["latest_snapshot"] = latest_snapshot if latest_snapshot: # Run the audit. - data_access_audit = audit.dbGaPApplicationAccessAudit(self.object) + data_access_audit = audit.dbGaPAccessAudit( + dbgap_application_queryset=self.model.objects.filter(pk=self.object.pk) + ) data_access_audit.run_audit() context["verified_table"] = data_access_audit.get_verified_table() context["errors_table"] = data_access_audit.get_errors_table() From 84becb1eb8552ff3f4248e43adb08600642bd2de Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Thu, 1 Feb 2024 16:47:05 -0800 Subject: [PATCH 04/12] Check type of queryset inputs to dbGaPAccessAudit Raise an error if the input is not a queryset of the correct model. --- primed/dbgap/audit.py | 24 ++++++++--- primed/dbgap/tests/test_audit.py | 72 ++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/primed/dbgap/audit.py b/primed/dbgap/audit.py index f21c5f75..21beedc6 100644 --- a/primed/dbgap/audit.py +++ b/primed/dbgap/audit.py @@ -1,6 +1,7 @@ from dataclasses import dataclass import django_tables2 as tables +from django.db.models import QuerySet from django.urls import reverse from django.utils.safestring import mark_safe @@ -164,13 +165,26 @@ def __init__(self, dbgap_application_queryset=None, dbgap_workspace_queryset=Non self.verified = None self.needs_action = None self.errors = None + if dbgap_application_queryset is None: + dbgap_application_queryset = dbGaPApplication.objects.all() + if not ( + isinstance(dbgap_application_queryset, QuerySet) + and dbgap_application_queryset.model is dbGaPApplication + ): + raise ValueError( + "dbgap_application_queryset must be a queryset of dbGaPApplication objects." + ) self.dbgap_application_queryset = dbgap_application_queryset + if dbgap_workspace_queryset is None: + dbgap_workspace_queryset = dbGaPWorkspace.objects.all() + if not ( + isinstance(dbgap_workspace_queryset, QuerySet) + and dbgap_workspace_queryset.model is dbGaPWorkspace + ): + raise ValueError( + "dbgap_workspace_queryset must be a queryset of dbGaPWorkspace objects." + ) self.dbgap_workspace_queryset = dbgap_workspace_queryset - if not self.dbgap_application_queryset: - self.dbgap_application_queryset = dbGaPApplication.objects.all() - if not self.dbgap_workspace_queryset: - self.dbgap_workspace_queryset = dbGaPWorkspace.objects.all() - # Check types of workspaces. def run_audit(self): self.verified = [] diff --git a/primed/dbgap/tests/test_audit.py b/primed/dbgap/tests/test_audit.py index 4d44ce9a..2eae1695 100644 --- a/primed/dbgap/tests/test_audit.py +++ b/primed/dbgap/tests/test_audit.py @@ -503,6 +503,78 @@ def test_two_applications(self): ) self.assertEqual(record.data_access_request, dar_2) + def test_dbgap_application_queryset(self): + """dbGapAccessAudit only includes the specified dbgap_application_queryset objects.""" + dbgap_application_1 = factories.dbGaPApplicationFactory.create() + dbgap_application_2 = factories.dbGaPApplicationFactory.create() + dbgap_application_3 = factories.dbGaPApplicationFactory.create() + dbgap_audit = audit.dbGaPAccessAudit( + dbgap_application_queryset=models.dbGaPApplication.objects.filter( + pk__in=[dbgap_application_1.pk, dbgap_application_2.pk] + ) + ) + self.assertEqual(dbgap_audit.dbgap_application_queryset.count(), 2) + self.assertIn(dbgap_application_1, dbgap_audit.dbgap_application_queryset) + self.assertIn(dbgap_application_2, dbgap_audit.dbgap_application_queryset) + self.assertNotIn(dbgap_application_3, dbgap_audit.dbgap_application_queryset) + + def test_dbgap_workspace_queryset(self): + """dbGapAccessAudit only includes the specified dbgap_application_queryset objects.""" + dbgap_workspace_1 = factories.dbGaPWorkspaceFactory.create() + dbgap_workspace_2 = factories.dbGaPWorkspaceFactory.create() + dbgap_workspace_3 = factories.dbGaPWorkspaceFactory.create() + dbgap_audit = audit.dbGaPAccessAudit( + dbgap_workspace_queryset=models.dbGaPWorkspace.objects.filter( + pk__in=[dbgap_workspace_1.pk, dbgap_workspace_2.pk] + ) + ) + self.assertEqual(dbgap_audit.dbgap_workspace_queryset.count(), 2) + self.assertIn(dbgap_workspace_1, dbgap_audit.dbgap_workspace_queryset) + self.assertIn(dbgap_workspace_2, dbgap_audit.dbgap_workspace_queryset) + self.assertNotIn(dbgap_workspace_3, dbgap_audit.dbgap_workspace_queryset) + + def test_dbgap_workspace_queryset_wrong_class(self): + """dbGaPAccessAudit raises error if dbgap_workspace_queryset has the wrong model class.""" + with self.assertRaises(ValueError) as e: + audit.dbGaPAccessAudit( + dbgap_workspace_queryset=models.dbGaPApplication.objects.all() + ) + self.assertEqual( + str(e.exception), + "dbgap_workspace_queryset must be a queryset of dbGaPWorkspace objects.", + ) + + def test_dbgap_workspace_queryset_not_queryset(self): + """dbGaPAccessAudit raises error if dbgap_workspace_queryset is not a queryset.""" + dbgap_workspace = factories.dbGaPWorkspaceFactory.create() + with self.assertRaises(ValueError) as e: + audit.dbGaPAccessAudit(dbgap_workspace_queryset=dbgap_workspace) + self.assertEqual( + str(e.exception), + "dbgap_workspace_queryset must be a queryset of dbGaPWorkspace objects.", + ) + + def test_dbgap_application_queryset_wrong_class(self): + """dbGaPAccessAudit raises error if dbgap_application_queryset has the wrong model class.""" + with self.assertRaises(ValueError) as e: + audit.dbGaPAccessAudit( + dbgap_application_queryset=models.dbGaPWorkspace.objects.all() + ) + self.assertEqual( + str(e.exception), + "dbgap_application_queryset must be a queryset of dbGaPApplication objects.", + ) + + def test_dbgap_application_queryset_not_queryset(self): + """dbGaPAccessAudit raises error if dbgap_application_queryset is not a queryset.""" + dbgap_application = factories.dbGaPApplicationFactory.create() + with self.assertRaises(ValueError) as e: + audit.dbGaPAccessAudit(dbgap_application_queryset=dbgap_application) + self.assertEqual( + str(e.exception), + "dbgap_application_queryset must be a queryset of dbGaPApplication objects.", + ) + class dbGaPAccessAuditTableTest(TestCase): """Tests for the `dbGaPAccessAuditTableTest` table.""" From 716440511024d09b1ab899903818119dbe87cd55 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 12:05:08 -0800 Subject: [PATCH 05/12] Create dbGaP access groups with expected name in factory When creating a dbGaPApplication using a factory, give the AnVIL access group the name that it would be given if you created it via the create view. --- primed/dbgap/tests/factories.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/primed/dbgap/tests/factories.py b/primed/dbgap/tests/factories.py index df4d0916..810d2cda 100644 --- a/primed/dbgap/tests/factories.py +++ b/primed/dbgap/tests/factories.py @@ -2,6 +2,7 @@ ManagedGroupFactory, WorkspaceFactory, ) +from django.conf import settings from factory import ( Dict, DictFactory, @@ -99,7 +100,15 @@ class dbGaPApplicationFactory(DjangoModelFactory): principal_investigator = SubFactory(UserFactory) dbgap_project_id = Sequence(lambda n: n + 1) - anvil_access_group = SubFactory(ManagedGroupFactory) + anvil_access_group = SubFactory( + ManagedGroupFactory, + name=LazyAttribute( + lambda o: "{}_DBGAP_ACCESS_{}".format( + settings.ANVIL_DATA_ACCESS_GROUP_PREFIX, + o.factory_parent.dbgap_project_id, + ) + ), + ) class Meta: model = models.dbGaPApplication From 87dfad99ffc7e9361eb17f21b567ef2ce5908882 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 12:08:41 -0800 Subject: [PATCH 06/12] Create dbGaP auth domains with a related name When creating the dbGaP workspace auth domain using dbGaPWrkspace factory, give the auth domain group a name that is related to the workspace name. --- .../collaborative_analysis/tests/factories.py | 47 ++++++++++++++++++ primed/dbgap/tests/factories.py | 4 +- primed/templates/dbgap/dbgap_audit.html | 49 +++++++++++++++++++ 3 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 primed/collaborative_analysis/tests/factories.py create mode 100644 primed/templates/dbgap/dbgap_audit.html diff --git a/primed/collaborative_analysis/tests/factories.py b/primed/collaborative_analysis/tests/factories.py new file mode 100644 index 00000000..30175a73 --- /dev/null +++ b/primed/collaborative_analysis/tests/factories.py @@ -0,0 +1,47 @@ +from anvil_consortium_manager.tests.factories import ( + ManagedGroupFactory, + WorkspaceFactory, +) +from factory import LazyAttribute, SubFactory, post_generation +from factory.django import DjangoModelFactory + +from primed.users.tests.factories import UserFactory + +from .. import models + + +class CollaborativeAnalysisWorkspaceFactory(DjangoModelFactory): + class Meta: + model = models.CollaborativeAnalysisWorkspace + skip_postgeneration_save = True + + workspace = SubFactory(WorkspaceFactory, workspace_type="collab_analysis") + custodian = SubFactory(UserFactory) + analyst_group = SubFactory( + ManagedGroupFactory, + name=LazyAttribute( + lambda o: "analysts_{}".format(o.factory_parent.workspace.name) + ), + ) + + @post_generation + def authorization_domains(self, create, extracted, **kwargs): + # Add an authorization domain. + if not create: + # Simple build, do nothing. + return + + # Create an authorization domain. + auth_domain = ManagedGroupFactory.create( + name="auth_{}".format(self.workspace.name) + ) + print(auth_domain) + self.workspace.authorization_domains.add(auth_domain) + + @post_generation + def source_workspaces(self, create, extracted, **kwargs): + if not create or not extracted: + # Simple build, do nothing. + return + + self.source_workspaces.add(*extracted) diff --git a/primed/dbgap/tests/factories.py b/primed/dbgap/tests/factories.py index 810d2cda..6dca440e 100644 --- a/primed/dbgap/tests/factories.py +++ b/primed/dbgap/tests/factories.py @@ -91,7 +91,9 @@ def authorization_domains(self, create, extracted, **kwargs): return # Create an authorization domain. - auth_domain = ManagedGroupFactory.create() + auth_domain = ManagedGroupFactory.create( + name="auth_{}".format(self.workspace.name) + ) self.workspace.authorization_domains.add(auth_domain) diff --git a/primed/templates/dbgap/dbgap_audit.html b/primed/templates/dbgap/dbgap_audit.html new file mode 100644 index 00000000..11d8439a --- /dev/null +++ b/primed/templates/dbgap/dbgap_audit.html @@ -0,0 +1,49 @@ +{% extends "anvil_consortium_manager/base.html" %} +{% load django_tables2 %} + +{% block title %}dbGaP access audit{% endblock %} + + +{% block content %} + +

dbGaP access audit

+ +
+ Auditing accesss to all dbGaP workspaces for all dbGaP applications. +
+ +

Audit results

+ +
+

+ For each dbGaP workspace and application in the app, the audit checks that access is correct. + To have access to a workspace, the group associated with the dbGaP application must be a member of the authorization domain of that workspace. It does not check that the workspace is shared with the group. +

+

The audit result categories are explained below. +

    + +
  • Verified includes the following:
  • +
      +
    • No DAR is associated with this workspace and the group does not have access.
    • +
    • There is a DAR associated with this workspace, but it is not approved, and the group does nto have access.
    • +
    • There is an approved DAR associated with this workspace and the group has access.
    • +
    + +
  • Needs action includes the following:
  • +
      +
    • There is an approved DAR for this workspace, but the group does not have access.
    • +
    • A DAR that used to be approved for this workspace is no longer approved, and the group has access.
    • +
    + +
  • Errors
  • +
      +
    • The group has access to a workspace for an unknown reason.
    • +
    +
+

+

Any errors should be reported!

+
+ +{% include "__audit_tables.html" with verified_table=verified_table needs_action_table=needs_action_table errors_table=errors_table %} + +{% endblock content %} From 7014e7bbce7e300df98edde46b5a85109897a419 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 12:56:12 -0800 Subject: [PATCH 07/12] Add script to add example dbGaP data to database As part of this, update the dbGaP DAR factory to match the workpace consent abbreviation as well as code. --- add_dbgap_example_data.py | 119 ++++++++++++++++++++++++++++++++ primed/dbgap/tests/factories.py | 4 +- 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 add_dbgap_example_data.py diff --git a/add_dbgap_example_data.py b/add_dbgap_example_data.py new file mode 100644 index 00000000..c0d42e74 --- /dev/null +++ b/add_dbgap_example_data.py @@ -0,0 +1,119 @@ +# Temporary script to create some test data. +# Run with: python manage.py shell < add_cdsa_example_data.py + +from anvil_consortium_manager.tests.factories import GroupGroupMembershipFactory + +from primed.dbgap import models +from primed.dbgap.tests import factories +from primed.primed_anvil.tests.factories import StudyFactory +from primed.users.tests.factories import UserFactory + +# Studies +fhs = StudyFactory.create(short_name="FHS", full_name="Framingham Heart Study") +mesa = StudyFactory.create( + short_name="MESA", full_name="Multi-Ethnic Study of Atherosclerosis" +) + + +# dbGaP study accessions +dbgap_study_accession_fhs = factories.dbGaPStudyAccessionFactory.create( + dbgap_phs=7, studies=[fhs] +) +dbgap_study_accession_mesa = factories.dbGaPStudyAccessionFactory.create( + dbgap_phs=209, studies=[mesa] +) + + +# Create some dbGaP workspaces. +workspace_fhs_1 = factories.dbGaPWorkspaceFactory.create( + dbgap_study_accession=dbgap_study_accession_fhs, + dbgap_version=33, + dbgap_participant_set=12, + dbgap_consent_code=1, + dbgap_consent_abbreviation="HMB", +) +workspace_fhs_2 = factories.dbGaPWorkspaceFactory.create( + dbgap_study_accession=dbgap_study_accession_fhs, + dbgap_version=33, + dbgap_participant_set=12, + dbgap_consent_code=2, + dbgap_consent_abbreviation="GRU", +) +workspace_mesa_1 = factories.dbGaPWorkspaceFactory.create( + dbgap_study_accession=dbgap_study_accession_mesa, + dbgap_version=2, + dbgap_participant_set=1, + dbgap_consent_code=1, + dbgap_consent_abbreviation="HMB-NPU", +) +workspace_mesa_2 = factories.dbGaPWorkspaceFactory.create( + dbgap_study_accession=dbgap_study_accession_mesa, + dbgap_version=2, + dbgap_participant_set=1, + dbgap_consent_code=2, + dbgap_consent_abbreviation="HMB-NPU-IRB", +) + + +# Create some dbGaP applications +dbgap_application_1 = factories.dbGaPApplicationFactory.create( + principal_investigator=UserFactory.create(name="Ken", username="Ken"), + dbgap_project_id=33119, +) +# Add a snapshot +dar_snapshot_1 = factories.dbGaPDataAccessSnapshotFactory.create( + dbgap_application=dbgap_application_1 +) +# Add some data access requests. +dar_1_1 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_fhs_1, + dbgap_data_access_snapshot=dar_snapshot_1, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, +) +dar_1_2 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_fhs_2, + dbgap_data_access_snapshot=dar_snapshot_1, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, +) +dar_1_3 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_mesa_1, + dbgap_data_access_snapshot=dar_snapshot_1, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, +) +dar_1_4 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_mesa_2, + dbgap_data_access_snapshot=dar_snapshot_1, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, +) + +dbgap_application_2 = factories.dbGaPApplicationFactory.create( + principal_investigator=UserFactory.create(name="Alisa", username="Alisa"), + dbgap_project_id=33371, +) +# Add a snapshot +dar_snapshot_2 = factories.dbGaPDataAccessSnapshotFactory.create( + dbgap_application=dbgap_application_2 +) +# Add some data access requests, only for FHS. +dar_1_1 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_fhs_1, + dbgap_data_access_snapshot=dar_snapshot_2, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, +) +dar_1_2 = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace_fhs_2, + dbgap_data_access_snapshot=dar_snapshot_2, + dbgap_dac="NHLBI", + dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, +) + +# Now add dbGaP access groups. +GroupGroupMembershipFactory.create( + parent_group=workspace_fhs_1.workspace.authorization_domains.first(), + child_group=dbgap_application_1.anvil_access_group, +) diff --git a/primed/dbgap/tests/factories.py b/primed/dbgap/tests/factories.py index 6dca440e..32151573 100644 --- a/primed/dbgap/tests/factories.py +++ b/primed/dbgap/tests/factories.py @@ -166,7 +166,9 @@ class dbGaPDataAccessRequestForWorkspaceFactory(DjangoModelFactory): lambda o: o.dbgap_workspace.dbgap_participant_set ) dbgap_consent_code = LazyAttribute(lambda o: o.dbgap_workspace.dbgap_consent_code) - dbgap_consent_abbreviation = Faker("word") + dbgap_consent_abbreviation = LazyAttribute( + lambda o: o.dbgap_workspace.dbgap_consent_abbreviation + ) dbgap_current_status = models.dbGaPDataAccessRequest.APPROVED dbgap_dac = Faker("word") From df96271700e4c29374691358aba3aaf5629b418b Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 12:58:42 -0800 Subject: [PATCH 08/12] Add all *.db files to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 1821bccc..b54f9b3b 100644 --- a/.gitignore +++ b/.gitignore @@ -279,6 +279,7 @@ primed/media/ primed.db backup_primed.db +*.db ### dbbackups dbbackups/ From e7a6f04c0ba7ada0ffcc9d3566bcc64f57158c87 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 13:31:44 -0800 Subject: [PATCH 09/12] Add a workspace that has no DARs in example data script --- add_dbgap_example_data.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/add_dbgap_example_data.py b/add_dbgap_example_data.py index c0d42e74..03b91535 100644 --- a/add_dbgap_example_data.py +++ b/add_dbgap_example_data.py @@ -13,7 +13,9 @@ mesa = StudyFactory.create( short_name="MESA", full_name="Multi-Ethnic Study of Atherosclerosis" ) - +aric = StudyFactory.create( + short_name="ARIC", full_name="Atherosclerosis Risk in Communities" +) # dbGaP study accessions dbgap_study_accession_fhs = factories.dbGaPStudyAccessionFactory.create( @@ -22,6 +24,9 @@ dbgap_study_accession_mesa = factories.dbGaPStudyAccessionFactory.create( dbgap_phs=209, studies=[mesa] ) +dbgap_study_accession_aric = factories.dbGaPStudyAccessionFactory.create( + dbgap_phs=280, studies=[aric] +) # Create some dbGaP workspaces. @@ -31,6 +36,7 @@ dbgap_participant_set=12, dbgap_consent_code=1, dbgap_consent_abbreviation="HMB", + workspace__name="DBGAP_FHS_v33_p12_HMB", ) workspace_fhs_2 = factories.dbGaPWorkspaceFactory.create( dbgap_study_accession=dbgap_study_accession_fhs, @@ -38,6 +44,7 @@ dbgap_participant_set=12, dbgap_consent_code=2, dbgap_consent_abbreviation="GRU", + workspace__name="DBGAP_FHS_v33_p12_GRU", ) workspace_mesa_1 = factories.dbGaPWorkspaceFactory.create( dbgap_study_accession=dbgap_study_accession_mesa, @@ -45,6 +52,7 @@ dbgap_participant_set=1, dbgap_consent_code=1, dbgap_consent_abbreviation="HMB-NPU", + workspace__name="DBGAP_MESA_v2_p1_HMB-NPU", ) workspace_mesa_2 = factories.dbGaPWorkspaceFactory.create( dbgap_study_accession=dbgap_study_accession_mesa, @@ -52,6 +60,15 @@ dbgap_participant_set=1, dbgap_consent_code=2, dbgap_consent_abbreviation="HMB-NPU-IRB", + workspace__name="DBGAP_MESA_v2_p1_HMB-NPU-IRB", +) +workspace_aric = factories.dbGaPWorkspaceFactory.create( + dbgap_study_accession=dbgap_study_accession_aric, + dbgap_version=3, + dbgap_participant_set=1, + dbgap_consent_code=1, + dbgap_consent_abbreviation="HMB", + workspace__name="DBGAP_ARIC_v3_p1_HMB", ) From b1468de475f530aea7aaec8d3cb86027aa49db5e Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 14:40:50 -0800 Subject: [PATCH 10/12] Add view to audit all dbGaP applications and workspaces Add a view that audits access to all dbGaP workspaces for all applications. --- primed/dbgap/tests/test_audit.py | 24 +++ primed/dbgap/tests/test_views.py | 291 +++++++++++++++++++++++++++++++ primed/dbgap/urls.py | 5 +- primed/dbgap/views.py | 18 ++ 4 files changed, 336 insertions(+), 2 deletions(-) diff --git a/primed/dbgap/tests/test_audit.py b/primed/dbgap/tests/test_audit.py index 2eae1695..2ed0431a 100644 --- a/primed/dbgap/tests/test_audit.py +++ b/primed/dbgap/tests/test_audit.py @@ -256,6 +256,27 @@ def test_verified_no_access_dar_not_approved(self): self.assertEqual(record.data_access_request, dar) self.assertEqual(record.note, audit.dbGaPAccessAudit.DAR_NOT_APPROVED) + def test_verified_no_access_no_dar(self): + """run_audit with one application and one workspace that has verified no access.""" + # Create a workspace and matching DAR. + dbgap_application = factories.dbGaPApplicationFactory.create() + factories.dbGaPDataAccessSnapshotFactory.create( + dbgap_application=dbgap_application + ) + dbgap_workspace = factories.dbGaPWorkspaceFactory.create() + # Do not add the anvil group to the auth group for the workspace. + dbgap_audit = audit.dbGaPAccessAudit() + dbgap_audit.run_audit() + self.assertEqual(len(dbgap_audit.verified), 1) + self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.errors), 0) + record = dbgap_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.workspace, dbgap_workspace) + self.assertEqual(record.dbgap_application, dbgap_application) + self.assertIsNone(record.data_access_request) + self.assertEqual(record.note, audit.dbGaPAccessAudit.NO_DAR) + def test_grant_access_new_approved_dar(self): # Create a workspace and matching DAR. # Workspace was created before the snapshot. @@ -575,6 +596,9 @@ def test_dbgap_application_queryset_not_queryset(self): "dbgap_application_queryset must be a queryset of dbGaPApplication objects.", ) + def test_two_applications_two_workspaces(self): + pass + class dbGaPAccessAuditTableTest(TestCase): """Tests for the `dbGaPAccessAuditTableTest` table.""" diff --git a/primed/dbgap/tests/test_views.py b/primed/dbgap/tests/test_views.py index d151d18f..575f288b 100644 --- a/primed/dbgap/tests/test_views.py +++ b/primed/dbgap/tests/test_views.py @@ -4603,6 +4603,297 @@ def test_context_errors_table_no_dar_has_access(self): self.assertIsNotNone(table.rows[0].get_cell_value("action")) +class dbGaPAuditTest(TestCase): + """Tests for the dbGaPAudit view.""" + + def setUp(self): + """Set up test class.""" + self.factory = RequestFactory() + # Create a user with both view and edit permission. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse( + "dbgap:audit", + args=args, + ) + + def get_view(self): + """Return the view being tested.""" + return views.dbGaPAudit.as_view() + + def test_view_redirect_not_logged_in(self): + "View redirects to login view when user is not logged in." + # Need a client for redirects. + response = self.client.get(self.get_url()) + self.assertRedirects( + response, + resolve_url(settings.LOGIN_URL) + "?next=" + self.get_url(), + ) + + def test_status_code_with_user_permission_staff_view(self): + """Returns successful response code if the user has staff view permission.""" + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, 200) + + def test_status_code_with_user_permission_view(self): + """Returns successful response code if the user has view permission.""" + user = User.objects.create_user(username="test-none", password="test-none") + user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.VIEW_PERMISSION_CODENAME + ) + ) + request = self.factory.get(self.get_url()) + request.user = user + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_access_without_user_permission(self): + """Raises permission denied if user has no permissions.""" + user_no_perms = User.objects.create_user( + username="test-none", password="test-none" + ) + request = self.factory.get(self.get_url()) + request.user = user_no_perms + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_context_data_access_audit(self): + """The data_access_audit exists in the context.""" + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("data_access_audit", response.context_data) + data_access_audit = response.context_data["data_access_audit"] + self.assertIsInstance( + data_access_audit, + audit.dbGaPAccessAudit, + ) + self.assertTrue(data_access_audit.completed) + + def test_no_applications_no_workspaces(self): + """Loads correctly if there are no applications or workspaces.""" + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, 200) + dbgap_audit = response.context_data["data_access_audit"] + self.assertEqual(dbgap_audit.dbgap_application_queryset.count(), 0) + self.assertEqual(dbgap_audit.dbgap_workspace_queryset.count(), 0) + + def test_one_applications_no_workspaces(self): + """Loads correctly if there is one application and no workspaces.""" + dbgap_application = factories.dbGaPApplicationFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, 200) + dbgap_audit = response.context_data["data_access_audit"] + self.assertEqual(dbgap_audit.dbgap_application_queryset.count(), 1) + self.assertIn(dbgap_application, dbgap_audit.dbgap_application_queryset) + self.assertEqual(dbgap_audit.dbgap_workspace_queryset.count(), 0) + self.assertEqual(len(dbgap_audit.verified), 0) + self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.errors), 0) + + def test_no_applications_one_workspaces(self): + """Loads correctly if there is no application one workspace.""" + dbgap_workspace = factories.dbGaPWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, 200) + dbgap_audit = response.context_data["data_access_audit"] + self.assertEqual(dbgap_audit.dbgap_application_queryset.count(), 0) + self.assertEqual(dbgap_audit.dbgap_workspace_queryset.count(), 1) + self.assertIn(dbgap_workspace, dbgap_audit.dbgap_workspace_queryset) + self.assertEqual(len(dbgap_audit.verified), 0) + self.assertEqual(len(dbgap_audit.needs_action), 0) + self.assertEqual(len(dbgap_audit.errors), 0) + + def test_three_applications_two_workspaces(self): + """Loads correctly if there is one application and no workspaces.""" + dbgap_application_1 = factories.dbGaPApplicationFactory.create() + dbgap_application_2 = factories.dbGaPApplicationFactory.create() + dbgap_application_3 = factories.dbGaPApplicationFactory.create() + dbgap_workspace_1 = factories.dbGaPWorkspaceFactory.create() + dbgap_workspace_2 = factories.dbGaPWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, 200) + dbgap_audit = response.context_data["data_access_audit"] + self.assertEqual(dbgap_audit.dbgap_application_queryset.count(), 3) + self.assertIn(dbgap_application_1, dbgap_audit.dbgap_application_queryset) + self.assertIn(dbgap_application_2, dbgap_audit.dbgap_application_queryset) + self.assertIn(dbgap_application_3, dbgap_audit.dbgap_application_queryset) + self.assertEqual(dbgap_audit.dbgap_workspace_queryset.count(), 2) + self.assertIn(dbgap_workspace_1, dbgap_audit.dbgap_workspace_queryset) + self.assertIn(dbgap_workspace_2, dbgap_audit.dbgap_workspace_queryset) + self.assertIn("verified_table", response.context_data) + self.assertEqual(len(response.context_data["verified_table"].rows), 6) + + def test_context_verified_table_access(self): + """verified_table shows a record when audit has verified access.""" + # Add a verified workspace. + workspace = factories.dbGaPWorkspaceFactory.create() + dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace + ) + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.dbGaPAccessAuditTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual(table.rows[0].get_cell_value("workspace"), workspace) + self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.dbGaPAccessAudit.APPROVED_DAR, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_verified_table_no_access(self): + """verified_table shows a record when audit has verified no access.""" + snapshot = factories.dbGaPDataAccessSnapshotFactory.create() + workspace = factories.dbGaPWorkspaceFactory.create() + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.dbGaPAccessAuditTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("application"), snapshot.dbgap_application + ) + self.assertEqual(table.rows[0].get_cell_value("workspace"), workspace) + self.assertIsNone(table.rows[0].get_cell_value("data_access_request")) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.dbGaPAccessAudit.NO_DAR, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_grant(self): + """needs_action_table shows a record when audit finds that access needs to be granted.""" + workspace = factories.dbGaPWorkspaceFactory.create( + created=timezone.now() - timedelta(weeks=4) + ) + dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.dbGaPAccessAuditTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("application"), + dar.dbgap_data_access_snapshot.dbgap_application, + ) + self.assertEqual(table.rows[0].get_cell_value("workspace"), workspace) + self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.dbGaPAccessAudit.NEW_APPROVED_DAR, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_remove(self): + """needs_action_table shows a record when audit finds that access needs to be removed.""" + workspace = factories.dbGaPWorkspaceFactory.create() + dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace, + dbgap_current_status=models.dbGaPDataAccessRequest.CLOSED, + ) + # Create an old dar that was approved + factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_dar_id=dar.dbgap_dar_id, + dbgap_data_access_snapshot__dbgap_application=dar.dbgap_data_access_snapshot.dbgap_application, + dbgap_data_access_snapshot__created=timezone.now() - timedelta(weeks=4), + dbgap_data_access_snapshot__is_most_recent=False, + dbgap_workspace=workspace, + dbgap_current_status=models.dbGaPDataAccessRequest.APPROVED, + ) + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.dbGaPAccessAuditTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("application"), + dar.dbgap_data_access_snapshot.dbgap_application, + ) + self.assertEqual(table.rows[0].get_cell_value("workspace"), workspace) + self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.dbGaPAccessAudit.PREVIOUS_APPROVAL, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_error_table_has_access(self): + """error shows a record when audit finds that access needs to be removed.""" + workspace = factories.dbGaPWorkspaceFactory.create() + # Create a rejected DAR. + dar = factories.dbGaPDataAccessRequestForWorkspaceFactory.create( + dbgap_workspace=workspace, + dbgap_current_status=models.dbGaPDataAccessRequest.REJECTED, + ) + # Create the membership. + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=dar.dbgap_data_access_snapshot.dbgap_application.anvil_access_group, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("errors_table", response.context_data) + table = response.context_data["errors_table"] + self.assertIsInstance( + table, + audit.dbGaPAccessAuditTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual(table.rows[0].get_cell_value("workspace"), workspace) + self.assertEqual(table.rows[0].get_cell_value("data_access_request"), dar) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.dbGaPAccessAudit.ERROR_HAS_ACCESS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + class dbGaPRecordsIndexTest(TestCase): """Tests for the dbGaPRecordsIndex view.""" diff --git a/primed/dbgap/urls.py b/primed/dbgap/urls.py index b79bdc07..4f7eba43 100644 --- a/primed/dbgap/urls.py +++ b/primed/dbgap/urls.py @@ -102,9 +102,10 @@ urlpatterns = [ - path("studies/", include(dbgap_study_accession_patterns)), path("applications/", include(dbgap_application_patterns)), + path("audit/", views.dbGaPAudit.as_view(), name="audit"), path("dars/", include(data_access_request_patterns)), - path("workspaces/", include(dbgap_workspace_patterns)), path("records/", include(records_patterns)), + path("studies/", include(dbgap_study_accession_patterns)), + path("workspaces/", include(dbgap_workspace_patterns)), ] diff --git a/primed/dbgap/views.py b/primed/dbgap/views.py index 12bf2925..18786ba4 100644 --- a/primed/dbgap/views.py +++ b/primed/dbgap/views.py @@ -552,6 +552,24 @@ def get_context_data(self, **kwargs): return context +class dbGaPAudit(AnVILConsortiumManagerStaffViewRequired, TemplateView): + """View to audit access for all dbGaPApplications and dbGaPWorkspaces.""" + + template_name = "dbgap/dbgap_audit.html" + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + # Run the audit. + data_access_audit = audit.dbGaPAccessAudit() + data_access_audit.run_audit() + print(data_access_audit.get_verified_table()) + context["verified_table"] = data_access_audit.get_verified_table() + context["errors_table"] = data_access_audit.get_errors_table() + context["needs_action_table"] = data_access_audit.get_needs_action_table() + context["data_access_audit"] = data_access_audit + return context + + class dbGaPApplicationAudit(AnVILConsortiumManagerStaffViewRequired, DetailView): """View to show audit results for a `dbGaPApplication`.""" From 315721ec7c6880a0a221c4489fbf0f5d4f9c9eee Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Fri, 2 Feb 2024 14:45:22 -0800 Subject: [PATCH 11/12] Add new dbGaP audit link to navbar --- primed/templates/dbgap/nav_items.html | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/primed/templates/dbgap/nav_items.html b/primed/templates/dbgap/nav_items.html index 3b1c7f23..d53b8314 100644 --- a/primed/templates/dbgap/nav_items.html +++ b/primed/templates/dbgap/nav_items.html @@ -20,14 +20,19 @@
  • List all current DARs
  • - {% if perms.anvil_consortium_manager.anvil_consortium_manager_staff_edit %} -
  • - Add a new dbGaP application -
  • -
  • - Update DARs for all applications -
  • - {% endif %} + {% if perms.anvil_consortium_manager.anvil_consortium_manager_staff_edit %} +
  • + Add a new dbGaP application +
  • +
  • + Update DARs for all applications +
  • + {% endif %} + {% if perms.anvil_consortium_manager.anvil_consortium_manager_staff_view %} +
  • + Audit dbGaP access +
  • + {% endif %}
  • From e0b4157003f95c3273b761ae6f5ff530e82692e9 Mon Sep 17 00:00:00 2001 From: Adrienne Stilp Date: Tue, 6 Feb 2024 10:46:09 -0800 Subject: [PATCH 12/12] Add has_access field to dbGaP audit results Add a field to the data classes for audit results, and set it automatically as a class attribute for different subclasses. Show the has_access field in the audit results table. --- primed/dbgap/audit.py | 13 +++++++++++-- primed/dbgap/tests/test_audit.py | 16 ++++++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/primed/dbgap/audit.py b/primed/dbgap/audit.py index 21beedc6..c82540f2 100644 --- a/primed/dbgap/audit.py +++ b/primed/dbgap/audit.py @@ -6,6 +6,8 @@ from django.utils.safestring import mark_safe # from . import models +from primed.primed_anvil.tables import BooleanIconColumn + from .models import ( dbGaPApplication, dbGaPDataAccessRequest, @@ -22,6 +24,7 @@ class AuditResult: workspace: dbGaPWorkspace note: str dbgap_application: dbGaPApplication + has_access: bool data_access_request: dbGaPDataAccessRequest = None def __post_init__(self): @@ -55,6 +58,7 @@ def get_table_dictionary(self): "data_access_request": self.data_access_request, "dar_accession": dar_accession, "dar_consent": dar_consent, + "has_access": self.has_access, "note": self.note, "action": self.get_action(), "action_url": self.get_action_url(), @@ -66,20 +70,22 @@ def get_table_dictionary(self): class VerifiedAccess(AuditResult): """Audit results class for when access has been verified.""" - pass + has_access: bool = True @dataclass class VerifiedNoAccess(AuditResult): """Audit results class for when no access has been verified.""" - pass + has_access: bool = False @dataclass class GrantAccess(AuditResult): """Audit results class for when access should be granted.""" + has_access: bool = False + def get_action(self): return "Grant access" @@ -97,6 +103,8 @@ def get_action_url(self): class RemoveAccess(AuditResult): """Audit results class for when access should be removed for a known reason.""" + has_access: bool = True + def get_action(self): return "Remove access" @@ -125,6 +133,7 @@ class dbGaPAccessAuditTable(tables.Table): data_access_request = tables.Column() dar_accession = tables.Column(verbose_name="DAR accession") dar_consent = tables.Column(verbose_name="DAR consent") + has_access = BooleanIconColumn(show_false_icon=True) note = tables.Column() action = tables.Column() diff --git a/primed/dbgap/tests/test_audit.py b/primed/dbgap/tests/test_audit.py index 2ed0431a..5531dd94 100644 --- a/primed/dbgap/tests/test_audit.py +++ b/primed/dbgap/tests/test_audit.py @@ -22,6 +22,7 @@ def test_verified_access(self): note="foo", ) self.assertIsNone(instance.get_action_url()) + self.assertTrue(instance.has_access) def test_verified_no_access(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() @@ -33,6 +34,7 @@ def test_verified_no_access(self): note="foo", ) self.assertIsNone(instance.get_action_url()) + self.assertFalse(instance.has_access) def test_grant_access(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() @@ -52,6 +54,7 @@ def test_grant_access(self): ], ) self.assertEqual(instance.get_action_url(), expected_url) + self.assertFalse(instance.has_access) def test_remove_access(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() @@ -71,12 +74,15 @@ def test_remove_access(self): ], ) self.assertEqual(instance.get_action_url(), expected_url) + self.assertTrue(instance.has_access) def test_remove_access_no_dar(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dbgap_application = factories.dbGaPApplicationFactory.create() instance = audit.RemoveAccess( - workspace=dbgap_workspace, dbgap_application=dbgap_application, note="foo" + workspace=dbgap_workspace, + dbgap_application=dbgap_application, + note="foo", ) expected_url = reverse( "anvil_consortium_manager:managed_groups:member_groups:delete", @@ -86,6 +92,7 @@ def test_remove_access_no_dar(self): ], ) self.assertEqual(instance.get_action_url(), expected_url) + self.assertTrue(instance.has_access) def test_error(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() @@ -95,6 +102,7 @@ def test_error(self): dbgap_application=dar.dbgap_data_access_snapshot.dbgap_application, data_access_request=dar, note="foo", + has_access=True, ) self.assertIsNone(instance.get_action_url()) @@ -102,7 +110,10 @@ def test_error_no_dar(self): dbgap_workspace = factories.dbGaPWorkspaceFactory.create() dbgap_application = factories.dbGaPApplicationFactory.create() instance = audit.Error( - workspace=dbgap_workspace, dbgap_application=dbgap_application, note="foo" + workspace=dbgap_workspace, + dbgap_application=dbgap_application, + note="foo", + has_access=True, ) self.assertIsNone(instance.get_action_url()) @@ -116,6 +127,7 @@ def test_post_init(self): dbgap_application=other_application, data_access_request=dar, note="foo", + has_access=False, )