Skip to content

Commit

Permalink
Add view to resolve a collab analysis workspace audit
Browse files Browse the repository at this point in the history
Add a view that runs an audit on a specific email and
CollaborativeAnalysisWorkspace and (if a post request) handles that
audit appopriately. The view can also handle htmx requests. Modify
the workspace audit classes for CollaborativeAnalsyisWorkspaces as
necessary to achieve this: add a str method that can be displayed on
the detail page; set the "action" text for different classes with a
default, and change the action_url to be the same for all the
different types of audit result classes. Add a get_all_results method
to the CollaborativeAnalysisWorkspaceAccessAudit class.
  • Loading branch information
amstilp committed Feb 26, 2024
1 parent dad7e58 commit a108f9d
Show file tree
Hide file tree
Showing 7 changed files with 1,594 additions and 115 deletions.
184 changes: 91 additions & 93 deletions primed/collaborative_analysis/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ class AccessAuditResult:
member: Union[Account, ManagedGroup]
note: str
has_access: bool
action: str = None

def get_action_url(self):
"""The URL that handles the action needed."""
return None

def get_action(self):
"""An indicator of what action needs to be taken."""
return None
return reverse(
"collaborative_analysis:audit:resolve",
args=[
self.collaborative_analysis_workspace.workspace.billing_project.name,
self.collaborative_analysis_workspace.workspace.name,
self.member.email,
],
)

def get_table_dictionary(self):
"""Return a dictionary that can be used to populate an instance of `SignedAgreementAccessAuditTable`."""
Expand All @@ -40,7 +43,7 @@ def get_table_dictionary(self):
"member": self.member,
"has_access": self.has_access,
"note": self.note,
"action": self.get_action(),
"action": self.action,
"action_url": self.get_action_url(),
}
return row
Expand All @@ -52,68 +55,40 @@ class VerifiedAccess(AccessAuditResult):

has_access: bool = True

def __str__(self):
return f"Verified access: {self.note}"


@dataclass
class VerifiedNoAccess(AccessAuditResult):
"""Audit results class for when an account has verified no access."""

has_access: bool = False

def __str__(self):
return f"Verified no access: {self.note}"


@dataclass
class GrantAccess(AccessAuditResult):
"""Audit results class for when an account should be granted access."""

has_access: bool = False
action: str = "Grant access"

def get_action(self):
return "Grant access"

def get_action_url(self):
if isinstance(self.member, Account):
return reverse(
"anvil_consortium_manager:managed_groups:member_accounts:new_by_account",
args=[
self.collaborative_analysis_workspace.workspace.authorization_domains.first().name,
self.member.uuid,
],
)
else:
return reverse(
"anvil_consortium_manager:managed_groups:member_groups:new_by_child",
args=[
self.collaborative_analysis_workspace.workspace.authorization_domains.first().name,
self.member.name,
],
)
def __str__(self):
return f"Grant access: {self.note}"


@dataclass
class RemoveAccess(AccessAuditResult):
"""Audit results class for when access for an account should be removed."""

has_access: bool = True
action: str = "Remove access"

def get_action(self):
return "Remove access"

def get_action_url(self):
if isinstance(self.member, Account):
return reverse(
"anvil_consortium_manager:managed_groups:member_accounts:delete",
args=[
self.collaborative_analysis_workspace.workspace.authorization_domains.first().name,
self.member.uuid,
],
)
else:
return reverse(
"anvil_consortium_manager:managed_groups:member_groups:delete",
args=[
self.collaborative_analysis_workspace.workspace.authorization_domains.first().name,
self.member.name,
],
)
def __str__(self):
return f"Remove access: {self.note}"


class AccessAuditResultsTable(tables.Table):
Expand Down Expand Up @@ -281,70 +256,93 @@ def _audit_workspace_and_account(self, collaborative_analysis_workspace, account
# - analyst is in some but not all relevant source auth domains, and is not in the workspace auth domain.
# - analyst is in none of the relevant source auth domains, and is not in the workspace auth domain.
# - an account is in the workspace auth domain, but is not in the analyst group.

# Check whether access is allowed. Start by assuming yes; set to false if the account should not have access.
access_allowed = True
# Get all groups for the account.
account_groups = account.get_all_groups()
# Loop over all source workspaces.
for (
source_workspace
) in collaborative_analysis_workspace.source_workspaces.all():
# Loop over all auth domains for that source workspace.
for source_auth_domain in source_workspace.authorization_domains.all():
# If the user is not in the auth domain, they are not allowed to have access to the collab workspace.
# If so, break out of the loop - it is not necessary to check membership of the remaining auth domains.
# Note that this only breaks out of the inner loop.
# It would be more efficient to break out of the outer loop as well.
if source_auth_domain not in account_groups:
access_allowed = False
break
# Check whether the account is in the analyst group.
in_analyst_group = (
collaborative_analysis_workspace.analyst_group in account_groups
)
# Check whether the account is in the auth domain of the collab workspace.
in_auth_domain = (
collaborative_analysis_workspace.workspace.authorization_domains.first()
in account_groups
)
# Determine the audit result.
print(access_allowed)
print(in_auth_domain)
if access_allowed and in_auth_domain:
self.verified.append(
VerifiedAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.IN_SOURCE_AUTH_DOMAINS,
if in_analyst_group:
# Check whether access is allowed. Start by assuming yes, and then
# set to false if the account should not have access.
access_allowed = True
# Loop over all source workspaces.
for (
source_workspace
) in collaborative_analysis_workspace.source_workspaces.all():
# Loop over all auth domains for that source workspace.
for source_auth_domain in source_workspace.authorization_domains.all():
# If the user is not in the auth domain, they are not allowed to have access to the workspace.
# If so, break out of the loop - not necessary to check membership of the remaining auth domains.
# Note that this only breaks out of the inner loop.
# It would be more efficient to break out of the outer loop as well.
if source_auth_domain not in account_groups:
access_allowed = False
break
if access_allowed and in_auth_domain:
self.verified.append(
VerifiedAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.IN_SOURCE_AUTH_DOMAINS,
)
)
)
elif access_allowed and not in_auth_domain:
self.needs_action.append(
GrantAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.IN_SOURCE_AUTH_DOMAINS,
elif access_allowed and not in_auth_domain:
self.needs_action.append(
GrantAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.IN_SOURCE_AUTH_DOMAINS,
)
)
)
elif not access_allowed and in_auth_domain:
self.needs_action.append(
RemoveAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_SOURCE_AUTH_DOMAINS,
elif not access_allowed and in_auth_domain:
self.needs_action.append(
RemoveAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_SOURCE_AUTH_DOMAINS,
)
)
else:
self.verified.append(
VerifiedNoAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_SOURCE_AUTH_DOMAINS,
)
)
)
else:
self.verified.append(
VerifiedNoAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_SOURCE_AUTH_DOMAINS,
if in_auth_domain:
self.needs_action.append(
RemoveAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_ANALYST_GROUP,
)
)
else:
self.verified.append(
VerifiedNoAccess(
collaborative_analysis_workspace=collaborative_analysis_workspace,
member=account,
note=self.NOT_IN_ANALYST_GROUP,
)
)
)

def run_audit(self):
"""Run the audit on the set of workspaces."""
for workspace in self.queryset:
self._audit_workspace(workspace)
self.completed = True

def get_all_results(self):
return self.verified + self.needs_action + self.errors

def get_verified_table(self):
"""Return a table of verified results."""
return self.results_table_class(
Expand Down
72 changes: 60 additions & 12 deletions primed/collaborative_analysis/tests/test_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,31 @@ def test_account_verified_access(self):
instance = audit.VerifiedAccess(
collaborative_analysis_workspace=workspace, member=account, note="test"
)
self.assertIsNone(instance.get_action_url())
expected_url = reverse(
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
account.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

def test_account_verified_no_access(self):
workspace = factories.CollaborativeAnalysisWorkspaceFactory.create()
account = AccountFactory.create()
instance = audit.VerifiedNoAccess(
collaborative_analysis_workspace=workspace, member=account, note="test"
)
self.assertIsNone(instance.get_action_url())
expected_url = reverse(
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
account.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

def test_account_grant_access(self):
workspace = factories.CollaborativeAnalysisWorkspaceFactory.create()
Expand All @@ -43,8 +59,12 @@ def test_account_grant_access(self):
collaborative_analysis_workspace=workspace, member=account, note="test"
)
expected_url = reverse(
"anvil_consortium_manager:managed_groups:member_accounts:new_by_account",
args=[workspace.workspace.authorization_domains.first().name, account.uuid],
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
account.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

Expand All @@ -55,8 +75,12 @@ def test_account_remove_access(self):
collaborative_analysis_workspace=workspace, member=account, note="test"
)
expected_url = reverse(
"anvil_consortium_manager:managed_groups:member_accounts:delete",
args=[workspace.workspace.authorization_domains.first().name, account.uuid],
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
account.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

Expand All @@ -66,15 +90,31 @@ def test_group_verified_access(self):
instance = audit.VerifiedAccess(
collaborative_analysis_workspace=workspace, member=group, note="test"
)
self.assertIsNone(instance.get_action_url())
expected_url = reverse(
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
group.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

def test_group_verified_no_access(self):
workspace = factories.CollaborativeAnalysisWorkspaceFactory.create()
group = ManagedGroupFactory.create()
instance = audit.VerifiedNoAccess(
collaborative_analysis_workspace=workspace, member=group, note="test"
)
self.assertIsNone(instance.get_action_url())
expected_url = reverse(
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
group.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

def test_group_grant_access(self):
workspace = factories.CollaborativeAnalysisWorkspaceFactory.create()
Expand All @@ -83,8 +123,12 @@ def test_group_grant_access(self):
collaborative_analysis_workspace=workspace, member=group, note="test"
)
expected_url = reverse(
"anvil_consortium_manager:managed_groups:member_groups:new_by_child",
args=[workspace.workspace.authorization_domains.first().name, group.name],
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
group.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

Expand All @@ -95,8 +139,12 @@ def test_group_remove_access(self):
collaborative_analysis_workspace=workspace, member=group, note="test"
)
expected_url = reverse(
"anvil_consortium_manager:managed_groups:member_groups:delete",
args=[workspace.workspace.authorization_domains.first().name, group.name],
"collaborative_analysis:audit:resolve",
args=[
workspace.workspace.billing_project.name,
workspace.workspace.name,
group.email,
],
)
self.assertEqual(instance.get_action_url(), expected_url)

Expand Down
Loading

0 comments on commit a108f9d

Please sign in to comment.