diff --git a/add_collaborative_analysis_example_data.py b/add_collaborative_analysis_example_data.py new file mode 100644 index 00000000..550e9317 --- /dev/null +++ b/add_collaborative_analysis_example_data.py @@ -0,0 +1,117 @@ +"""Temporary script to create some example data for the collaborative_analysis app. + +Run with: python manage.py shell < add_collaborative_analysis_example_data.py +""" + +from anvil_consortium_manager.tests.factories import ( + AccountFactory, + BillingProjectFactory, + GroupAccountMembershipFactory, + GroupGroupMembershipFactory, + ManagedGroupFactory, +) + +from primed.cdsa.tests.factories import CDSAWorkspaceFactory +from primed.collaborative_analysis.tests.factories import ( + CollaborativeAnalysisWorkspaceFactory, +) +from primed.dbgap.tests.factories import dbGaPWorkspaceFactory +from primed.miscellaneous_workspaces.tests.factories import OpenAccessWorkspaceFactory + +billing_project = BillingProjectFactory.create(name="test_collab") + +# Create some collaborative analysis workspace. +# Workspace 1 +collaborative_analysis_workspace_1 = CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="collab_1_open_access", workspace__billing_project=billing_project +) +source_workspace_1_1 = OpenAccessWorkspaceFactory.create( + workspace__name="source_1_open_access", workspace__billing_project=billing_project +) +collaborative_analysis_workspace_1.source_workspaces.add(source_workspace_1_1.workspace) + +# Workspace 2 +collaborative_analysis_workspace_2 = CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="collab_2_dbgap_and_cdsa", + workspace__billing_project=billing_project, +) +source_workspace_2_1 = dbGaPWorkspaceFactory.create( + workspace__name="source_2_dbgap", workspace__billing_project=billing_project +) +collaborative_analysis_workspace_2.source_workspaces.add(source_workspace_2_1.workspace) +source_workspace_2_2 = CDSAWorkspaceFactory.create( + workspace__name="source_2_cdsa", workspace__billing_project=billing_project +) +collaborative_analysis_workspace_2.source_workspaces.add(source_workspace_2_2.workspace) + +# Workspace 3 - with an error +collaborative_analysis_workspace_3 = CollaborativeAnalysisWorkspaceFactory.create( + workspace__name="collab_3_with_error", workspace__billing_project=billing_project +) +source_workspace_3_1 = OpenAccessWorkspaceFactory.create( + workspace__name="source_3_open_access", workspace__billing_project=billing_project +) +collaborative_analysis_workspace_3.source_workspaces.add(source_workspace_3_1.workspace) + + +# Add accounts to the auth domains. +account_1 = AccountFactory.create( + user__name="Adrienne", verified=True, email="adrienne@example.com" +) +account_2 = AccountFactory.create( + user__name="Ben", verified=True, email="ben@example.com" +) +account_3 = AccountFactory.create( + user__name="Matt", verified=True, email="matt@example.com" +) +account_4 = AccountFactory.create( + user__name="Stephanie", verified=True, email="stephanie@example.com" +) + +# Set up collab analysis workspace one +# analyst group +GroupAccountMembershipFactory.create( + account=account_1, group=collaborative_analysis_workspace_1.analyst_group +) +GroupAccountMembershipFactory.create( + account=account_2, group=collaborative_analysis_workspace_1.analyst_group +) +# auth domains +GroupAccountMembershipFactory.create( + account=account_1, + group=collaborative_analysis_workspace_1.workspace.authorization_domains.first(), +) + +# Set up collab analysis workspace two +# analyst group +GroupAccountMembershipFactory.create( + account=account_3, group=collaborative_analysis_workspace_2.analyst_group +) +GroupAccountMembershipFactory.create( + account=account_4, group=collaborative_analysis_workspace_2.analyst_group +) +# auth domains +GroupAccountMembershipFactory.create( + account=account_3, + group=collaborative_analysis_workspace_2.workspace.authorization_domains.first(), +) +GroupAccountMembershipFactory.create( + account=account_3, + group=source_workspace_2_1.workspace.authorization_domains.first(), +) +GroupAccountMembershipFactory.create( + account=account_3, + group=source_workspace_2_2.workspace.authorization_domains.first(), +) +GroupAccountMembershipFactory.create( + account=account_4, + group=source_workspace_2_1.workspace.authorization_domains.first(), +) + +# Set up collab analysis workspace three +# Managed group to show an error +managed_group = ManagedGroupFactory.create(name="test-error") +GroupGroupMembershipFactory.create( + parent_group=collaborative_analysis_workspace_3.workspace.authorization_domains.first(), + child_group=managed_group, +) diff --git a/config/settings/base.py b/config/settings/base.py index eb563344..5d1c7f26 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -96,6 +96,7 @@ "primed.miscellaneous_workspaces", "primed.duo", "primed.cdsa", + "primed.collaborative_analysis", ] # https://docs.djangoproject.com/en/dev/ref/settings/#installed-apps @@ -376,6 +377,7 @@ "primed.miscellaneous_workspaces.adapters.SimulatedDataWorkspaceAdapter", "primed.miscellaneous_workspaces.adapters.ResourceWorkspaceAdapter", "primed.miscellaneous_workspaces.adapters.ConsortiumDevelWorkspaceAdapter", + "primed.collaborative_analysis.adapters.CollaborativeAnalysisWorkspaceAdapter", "primed.miscellaneous_workspaces.adapters.TemplateWorkspaceAdapter", "primed.miscellaneous_workspaces.adapters.DataPrepWorkspaceAdapter", ] diff --git a/config/urls.py b/config/urls.py index 83ada9ff..139e04ec 100644 --- a/config/urls.py +++ b/config/urls.py @@ -26,6 +26,12 @@ path("dbgap/", include("primed.dbgap.urls", namespace="dbgap")), path("duo/", include("primed.duo.urls", namespace="duo")), path("cdsa/", include("primed.cdsa.urls", namespace="cdsa")), + path( + "collaborative_analysis/", + include( + "primed.collaborative_analysis.urls", namespace="collaborative_analysis" + ), + ), ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/primed/cdsa/tests/factories.py b/primed/cdsa/tests/factories.py index 5dc4d773..610b63b4 100644 --- a/primed/cdsa/tests/factories.py +++ b/primed/cdsa/tests/factories.py @@ -122,7 +122,9 @@ def authorization_domains(self, create, extracted, **kwargs): return # Create an authorization domain. - auth_domain = ManagedGroupFactory.create() + auth_domain = ManagedGroupFactory.create( + name="auth_{}".format(self.workspace.name) + ) self.workspace.authorization_domains.add(auth_domain) class Meta: diff --git a/primed/collaborative_analysis/__init__.py b/primed/collaborative_analysis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/primed/collaborative_analysis/adapters.py b/primed/collaborative_analysis/adapters.py new file mode 100644 index 00000000..0e439385 --- /dev/null +++ b/primed/collaborative_analysis/adapters.py @@ -0,0 +1,20 @@ +from anvil_consortium_manager.adapters.workspace import BaseWorkspaceAdapter +from anvil_consortium_manager.forms import WorkspaceForm + +from . import forms, models, tables + + +class CollaborativeAnalysisWorkspaceAdapter(BaseWorkspaceAdapter): + """Adapter for CollaborativeAnalysisWorkspace.""" + + type = "collab_analysis" + name = "Collaborative Analysis workspace" + description = "Workspaces used for collaborative analyses" + list_table_class_staff_view = tables.CollaborativeAnalysisWorkspaceStaffTable + list_table_class_view = tables.CollaborativeAnalysisWorkspaceUserTable + workspace_form_class = WorkspaceForm + workspace_data_model = models.CollaborativeAnalysisWorkspace + workspace_data_form_class = forms.CollaborativeAnalysisWorkspaceForm + workspace_detail_template_name = ( + "collaborative_analysis/collaborativeanalysisworkspace_detail.html" + ) diff --git a/primed/collaborative_analysis/admin.py b/primed/collaborative_analysis/admin.py new file mode 100644 index 00000000..96b11254 --- /dev/null +++ b/primed/collaborative_analysis/admin.py @@ -0,0 +1,21 @@ +from django.contrib import admin +from simple_history.admin import SimpleHistoryAdmin + +from . import models + + +@admin.register(models.CollaborativeAnalysisWorkspace) +class CollaborativeAnalysisWorkspaceAdmin(SimpleHistoryAdmin): + """Admin class for the `CollaborativeAnalysisWorkspace` model.""" + + list_display = ( + "id", + "workspace", + "custodian", + ) + list_filter = ("proposal_id",) + sortable_by = ( + "id", + "workspace", + "custodian", + ) diff --git a/primed/collaborative_analysis/apps.py b/primed/collaborative_analysis/apps.py new file mode 100644 index 00000000..d8b69a0e --- /dev/null +++ b/primed/collaborative_analysis/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class CollabAnalysisConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "primed.collaborative_analysis" diff --git a/primed/collaborative_analysis/audit.py b/primed/collaborative_analysis/audit.py new file mode 100644 index 00000000..34cc2a5b --- /dev/null +++ b/primed/collaborative_analysis/audit.py @@ -0,0 +1,336 @@ +from dataclasses import dataclass +from typing import Union + +import django_tables2 as tables +from anvil_consortium_manager.models import ( + Account, + GroupAccountMembership, + GroupGroupMembership, + ManagedGroup, +) +from django.urls import reverse +from django.utils.safestring import mark_safe + +from primed.primed_anvil.tables import BooleanIconColumn + +from . import models + + +@dataclass +class AccessAuditResult: + """Base class to hold the result of an access audit for a CollaborativeAnalysisWorkspace.""" + + collaborative_analysis_workspace: models.CollaborativeAnalysisWorkspace + member: Union[Account, ManagedGroup] + note: str + has_access: bool + + def get_action_url(self): + """The URL that handles the action needed.""" + return None + + def get_action(self): + """An indicator of what action needs to be taken.""" + return None + + def get_table_dictionary(self): + """Return a dictionary that can be used to populate an instance of `SignedAgreementAccessAuditTable`.""" + row = { + "workspace": self.collaborative_analysis_workspace, + "member": self.member, + "has_access": self.has_access, + "note": self.note, + "action": self.get_action(), + "action_url": self.get_action_url(), + } + return row + + +@dataclass +class VerifiedAccess(AccessAuditResult): + """Audit results class for when an account has verified access.""" + + has_access: bool = True + + +@dataclass +class VerifiedNoAccess(AccessAuditResult): + """Audit results class for when an account has verified no access.""" + + has_access: bool = False + + +@dataclass +class GrantAccess(AccessAuditResult): + """Audit results class for when an account should be granted access.""" + + has_access: bool = False + + def get_action(self): + return "Grant access" + + def get_action_url(self): + if isinstance(self.member, Account): + return reverse( + "anvil_consortium_manager:managed_groups:member_accounts:new_by_account", + args=[ + self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, + self.member.uuid, + ], + ) + else: + return reverse( + "anvil_consortium_manager:managed_groups:member_groups:new_by_child", + args=[ + self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, + self.member.name, + ], + ) + + +@dataclass +class RemoveAccess(AccessAuditResult): + """Audit results class for when access for an account should be removed.""" + + has_access: bool = True + + def get_action(self): + return "Remove access" + + def get_action_url(self): + if isinstance(self.member, Account): + return reverse( + "anvil_consortium_manager:managed_groups:member_accounts:delete", + args=[ + self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, + self.member.uuid, + ], + ) + else: + return reverse( + "anvil_consortium_manager:managed_groups:member_groups:delete", + args=[ + self.collaborative_analysis_workspace.workspace.authorization_domains.first().name, + self.member.name, + ], + ) + + +class AccessAuditResultsTable(tables.Table): + """A table to show results from a CollaborativeAnalysisWorkspaceAccessAudit instance.""" + + workspace = tables.Column(linkify=True) + member = tables.Column(linkify=True) + has_access = BooleanIconColumn(show_false_icon=True) + note = tables.Column() + action = tables.Column() + + class Meta: + attrs = {"class": "table align-middle"} + + def render_action(self, record, value): + return mark_safe( + """{}""".format( + record["action_url"], value + ) + ) + + +class CollaborativeAnalysisWorkspaceAccessAudit: + """Class to audit access to a CollaborativeAnalysisWorkspace.""" + + # Allowed reasons for access. + IN_SOURCE_AUTH_DOMAINS = "Account is in all source auth domains for this workspace." + DCC_ACCESS = "DCC groups are allowed access." + + # Allowed reasons for no access. + NOT_IN_SOURCE_AUTH_DOMAINS = ( + "Account is not in all source auth domains for this workspace." + ) + NOT_IN_ANALYST_GROUP = "Account is not in the analyst group for this workspace." + INACTIVE_ACCOUNT = "Account is inactive." + + # Errors. + UNEXPECTED_GROUP_ACCESS = "Unexpected group added to the auth domain." + + results_table_class = AccessAuditResultsTable + + def __init__(self, queryset=None): + """Initialize the audit. + + Args: + queryset: A queryset of CollaborativeAnalysisWorkspaces to audit. + """ + if queryset is None: + queryset = models.CollaborativeAnalysisWorkspace.objects.all() + self.queryset = queryset + self.verified = [] + self.needs_action = [] + self.errors = [] + self.completed = False + + def _audit_workspace(self, workspace): + """Audit access to a single CollaborativeAnalysisWorkspace.""" + # Loop over analyst accounts for this workspace. + # In the loop, run the _audit_workspace_and_account method. + # Any remainig accounts in the auth domain that are not in the analyst group should be *errors*. + analyst_group = workspace.analyst_group + analyst_memberships = GroupAccountMembership.objects.filter(group=analyst_group) + # Get a list of accounts in the auth domain. + auth_domain_membership = [ + x.account + for x in GroupAccountMembership.objects.filter( + group=workspace.workspace.authorization_domains.first() + ) + ] + for membership in analyst_memberships: + self._audit_workspace_and_account(workspace, membership.account) + try: + auth_domain_membership.remove(membership.account) + except ValueError: + # This is fine - this happens if the account is not in the auth domain. + pass + + # Loop over remaining accounts in the auth domain. + for account in auth_domain_membership: + self.errors.append( + RemoveAccess( + collaborative_analysis_workspace=workspace, + member=account, + note=self.NOT_IN_ANALYST_GROUP, + ) + ) + + # Check that no groups have access. + group_memberships = GroupGroupMembership.objects.filter( + parent_group=workspace.workspace.authorization_domains.first(), + ).exclude( + # Ignore cc admins group - it is handled differently because it should have admin privileges. + child_group__name="PRIMED_CC_ADMINS", + ) + # CC groups that should have access. + cc_groups = ManagedGroup.objects.filter( + name__in=[ + "PRIMED_CC_WRITERS", + # "PRIMED_CC_MEMBERS", # CC_MEMBERS should not get access. + ] + ) + for cc_group in cc_groups: + try: + group_memberships.get(child_group=cc_group) + except GroupGroupMembership.DoesNotExist: + self.needs_action.append( + GrantAccess( + collaborative_analysis_workspace=workspace, + member=cc_group, + note=self.DCC_ACCESS, + ) + ) + else: + group_memberships = group_memberships.exclude(child_group=cc_group) + self.verified.append( + VerifiedAccess( + collaborative_analysis_workspace=workspace, + member=cc_group, + note=self.DCC_ACCESS, + ) + ) + # Any other groups are an error. + for membership in group_memberships: + self.errors.append( + RemoveAccess( + collaborative_analysis_workspace=workspace, + member=membership.child_group, + note=self.UNEXPECTED_GROUP_ACCESS, + ) + ) + + def _audit_workspace_and_account(self, collaborative_analysis_workspace, account): + """Audit access for a specific CollaborativeAnalysisWorkspace and account.""" + # Cases to consider: + # - analyst is in all relevant source auth domains, and is in the workspace auth domain. + # - analyst is in some but not all relevant source auth domains, and is in the workspace auth domain.. + # - analyst is in none of the relevant source auth domains, and is in the workspace auth domain.. + # - analyst is in all relevant source auth domains, and is not in the workspace auth domain. + # - analyst is in some but not all relevant source auth domains, and is not in the workspace auth domain. + # - analyst is in none of the relevant source auth domains, and is not in the workspace auth domain. + # - an account is in the workspace auth domain, but is not in the analyst group. + + # Check whether access is allowed. Start by assuming yes; set to false if the account should not have access. + access_allowed = True + account_groups = account.get_all_groups() + # Loop over all source workspaces. + for ( + source_workspace + ) in collaborative_analysis_workspace.source_workspaces.all(): + # Loop over all auth domains for that source workspace. + for source_auth_domain in source_workspace.authorization_domains.all(): + # If the user is not in the auth domain, they are not allowed to have access to the collab workspace. + # If so, break out of the loop - it is not necessary to check membership of the remaining auth domains. + # Note that this only breaks out of the inner loop. + # It would be more efficient to break out of the outer loop as well. + if source_auth_domain not in account_groups: + access_allowed = False + break + # Check whether the account is in the auth domain of the collab workspace. + in_auth_domain = ( + collaborative_analysis_workspace.workspace.authorization_domains.first() + in account_groups + ) + # Determine the audit result. + print(access_allowed) + print(in_auth_domain) + if access_allowed and in_auth_domain: + self.verified.append( + VerifiedAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.IN_SOURCE_AUTH_DOMAINS, + ) + ) + elif access_allowed and not in_auth_domain: + self.needs_action.append( + GrantAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.IN_SOURCE_AUTH_DOMAINS, + ) + ) + elif not access_allowed and in_auth_domain: + self.needs_action.append( + RemoveAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + ) + else: + self.verified.append( + VerifiedNoAccess( + collaborative_analysis_workspace=collaborative_analysis_workspace, + member=account, + note=self.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + ) + + def run_audit(self): + """Run the audit on the set of workspaces.""" + for workspace in self.queryset: + self._audit_workspace(workspace) + self.completed = True + + def get_verified_table(self): + """Return a table of verified results.""" + return self.results_table_class( + [x.get_table_dictionary() for x in self.verified] + ) + + def get_needs_action_table(self): + """Return a table of results where action is needed.""" + return self.results_table_class( + [x.get_table_dictionary() for x in self.needs_action] + ) + + def get_errors_table(self): + """Return a table of audit errors.""" + return self.results_table_class([x.get_table_dictionary() for x in self.errors]) diff --git a/primed/collaborative_analysis/forms.py b/primed/collaborative_analysis/forms.py new file mode 100644 index 00000000..5356a155 --- /dev/null +++ b/primed/collaborative_analysis/forms.py @@ -0,0 +1,51 @@ +from anvil_consortium_manager.forms import Bootstrap5MediaFormMixin +from dal import autocomplete +from django import forms +from django.core.exceptions import ValidationError + +from . import models + + +class CollaborativeAnalysisWorkspaceForm(Bootstrap5MediaFormMixin, forms.ModelForm): + """Form for a dbGaPWorkspace object.""" + + class Meta: + model = models.CollaborativeAnalysisWorkspace + fields = ( + "custodian", + "purpose", + "proposal_id", + "source_workspaces", + "analyst_group", + "workspace", + ) + widgets = { + "source_workspaces": autocomplete.ModelSelect2Multiple( + url="anvil_consortium_manager:workspaces:autocomplete", + attrs={"data-theme": "bootstrap-5"}, + ), + "custodian": autocomplete.ModelSelect2( + url="users:autocomplete", + attrs={"data-theme": "bootstrap-5"}, + ), + "analyst_group": autocomplete.ModelSelect2( + url="anvil_consortium_manager:managed_groups:autocomplete", + attrs={"data-theme": "bootstrap-5"}, + ), + } + + def clean(self): + """Custom checks: + + - workspace and source_workspace are different. + """ + cleaned_data = super().clean() + + # Workspace is not also a source_workspace. + workspace = cleaned_data.get("workspace", None) + source_workspaces = cleaned_data.get("source_workspaces", []) + if workspace and source_workspaces: + if workspace in source_workspaces: + raise ValidationError("source_workspaces cannot include workspace.") + + return cleaned_data diff --git a/primed/collaborative_analysis/migrations/0001_initial.py b/primed/collaborative_analysis/migrations/0001_initial.py new file mode 100644 index 00000000..fde5f548 --- /dev/null +++ b/primed/collaborative_analysis/migrations/0001_initial.py @@ -0,0 +1,193 @@ +# Generated by Django 4.2.7 on 2023-12-07 00:17 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import django_extensions.db.fields +import simple_history.models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("anvil_consortium_manager", "0015_add_new_permissions"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="HistoricalCollaborativeAnalysisWorkspace", + fields=[ + ( + "id", + models.BigIntegerField( + auto_created=True, blank=True, db_index=True, verbose_name="ID" + ), + ), + ( + "created", + django_extensions.db.fields.CreationDateTimeField( + auto_now_add=True, verbose_name="created" + ), + ), + ( + "modified", + django_extensions.db.fields.ModificationDateTimeField( + auto_now=True, verbose_name="modified" + ), + ), + ( + "purpose", + models.TextField( + help_text="The intended purpose for this workspace." + ), + ), + ( + "proposal_id", + models.IntegerField( + blank=True, + help_text="The ID of the proposal that this workspace is associated with.", + null=True, + ), + ), + ("history_id", models.AutoField(primary_key=True, serialize=False)), + ("history_date", models.DateTimeField(db_index=True)), + ("history_change_reason", models.CharField(max_length=100, null=True)), + ( + "history_type", + models.CharField( + choices=[("+", "Created"), ("~", "Changed"), ("-", "Deleted")], + max_length=1, + ), + ), + ( + "analyst_group", + models.ForeignKey( + blank=True, + db_constraint=False, + help_text="The AnVIL group containing analysts for this workspace.", + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to="anvil_consortium_manager.managedgroup", + ), + ), + ( + "custodian", + models.ForeignKey( + blank=True, + db_constraint=False, + help_text="The custodian for this workspace.", + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "history_user", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="+", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "workspace", + models.ForeignKey( + blank=True, + db_constraint=False, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to="anvil_consortium_manager.workspace", + ), + ), + ], + options={ + "verbose_name": "historical collaborative analysis workspace", + "verbose_name_plural": "historical collaborative analysis workspaces", + "ordering": ("-history_date", "-history_id"), + "get_latest_by": ("history_date", "history_id"), + }, + bases=(simple_history.models.HistoricalChanges, models.Model), + ), + migrations.CreateModel( + name="CollaborativeAnalysisWorkspace", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "created", + django_extensions.db.fields.CreationDateTimeField( + auto_now_add=True, verbose_name="created" + ), + ), + ( + "modified", + django_extensions.db.fields.ModificationDateTimeField( + auto_now=True, verbose_name="modified" + ), + ), + ( + "purpose", + models.TextField( + help_text="The intended purpose for this workspace." + ), + ), + ( + "proposal_id", + models.IntegerField( + blank=True, + help_text="The ID of the proposal that this workspace is associated with.", + null=True, + ), + ), + ( + "analyst_group", + models.ForeignKey( + help_text="The AnVIL group containing analysts for this workspace.", + on_delete=django.db.models.deletion.PROTECT, + to="anvil_consortium_manager.managedgroup", + ), + ), + ( + "custodian", + models.ForeignKey( + help_text="The custodian for this workspace.", + on_delete=django.db.models.deletion.PROTECT, + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "source_workspaces", + models.ManyToManyField( + help_text="Workspaces contributing data to this workspace.", + related_name="collaborative_analysis_workspaces", + to="anvil_consortium_manager.workspace", + ), + ), + ( + "workspace", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + to="anvil_consortium_manager.workspace", + ), + ), + ], + options={ + "get_latest_by": "modified", + "abstract": False, + }, + ), + ] diff --git a/primed/collaborative_analysis/migrations/__init__.py b/primed/collaborative_analysis/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/primed/collaborative_analysis/models.py b/primed/collaborative_analysis/models.py new file mode 100644 index 00000000..edcac496 --- /dev/null +++ b/primed/collaborative_analysis/models.py @@ -0,0 +1,35 @@ +from anvil_consortium_manager.models import BaseWorkspaceData, ManagedGroup, Workspace +from django.conf import settings +from django.db import models +from django_extensions.db.models import TimeStampedModel + + +# Note that "RequesterModel" is not included, because we have the "custodian" tracked instead. +class CollaborativeAnalysisWorkspace(TimeStampedModel, BaseWorkspaceData): + + purpose = models.TextField( + help_text="The intended purpose for this workspace.", + ) + proposal_id = models.IntegerField( + help_text="The ID of the proposal that this workspace is associated with.", + blank=True, + null=True, + ) + # Other options: + # manager, organizer, supervisor, overseer, controller, organizer, + # custodian, caretaker, warden, + custodian = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.PROTECT, + help_text="The custodian for this workspace.", + ) + source_workspaces = models.ManyToManyField( + Workspace, + related_name="collaborative_analysis_workspaces", + help_text="Workspaces contributing data to this workspace.", + ) + analyst_group = models.ForeignKey( + ManagedGroup, + on_delete=models.PROTECT, + help_text="The AnVIL group containing analysts for this workspace.", + ) diff --git a/primed/collaborative_analysis/tables.py b/primed/collaborative_analysis/tables.py new file mode 100644 index 00000000..a17c5cb1 --- /dev/null +++ b/primed/collaborative_analysis/tables.py @@ -0,0 +1,55 @@ +import django_tables2 as tables +from anvil_consortium_manager.models import Workspace + + +class CollaborativeAnalysisWorkspaceStaffTable(tables.Table): + """Class to render a table of Workspace objects with CollaborativeAnalysisWorkspace data.""" + + name = tables.columns.Column(linkify=True) + billing_project = tables.Column(linkify=True) + collaborativeanalysisworkspace__custodian = tables.Column(linkify=True) + number_source_workspaces = tables.columns.Column( + accessor="pk", + verbose_name="Number of source workspaces", + orderable=False, + ) + + class Meta: + model = Workspace + fields = ( + "name", + "billing_project", + "collaborativeanalysisworkspace__custodian", + "number_source_workspaces", + ) + order_by = ("name",) + + def render_number_source_workspaces(self, record): + """Render the number of source workspaces.""" + return record.collaborativeanalysisworkspace.source_workspaces.count() + + +class CollaborativeAnalysisWorkspaceUserTable(tables.Table): + """Class to render a table of Workspace objects with CollaborativeAnalysisWorkspace data.""" + + name = tables.columns.Column(linkify=True) + billing_project = tables.Column() + number_source_workspaces = tables.columns.Column( + accessor="pk", + verbose_name="Number of source workspaces", + orderable=False, + ) + + class Meta: + model = Workspace + fields = ( + "name", + "billing_project", + "collaborativeanalysisworkspace__custodian", + "number_source_workspaces", + ) + order_by = ("name",) + + def render_number_source_workspaces(self, record): + """Render the number of source workspaces.""" + return record.collaborativeanalysisworkspace.source_workspaces.count() diff --git a/primed/collaborative_analysis/tests/__init__.py b/primed/collaborative_analysis/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/primed/collaborative_analysis/tests/test_audit.py b/primed/collaborative_analysis/tests/test_audit.py new file mode 100644 index 00000000..62baf77f --- /dev/null +++ b/primed/collaborative_analysis/tests/test_audit.py @@ -0,0 +1,1129 @@ +from anvil_consortium_manager.tests.factories import ( + AccountFactory, + GroupAccountMembershipFactory, + GroupGroupMembershipFactory, + ManagedGroupFactory, + WorkspaceAuthorizationDomainFactory, + WorkspaceFactory, +) +from django.test import TestCase +from django.urls import reverse + +from primed.cdsa.tests.factories import CDSAWorkspaceFactory +from primed.dbgap.tests.factories import dbGaPWorkspaceFactory + +from .. import audit +from . import factories + + +class WorkspaceAccessAuditResultTest(TestCase): + def setUp(self): + super().setUp() + + def test_account_verified_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + instance = audit.VerifiedAccess( + collaborative_analysis_workspace=workspace, member=account, note="test" + ) + self.assertIsNone(instance.get_action_url()) + + def test_account_verified_no_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + instance = audit.VerifiedNoAccess( + collaborative_analysis_workspace=workspace, member=account, note="test" + ) + self.assertIsNone(instance.get_action_url()) + + def test_account_grant_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + instance = audit.GrantAccess( + collaborative_analysis_workspace=workspace, member=account, note="test" + ) + expected_url = reverse( + "anvil_consortium_manager:managed_groups:member_accounts:new_by_account", + args=[workspace.workspace.authorization_domains.first().name, account.uuid], + ) + self.assertEqual(instance.get_action_url(), expected_url) + + def test_account_remove_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + account = AccountFactory.create() + instance = audit.RemoveAccess( + collaborative_analysis_workspace=workspace, member=account, note="test" + ) + expected_url = reverse( + "anvil_consortium_manager:managed_groups:member_accounts:delete", + args=[workspace.workspace.authorization_domains.first().name, account.uuid], + ) + self.assertEqual(instance.get_action_url(), expected_url) + + def test_group_verified_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + instance = audit.VerifiedAccess( + collaborative_analysis_workspace=workspace, member=group, note="test" + ) + self.assertIsNone(instance.get_action_url()) + + def test_group_verified_no_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + instance = audit.VerifiedNoAccess( + collaborative_analysis_workspace=workspace, member=group, note="test" + ) + self.assertIsNone(instance.get_action_url()) + + def test_group_grant_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + instance = audit.GrantAccess( + collaborative_analysis_workspace=workspace, member=group, note="test" + ) + expected_url = reverse( + "anvil_consortium_manager:managed_groups:member_groups:new_by_child", + args=[workspace.workspace.authorization_domains.first().name, group.name], + ) + self.assertEqual(instance.get_action_url(), expected_url) + + def test_group_remove_access(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + group = ManagedGroupFactory.create() + instance = audit.RemoveAccess( + collaborative_analysis_workspace=workspace, member=group, note="test" + ) + expected_url = reverse( + "anvil_consortium_manager:managed_groups:member_groups:delete", + args=[workspace.workspace.authorization_domains.first().name, group.name], + ) + self.assertEqual(instance.get_action_url(), expected_url) + + +class CollaborativeAnalysisWorkspaceAccessAudit(TestCase): + """Tests for the CollaborativeAnalysisWorkspaceAccessAudit class.""" + + def test_completed(self): + """completed is updated properly.""" + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + self.assertFalse(collab_audit.completed) + collab_audit.run_audit() + self.assertTrue(collab_audit.completed) + + def test_no_workspaces(self): + """Audit works if there are no source workspaces.""" + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit.run_audit() + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + + def test_one_workspaces_no_analysts(self): + """Audit works if there are analysts workspaces in the analyst group for a workspace.""" + factories.CollaborativeAnalysisWorkspaceFactory.create() + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit.run_audit() + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + + def test_analyst_in_collab_auth_domain_in_source_auth_domain(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_in_collab_auth_domain_not_in_source_auth_domain(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), account=account + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_not_in_collab_auth_domain_in_source_auth_domain(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_not_in_collab_auth_domain_not_in_source_auth_domain(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), account=account + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_in_collab_auth_domain_two_source_auth_domains_in_both(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + source_auth_domain_2 = WorkspaceAuthorizationDomainFactory.create( + workspace=source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.all()[0], + account=account, + ) + GroupAccountMembershipFactory.create( + group=source_auth_domain_2.group, + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_in_collab_auth_domain_two_source_auth_domains_in_one(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + WorkspaceAuthorizationDomainFactory.create(workspace=source_workspace.workspace) + # add an extra auth doamin + WorkspaceAuthorizationDomainFactory.create(workspace=source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # GroupAccountMembershipFactory.create( + # group=source_auth_domain_2.group, + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_in_collab_auth_domain_two_source_auth_domains_in_neither(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + WorkspaceAuthorizationDomainFactory.create(workspace=source_workspace.workspace) + # add an extra auth doamin + WorkspaceAuthorizationDomainFactory.create(workspace=source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # GroupAccountMembershipFactory.create( + # group=source_auth_domain_2.group, + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_not_in_collab_auth_domain_two_source_auth_domains_in_both(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + source_auth_domain_1 = source_workspace.workspace.authorization_domains.first() + source_auth_domain_2 = WorkspaceAuthorizationDomainFactory.create( + workspace=source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_auth_domain_1, + account=account, + ) + GroupAccountMembershipFactory.create( + group=source_auth_domain_2.group, + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_not_in_collab_auth_domain_two_source_auth_domains_in_one(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + source_auth_domain_2 = WorkspaceAuthorizationDomainFactory.create( + workspace=source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_auth_domain_1, + # account=account, + # ) + GroupAccountMembershipFactory.create( + group=source_auth_domain_2.group, + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_analyst_not_in_collab_auth_domain_two_source_auth_domains_in_neither(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace.workspace) + WorkspaceAuthorizationDomainFactory.create(workspace=source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_auth_domain_1, + # account=account, + # ) + # GroupAccountMembershipFactory.create( + # group=source_auth_domain_2.group, + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_in_collab_auth_domain_no_source_workspaces(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = WorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_not_in_collab_auth_domain_no_source_workspaces(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace = WorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_in_collab_auth_domain_two_source_workspaces_in_both_auth_domains(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace_1.workspace.authorization_domains.first(), + account=account, + ) + GroupAccountMembershipFactory.create( + group=source_workspace_2.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_in_collab_auth_domain_two_source_workspaces_in_one_auth_domains(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace_1.workspace.authorization_domains.first(), + account=account, + ) + # GroupAccountMembershipFactory.create( + # group=source_workspace_2.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_in_collab_auth_domain_two_source_workspaces_in_neither_auth_domains(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace_1.workspace.authorization_domains.first(), + # account=account, + # ) + # GroupAccountMembershipFactory.create( + # group=source_workspace_2.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=account + ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_not_in_collab_auth_domain_two_source_workspaces_in_both_auth_domains(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace_1.workspace.authorization_domains.first(), + account=account, + ) + GroupAccountMembershipFactory.create( + group=source_workspace_2.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_not_in_collab_auth_domain_two_source_workspaces_in_one_auth_domains(self): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace_1.workspace.authorization_domains.first(), + account=account, + ) + # GroupAccountMembershipFactory.create( + # group=source_workspace_2.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_not_in_collab_auth_domain_two_source_workspaces_in_neither_auth_domains( + self, + ): + # Create accounts. + account = AccountFactory.create() + # Set up workspace. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Set up source workspaces. + source_workspace_1 = dbGaPWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_1.workspace) + source_workspace_2 = CDSAWorkspaceFactory.create() + workspace.source_workspaces.add(source_workspace_2.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace_1.workspace.authorization_domains.first(), + # account=account, + # ) + # GroupAccountMembershipFactory.create( + # group=source_workspace_2.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=workspace.workspace.authorization_domains.first(), account=account + # ) + # Set up audit + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + # Run audit + collab_audit._audit_workspace_and_account(workspace, account) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedNoAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, account) + self.assertEqual(record.note, collab_audit.NOT_IN_SOURCE_AUTH_DOMAINS) + + def test_two_analysts(self): + # Create an analyst that needs access. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + analyst_1 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=analyst_1 + ) + # Create an analyst that has access. + analyst_2 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace.analyst_group, account=analyst_2 + ) + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=analyst_2 + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, analyst_2) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, analyst_1) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_not_in_analyst_group(self): + # Create an analyst that needs access. + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create an analyst that has access but is not in the analyst group. + analyst = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace.workspace.authorization_domains.first(), account=analyst + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 1) + record = collab_audit.errors[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, analyst) + self.assertEqual(record.note, collab_audit.NOT_IN_ANALYST_GROUP) + + def test_unexpected_group_in_auth_domain(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + group = ManagedGroupFactory.create() + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 1) + record = collab_audit.errors[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, group) + self.assertEqual(record.note, collab_audit.UNEXPECTED_GROUP_ACCESS) + + def test_ignores_primed_admins_group(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + group = ManagedGroupFactory.create(name="PRIMED_CC_ADMINS") + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + + def test_error_for_primed_cc_members_group(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + group = ManagedGroupFactory.create(name="PRIMED_CC_MEMBERS") + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 1) + record = collab_audit.errors[0] + self.assertIsInstance(record, audit.RemoveAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, group) + self.assertEqual(record.note, collab_audit.UNEXPECTED_GROUP_ACCESS) + + def test_no_access_for_primed_cc_members_group(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + ManagedGroupFactory.create(name="PRIMED_CC_MEMBERS") + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + + def test_verified_access_for_primed_cc_writers_group(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + GroupGroupMembershipFactory.create( + parent_group=workspace.workspace.authorization_domains.first(), + child_group=group, + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 0) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, group) + self.assertEqual(record.note, collab_audit.DCC_ACCESS) + + def test_grant_access_for_primed_cc_writers_group(self): + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Add a group to the auth domain. + group = ManagedGroupFactory.create(name="PRIMED_CC_WRITERS") + # GroupGroupMembershipFactory.create( + # parent_group=workspace.workspace.authorization_domains.first(), + # child_group=group, + # ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit._audit_workspace(workspace) + self.assertEqual(len(collab_audit.verified), 0) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace) + self.assertEqual(record.member, group) + self.assertEqual(record.note, collab_audit.DCC_ACCESS) + + def test_two_workspaces(self): + # Create a workspace with an analyst that needs access. + workspace_1 = factories.CollaborativeAnalysisWorkspaceFactory.create() + analyst_1 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace_1.analyst_group, account=analyst_1 + ) + # Create a workspace with an analyst that has access. + workspace_2 = factories.CollaborativeAnalysisWorkspaceFactory.create() + analyst_2 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace_2.analyst_group, account=analyst_2 + ) + GroupAccountMembershipFactory.create( + group=workspace_2.workspace.authorization_domains.first(), account=analyst_2 + ) + collab_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit() + collab_audit.run_audit() + self.assertEqual(len(collab_audit.verified), 1) + self.assertEqual(len(collab_audit.needs_action), 1) + self.assertEqual(len(collab_audit.errors), 0) + record = collab_audit.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace_2) + self.assertEqual(record.member, analyst_2) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + record = collab_audit.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace_1) + self.assertEqual(record.member, analyst_1) + self.assertEqual(record.note, collab_audit.IN_SOURCE_AUTH_DOMAINS) + + def test_queryset(self): + """Audit only runs on the specified queryset of workspaces.""" + # Create a workspace with an analyst that needs access. + workspace_1 = factories.CollaborativeAnalysisWorkspaceFactory.create() + analyst_1 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace_1.analyst_group, account=analyst_1 + ) + # Create a workspace with an analyst that has access. + workspace_2 = factories.CollaborativeAnalysisWorkspaceFactory.create() + analyst_2 = AccountFactory.create() + GroupAccountMembershipFactory.create( + group=workspace_2.analyst_group, account=analyst_2 + ) + GroupAccountMembershipFactory.create( + group=workspace_2.workspace.authorization_domains.first(), account=analyst_2 + ) + collab_audit_1 = audit.CollaborativeAnalysisWorkspaceAccessAudit( + queryset=[workspace_1] + ) + collab_audit_1.run_audit() + self.assertEqual(len(collab_audit_1.verified), 0) + self.assertEqual(len(collab_audit_1.needs_action), 1) + self.assertEqual(len(collab_audit_1.errors), 0) + record = collab_audit_1.needs_action[0] + self.assertIsInstance(record, audit.GrantAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace_1) + self.assertEqual(record.member, analyst_1) + self.assertEqual(record.note, collab_audit_1.IN_SOURCE_AUTH_DOMAINS) + collab_audit_2 = audit.CollaborativeAnalysisWorkspaceAccessAudit( + queryset=[workspace_2] + ) + collab_audit_2.run_audit() + self.assertEqual(len(collab_audit_2.verified), 1) + self.assertEqual(len(collab_audit_2.needs_action), 0) + self.assertEqual(len(collab_audit_2.errors), 0) + record = collab_audit_2.verified[0] + self.assertIsInstance(record, audit.VerifiedAccess) + self.assertEqual(record.collaborative_analysis_workspace, workspace_2) + self.assertEqual(record.member, analyst_2) + self.assertEqual(record.note, collab_audit_2.IN_SOURCE_AUTH_DOMAINS) + + +class AccessAuditResultsTableTest(TestCase): + """Tests for the `AccessAuditResultsTable` table.""" + + def test_no_rows(self): + """Table works with no rows.""" + table = audit.AccessAuditResultsTable([]) + self.assertIsInstance(table, audit.AccessAuditResultsTable) + self.assertEqual(len(table.rows), 0) + + def test_one_row_account(self): + """Table works with one row with an account member.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member_account = AccountFactory.create() + data = [ + { + "workspace": workspace, + "member": member_account, + "note": "a note", + "action": "", + "action_url": "", + } + ] + table = audit.AccessAuditResultsTable(data) + self.assertIsInstance(table, audit.AccessAuditResultsTable) + self.assertEqual(len(table.rows), 1) + + def test_one_row_group(self): + """Table works with one row with a group member.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member_group = ManagedGroupFactory.create() + data = [ + { + "workspace": workspace, + "member": member_group, + "note": "a note", + "action": "", + "action_url": "", + } + ] + table = audit.AccessAuditResultsTable(data) + self.assertIsInstance(table, audit.AccessAuditResultsTable) + self.assertEqual(len(table.rows), 1) + + def test_two_rows(self): + """Table works with two rows.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member_account = AccountFactory.create() + member_group = ManagedGroupFactory.create() + data = [ + { + "workspace": workspace, + "member": member_account, + "has_access": True, + "note": "a note", + "action": "", + "action_url": "", + }, + { + "workspace": workspace, + "member": member_group, + "has_access": False, + "note": "a note", + "action": "", + "action_url": "", + }, + ] + table = audit.AccessAuditResultsTable(data) + self.assertIsInstance(table, audit.AccessAuditResultsTable) + self.assertEqual(len(table.rows), 2) + + def test_render_action(self): + """Render action works as expected for grant access types.""" + workspace = factories.CollaborativeAnalysisWorkspaceFactory.create() + member_group = ManagedGroupFactory.create() + data = [ + { + "workspace": workspace, + "member": member_group, + "note": "a note", + "action": "Grant", + "action_url": "foo", + } + ] + + table = audit.AccessAuditResultsTable(data) + self.assertIsInstance(table, audit.AccessAuditResultsTable) + self.assertEqual(len(table.rows), 1) + self.assertIn("foo", table.rows[0].get_cell("action")) + self.assertIn("Grant", table.rows[0].get_cell("action")) diff --git a/primed/collaborative_analysis/tests/test_forms.py b/primed/collaborative_analysis/tests/test_forms.py new file mode 100644 index 00000000..ededf2fb --- /dev/null +++ b/primed/collaborative_analysis/tests/test_forms.py @@ -0,0 +1,175 @@ +from anvil_consortium_manager.tests.factories import ( + ManagedGroupFactory, + WorkspaceFactory, +) +from django.core.exceptions import NON_FIELD_ERRORS +from django.test import TestCase + +from primed.users.tests.factories import UserFactory + +from .. import forms + + +class CollaborativeAnalysisWorkspaceFormTest(TestCase): + """Tests for the CollaborativeAnalysisWorkspaceForm class.""" + + form_class = forms.CollaborativeAnalysisWorkspaceForm + + def setUp(self): + """Create a workspace for use in the form.""" + self.custodian = UserFactory.create() + self.workspace = WorkspaceFactory.create() + self.source_workspace = WorkspaceFactory.create() + self.analyst_group = ManagedGroupFactory.create() + + def test_valid(self): + """Form is valid with necessary input.""" + form_data = { + "purpose": "test", + "source_workspaces": [self.source_workspace], + "analyst_group": self.analyst_group, + "custodian": self.custodian, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertTrue(form.is_valid()) + + def test_valid_with_proposal_id(self): + form_data = { + "purpose": "test", + "proposal_id": 1, + "source_workspaces": [self.source_workspace], + "analyst_group": self.analyst_group, + "custodian": self.custodian, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertTrue(form.is_valid()) + + def test_invalid_with_proposal_id_character(self): + form_data = { + "purpose": "test", + "proposal_id": "a", + "analyst_group": self.analyst_group, + "source_workspaces": [self.source_workspace], + "custodian": self.custodian, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("proposal_id", form.errors) + self.assertEqual(len(form.errors["proposal_id"]), 1) + self.assertIn("whole number", form.errors["proposal_id"][0]) + + def test_valid_two_source_workspaces(self): + """Form is valid with necessary input.""" + source_workspace_2 = WorkspaceFactory.create() + form_data = { + "purpose": "test", + "source_workspaces": [self.source_workspace, source_workspace_2], + "custodian": self.custodian, + "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertTrue(form.is_valid()) + + def test_invalid_missing_purpose(self): + """Form is invalid when missing source_workspaces.""" + form_data = { + # "purpose": "test", + "source_workspaces": [self.source_workspace], + "custodian": self.custodian, + "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("purpose", form.errors) + self.assertEqual(len(form.errors["purpose"]), 1) + self.assertIn("required", form.errors["purpose"][0]) + + def test_invalid_missing_source_workspaces(self): + """Form is invalid when missing source_workspaces.""" + form_data = { + "purpose": "test", + # "source_workspaces": [self.source_workspace], + "custodian": self.custodian, + "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("source_workspaces", form.errors) + self.assertEqual(len(form.errors["source_workspaces"]), 1) + self.assertIn("required", form.errors["source_workspaces"][0]) + + def test_invalid_same_workspace_and_source_workspace(self): + """Form is invalid when missing source_workspaces.""" + form_data = { + "purpose": "test", + "source_workspaces": [self.workspace], + "custodian": self.custodian, + "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn(NON_FIELD_ERRORS, form.errors) + self.assertEqual(len(form.errors[NON_FIELD_ERRORS]), 1) + self.assertIn("cannot include workspace", form.errors[NON_FIELD_ERRORS][0]) + + def test_invalid_custodian(self): + """Form is invalid when missing custodian.""" + form_data = { + "purpose": "test", + "source_workspaces": [self.source_workspace], + # "custodian": self.custodian, + "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("custodian", form.errors) + self.assertEqual(len(form.errors["custodian"]), 1) + self.assertIn("required", form.errors["custodian"][0]) + + def test_invalid_analyst_group(self): + """Form is invalid when missing custodian.""" + form_data = { + "purpose": "test", + "source_workspaces": [self.source_workspace], + "custodian": self.custodian, + # "analyst_group": self.analyst_group, + "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("analyst_group", form.errors) + self.assertEqual(len(form.errors["analyst_group"]), 1) + self.assertIn("required", form.errors["analyst_group"][0]) + + def test_invalid_workspace(self): + """Form is invalid when missing workspace.""" + form_data = { + "purpose": "test", + "source_workspaces": [self.source_workspace], + "custodian": self.custodian, + "analyst_group": self.analyst_group, + # "workspace": self.workspace, + } + form = self.form_class(data=form_data) + self.assertFalse(form.is_valid()) + self.assertEqual(len(form.errors), 1) + self.assertIn("workspace", form.errors) + self.assertEqual(len(form.errors["workspace"]), 1) + self.assertIn("required", form.errors["workspace"][0]) + + # def test_source_workspace_types(self): + # pass diff --git a/primed/collaborative_analysis/tests/test_models.py b/primed/collaborative_analysis/tests/test_models.py new file mode 100644 index 00000000..20031d89 --- /dev/null +++ b/primed/collaborative_analysis/tests/test_models.py @@ -0,0 +1,48 @@ +from anvil_consortium_manager.tests.factories import ( + ManagedGroupFactory, + WorkspaceFactory, +) +from django.test import TestCase + +from primed.users.tests.factories import UserFactory + +from .. import models +from . import factories + + +class CollaborativeAnalysisWorkspaceTest(TestCase): + """Tests for the DataPrepWorkspace model.""" + + def test_model_saving(self): + """Creation using the model constructor and .save() works.""" + workspace = WorkspaceFactory.create() + user = UserFactory.create() + group = ManagedGroupFactory.create() + instance = models.CollaborativeAnalysisWorkspace( + workspace=workspace, + custodian=user, + analyst_group=group, + ) + instance.save() + self.assertIsInstance(instance, models.CollaborativeAnalysisWorkspace) + + def test_str_method(self): + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.assertIsInstance(str(instance), str) + self.assertEqual(str(instance), str(instance.workspace)) + + def test_one_source_workspace(self): + source_workspace = WorkspaceFactory.create() + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace) + self.assertEqual(len(instance.source_workspaces.all()), 1) + self.assertIn(source_workspace, instance.source_workspaces.all()) + + def test_two_source_workspaces(self): + source_workspace_1 = WorkspaceFactory.create() + source_workspace_2 = WorkspaceFactory.create() + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace_1, source_workspace_2) + self.assertEqual(len(instance.source_workspaces.all()), 2) + self.assertIn(source_workspace_1, instance.source_workspaces.all()) + self.assertIn(source_workspace_2, instance.source_workspaces.all()) diff --git a/primed/collaborative_analysis/tests/test_tables.py b/primed/collaborative_analysis/tests/test_tables.py new file mode 100644 index 00000000..f8482f99 --- /dev/null +++ b/primed/collaborative_analysis/tests/test_tables.py @@ -0,0 +1,104 @@ +"""Tests for the tables in the `collaborative_analysis` app.""" + +from anvil_consortium_manager.models import Workspace +from anvil_consortium_manager.tests.factories import WorkspaceFactory +from django.test import TestCase + +from .. import tables +from . import factories + + +class CollaborativeAnalysisWorkspaceStaffTableTest(TestCase): + model = Workspace + model_factory = factories.CollaborativeAnalysisWorkspaceFactory + table_class = tables.CollaborativeAnalysisWorkspaceStaffTable + + def test_row_count_with_no_objects(self): + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 0) + + def test_row_count_with_one_object(self): + self.model_factory.create() + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 1) + + def test_row_count_with_two_objects(self): + self.model_factory.create_batch(2) + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 2) + + def test_number_source_workspaces_zero(self): + """Table shows correct count for number of source_workspaces.""" + self.model_factory.create() + table = self.table_class(self.model.objects.all()) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 0) + + def test_number_source_workspaces_one(self): + """Table shows correct count for number of source_workspaces.""" + source_workspace = WorkspaceFactory.create() + obj = self.model_factory.create() + obj.source_workspaces.add(source_workspace) + table = self.table_class( + self.model.objects.filter(workspace_type="collab_analysis") + ) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 1) + + def test_number_source_workspaces_two(self): + """Table shows correct count for number of source_workspaces.""" + source_workspace_1 = WorkspaceFactory.create() + source_workspace_2 = WorkspaceFactory.create() + obj = self.model_factory.create() + obj.source_workspaces.add(source_workspace_1) + obj.source_workspaces.add(source_workspace_2) + table = self.table_class( + self.model.objects.filter(workspace_type="collab_analysis") + ) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 2) + + +class CollaborativeAnalysisWorkspaceUserTableTest(TestCase): + model = Workspace + model_factory = factories.CollaborativeAnalysisWorkspaceFactory + table_class = tables.CollaborativeAnalysisWorkspaceUserTable + + def test_row_count_with_no_objects(self): + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 0) + + def test_row_count_with_one_object(self): + self.model_factory.create() + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 1) + + def test_row_count_with_two_objects(self): + self.model_factory.create_batch(2) + table = self.table_class(self.model.objects.all()) + self.assertEqual(len(table.rows), 2) + + def test_number_source_workspaces_zero(self): + """Table shows correct count for number of source_workspaces.""" + self.model_factory.create() + table = self.table_class(self.model.objects.all()) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 0) + + def test_number_source_workspaces_one(self): + """Table shows correct count for number of source_workspaces.""" + source_workspace = WorkspaceFactory.create() + obj = self.model_factory.create() + obj.source_workspaces.add(source_workspace) + table = self.table_class( + self.model.objects.filter(workspace_type="collab_analysis") + ) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 1) + + def test_number_source_workspaces_two(self): + """Table shows correct count for number of source_workspaces.""" + source_workspace_1 = WorkspaceFactory.create() + source_workspace_2 = WorkspaceFactory.create() + obj = self.model_factory.create() + obj.source_workspaces.add(source_workspace_1) + obj.source_workspaces.add(source_workspace_2) + table = self.table_class( + self.model.objects.filter(workspace_type="collab_analysis") + ) + self.assertEqual(table.rows[0].get_cell("number_source_workspaces"), 2) diff --git a/primed/collaborative_analysis/tests/test_views.py b/primed/collaborative_analysis/tests/test_views.py new file mode 100644 index 00000000..d5f67a8d --- /dev/null +++ b/primed/collaborative_analysis/tests/test_views.py @@ -0,0 +1,1025 @@ +"""Tests for views related to the `collaborative_analysis` app.""" + +import responses +from anvil_consortium_manager.models import AnVILProjectManagerAccess, Workspace +from anvil_consortium_manager.tests.factories import ( + AccountFactory, + BillingProjectFactory, + GroupAccountMembershipFactory, + GroupGroupMembershipFactory, + ManagedGroupFactory, +) +from anvil_consortium_manager.tests.utils import AnVILAPIMockTestMixin +from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Permission +from django.core.exceptions import PermissionDenied +from django.http import Http404 +from django.shortcuts import resolve_url +from django.test import RequestFactory, TestCase +from django.urls import reverse + +from primed.cdsa.tests.factories import CDSAWorkspaceFactory +from primed.dbgap.tests.factories import dbGaPWorkspaceFactory +from primed.miscellaneous_workspaces.tests.factories import OpenAccessWorkspaceFactory +from primed.users.tests.factories import UserFactory + +from .. import audit, models, views +from . import factories + +User = get_user_model() + + +class NavbarTest(TestCase): + """Tests for the navbar involving Collaborative Analysis links.""" + + def setUp(self): + """Set up test class.""" + self.factory = RequestFactory() + # Create a user with both view and edit permission. + + def get_url(self, *args): + """Get the url for the view being tested.""" + # Use the workspace landing page, since view users can see it. + return reverse("anvil_consortium_manager:workspaces:landing_page", args=args) + + def test_links_for_staff_view(self): + """Returns successful response code.""" + user = User.objects.create_user(username="test", password="test") + user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + self.client.force_login(user) + response = self.client.get(self.get_url()) + self.assertContains( + response, reverse("collaborative_analysis:workspaces:audit_all") + ) + + def test_links_for_view(self): + """Returns successful response code.""" + user = User.objects.create_user(username="test", password="test") + user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.VIEW_PERMISSION_CODENAME + ) + ) + self.client.force_login(user) + response = self.client.get(self.get_url()) + self.assertNotContains( + response, reverse("collaborative_analysis:workspaces:audit_all") + ) + + +class CollaborativeAnalysisWorkspaceDetailTest(TestCase): + """Tests of the WorkspaceDetail view from ACM with this app's CollaborativeAnalysisWorkspace model.""" + + def setUp(self): + """Set up test class.""" + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.VIEW_PERMISSION_CODENAME + ) + ) + + def test_status_code_with_user_permission(self): + """Returns successful response code.""" + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertEqual(response.status_code, 200) + + def test_links_to_source_workspace(self): + """Links to the source workspace appear on the detail page.""" + # TODO: Move this to a table in the context data when ACM allows. + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + dbgap_workspace = dbGaPWorkspaceFactory.create() + cdsa_workspace = CDSAWorkspaceFactory.create() + open_access_workspace = OpenAccessWorkspaceFactory.create() + obj.source_workspaces.add( + dbgap_workspace.workspace, + cdsa_workspace.workspace, + open_access_workspace.workspace, + ) + self.client.force_login(self.user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertIn(dbgap_workspace.get_absolute_url(), response.content.decode()) + self.assertIn(cdsa_workspace.get_absolute_url(), response.content.decode()) + self.assertIn( + open_access_workspace.get_absolute_url(), response.content.decode() + ) + + def test_link_to_custodian(self): + """Links to the custodian's user profile appear on the detail page.""" + custodian = UserFactory.create() + obj = factories.CollaborativeAnalysisWorkspaceFactory.create( + custodian=custodian + ) + self.client.force_login(self.user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertIn(custodian.get_absolute_url(), response.content.decode()) + + def test_link_to_analyst_group_staff_view(self): + """Links to the analyst group's detail page appear on the detail page for staff_viewers.""" + user = User.objects.create_user( + username="test-staff-view", password="test-staff-view" + ) + user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertIn(obj.analyst_group.get_absolute_url(), response.content.decode()) + self.assertIn(obj.analyst_group.name, response.content.decode()) + + def test_link_to_analyst_group_view(self): + """Links to the analyst group's detail page do not appear on the detail page for viewers.""" + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertNotIn( + obj.analyst_group.get_absolute_url(), response.content.decode() + ) + self.assertNotIn(obj.analyst_group.name, response.content.decode()) + + def test_link_to_audit_staff_view(self): + """Links to the audit view page do appear on the detail page for staff viewers.""" + user = User.objects.create_user( + username="test-staff-view", password="test-staff-view" + ) + user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(user) + response = self.client.get(obj.workspace.get_absolute_url()) + url = reverse( + "collaborative_analysis:workspaces:audit", + args=[obj.workspace.billing_project.name, obj.workspace.name], + ) + self.assertIn(url, response.content.decode()) + + def test_link_to_audit_view(self): + """Links to the audit view page do not appear on the detail page for viewers.""" + obj = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(obj.workspace.get_absolute_url()) + self.assertNotIn( + reverse( + "collaborative_analysis:workspaces:audit", + args=[obj.workspace.billing_project.name, obj.workspace.name], + ), + response.content.decode(), + ) + + +class CollaborativeAnalysisWorkspaceCreateTest(AnVILAPIMockTestMixin, TestCase): + """Tests of the WorkspaceCreate view from ACM with this app's CollaborativeAnalysisWorkspace model.""" + + api_success_code = 201 + + def setUp(self): + """Set up test class.""" + # The superclass uses the responses package to mock API responses. + super().setUp() + # Create a user with both view and edit permissions. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_EDIT_PERMISSION_CODENAME + ) + ) + self.workspace_type = "collab_analysis" + self.custodian = UserFactory.create() + self.source_workspace = dbGaPWorkspaceFactory.create().workspace + self.analyst_group = ManagedGroupFactory.create() + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse("anvil_consortium_manager:workspaces:new", args=args) + + def test_creates_workspace(self): + """Posting valid data to the form creates a workspace data object when using a custom adapter.""" + billing_project = BillingProjectFactory.create(name="test-billing-project") + url = self.api_client.rawls_entry_point + "/api/workspaces" + json_data = { + "namespace": "test-billing-project", + "name": "test-workspace", + "attributes": {}, + } + self.anvil_response_mock.add( + responses.POST, + url, + status=self.api_success_code, + match=[responses.matchers.json_params_matcher(json_data)], + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url(self.workspace_type), + { + "billing_project": billing_project.pk, + "name": "test-workspace", + # Workspace data form. + "workspacedata-TOTAL_FORMS": 1, + "workspacedata-INITIAL_FORMS": 0, + "workspacedata-MIN_NUM_FORMS": 1, + "workspacedata-MAX_NUM_FORMS": 1, + "workspacedata-0-custodian": self.custodian.pk, + "workspacedata-0-source_workspaces": [self.source_workspace.pk], + "workspacedata-0-purpose": "test", + "workspacedata-0-analyst_group": self.analyst_group.pk, + }, + ) + self.assertEqual(response.status_code, 302) + # The workspace is created. + new_workspace = Workspace.objects.latest("pk") + # Workspace data is added. + self.assertEqual(models.CollaborativeAnalysisWorkspace.objects.count(), 1) + new_workspace_data = models.CollaborativeAnalysisWorkspace.objects.latest("pk") + self.assertEqual(new_workspace_data.workspace, new_workspace) + + +class CollaborativeAnalysisWorkspaceImportTest(AnVILAPIMockTestMixin, TestCase): + """Tests of the WorkspaceImport view from ACM with this app's CollaborativeAnalysisWorkspace model.""" + + api_success_code = 200 + + def setUp(self): + """Set up test class.""" + # The superclass uses the responses package to mock API responses. + super().setUp() + # Create a user with both view and edit permissions. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_EDIT_PERMISSION_CODENAME + ) + ) + self.custodian = UserFactory.create() + self.source_workspace = dbGaPWorkspaceFactory.create().workspace + self.workspace_type = "collab_analysis" + self.analyst_group = ManagedGroupFactory.create() + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse("anvil_consortium_manager:workspaces:import", args=args) + + def get_api_url(self, billing_project_name, workspace_name): + """Return the Terra API url for a given billing project and workspace.""" + return ( + self.api_client.rawls_entry_point + + "/api/workspaces/" + + billing_project_name + + "/" + + workspace_name + ) + + def get_api_json_response( + self, billing_project, workspace, authorization_domains=[], access="OWNER" + ): + """Return a pared down version of the json response from the AnVIL API with only fields we need.""" + json_data = { + "accessLevel": access, + "owners": [], + "workspace": { + "authorizationDomain": [ + {"membersGroupName": x} for x in authorization_domains + ], + "name": workspace, + "namespace": billing_project, + "isLocked": False, + }, + } + return json_data + + def test_creates_workspace(self): + """Posting valid data to the form creates an UploadWorkspace object.""" + billing_project = BillingProjectFactory.create(name="billing-project") + workspace_name = "workspace" + # Available workspaces API call. + workspace_list_url = self.api_client.rawls_entry_point + "/api/workspaces" + self.anvil_response_mock.add( + responses.GET, + workspace_list_url, + match=[ + responses.matchers.query_param_matcher( + {"fields": "workspace.namespace,workspace.name,accessLevel"} + ) + ], + status=200, + json=[self.get_api_json_response(billing_project.name, workspace_name)], + ) + url = self.get_api_url(billing_project.name, workspace_name) + self.anvil_response_mock.add( + responses.GET, + url, + status=self.api_success_code, + json=self.get_api_json_response(billing_project.name, workspace_name), + ) + # ACL API call. + api_url_acl = ( + self.api_client.rawls_entry_point + + "/api/workspaces/" + + billing_project.name + + "/" + + workspace_name + + "/acl" + ) + self.anvil_response_mock.add( + responses.GET, + api_url_acl, + status=200, + json={ + "acl": { + self.service_account_email: { + "accessLevel": "OWNER", + "canCompute": True, + "canShare": True, + "pending": False, + } + } + }, + ) + self.client.force_login(self.user) + response = self.client.post( + self.get_url(self.workspace_type), + { + "workspace": billing_project.name + "/" + workspace_name, + # Workspace data form. + "workspacedata-TOTAL_FORMS": 1, + "workspacedata-INITIAL_FORMS": 0, + "workspacedata-MIN_NUM_FORMS": 1, + "workspacedata-MAX_NUM_FORMS": 1, + "workspacedata-0-custodian": self.custodian.pk, + "workspacedata-0-source_workspaces": [self.source_workspace.pk], + "workspacedata-0-purpose": "test", + "workspacedata-0-analyst_group": self.analyst_group.pk, + }, + ) + self.assertEqual(response.status_code, 302) + # The workspace is created. + new_workspace = Workspace.objects.latest("pk") + # Workspace data is added. + self.assertEqual(models.CollaborativeAnalysisWorkspace.objects.count(), 1) + new_workspace_data = models.CollaborativeAnalysisWorkspace.objects.latest("pk") + self.assertEqual(new_workspace_data.workspace, new_workspace) + + +class WorkspaceAuditTest(TestCase): + """Tests for the CollaborativeAnalysisWorkspaceAuditTest view.""" + + def setUp(self): + """Set up test class.""" + self.factory = RequestFactory() + # Create a user with both view and edit permission. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + self.collaborative_analysis_workspace = ( + factories.CollaborativeAnalysisWorkspaceFactory.create() + ) + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse( + "collaborative_analysis:workspaces:audit", + args=args, + ) + + def get_view(self): + """Return the view being tested.""" + return views.WorkspaceAudit.as_view() + + def test_view_redirect_not_logged_in(self): + "View redirects to login view when user is not logged in." + # Need a client for redirects. + response = self.client.get(self.get_url("foo", "bar")) + self.assertRedirects( + response, + resolve_url(settings.LOGIN_URL) + "?next=" + self.get_url("foo", "bar"), + ) + + def test_status_code_with_user_permission_view(self): + """Returns successful response code if the user has view permission.""" + request = self.factory.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + request.user = self.user + response = self.get_view()( + request, + billing_project_slug=self.collaborative_analysis_workspace.workspace.billing_project.name, + workspace_slug=self.collaborative_analysis_workspace.workspace.name, + ) + self.assertEqual(response.status_code, 200) + + def test_access_without_user_permission(self): + """Raises permission denied if user has no permissions.""" + user_no_perms = User.objects.create_user( + username="test-none", password="test-none" + ) + request = self.factory.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + request.user = user_no_perms + with self.assertRaises(PermissionDenied): + self.get_view()( + request, + billing_project_slug=self.collaborative_analysis_workspace.workspace.billing_project.name, + workspace_slug=self.collaborative_analysis_workspace.workspace.name, + ) + + def test_invalid_billing_project_name(self): + """Raises a 404 error with an invalid object dbgap_application_pk.""" + request = self.factory.get( + self.get_url("foo", self.collaborative_analysis_workspace.workspace.name) + ) + request.user = self.user + with self.assertRaises(Http404): + self.get_view()( + request, + billing_project_slug="foo", + workspace_slug=self.collaborative_analysis_workspace.workspace.name, + ) + + def test_invalid_workspace_name(self): + """Raises a 404 error with an invalid object dbgap_application_pk.""" + request = self.factory.get( + self.get_url(self.collaborative_analysis_workspace.workspace.name, "foo") + ) + request.user = self.user + with self.assertRaises(Http404): + self.get_view()( + request, + billing_project_slug=self.collaborative_analysis_workspace.workspace.billing_project.name, + workspace_slug="foo", + ) + + def test_context_data_access_audit(self): + """The data_access_audit exists in the context.""" + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("data_access_audit", response.context_data) + self.assertIsInstance( + response.context_data["data_access_audit"], + audit.CollaborativeAnalysisWorkspaceAccessAudit, + ) + self.assertTrue(response.context_data["data_access_audit"].completed) + qs = response.context_data["data_access_audit"].queryset + self.assertEqual(len(qs), 1) + self.assertIn(self.collaborative_analysis_workspace, qs) + + def test_context_verified_table_access(self): + """verified_table shows a record when audit has one account with verified access.""" + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + self.collaborative_analysis_workspace.source_workspaces.add( + source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), + account=account, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + self.collaborative_analysis_workspace, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_verified_table_no_access(self): + """verified_table shows a record when audit has one account with verified no access.""" + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + self.collaborative_analysis_workspace.source_workspaces.add( + source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), account=account + # ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + self.collaborative_analysis_workspace, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_grant(self): + """needs_action_table shows a record when audit finds that access needs to be granted.""" + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + self.collaborative_analysis_workspace.source_workspaces.add( + source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), account=account + # ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + self.collaborative_analysis_workspace, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_remoe(self): + """needs_action_table shows a record when audit finds that access needs to be removed.""" + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + self.collaborative_analysis_workspace.source_workspaces.add( + source_workspace.workspace + ) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), + account=account, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + self.collaborative_analysis_workspace, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_error_table_group_in_auth_domain(self): + """error shows a record when audit finds a group in the auth domain.""" + # Create accounts. + group = ManagedGroupFactory.create() + GroupGroupMembershipFactory.create( + parent_group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), + child_group=group, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get( + self.get_url( + self.collaborative_analysis_workspace.workspace.billing_project.name, + self.collaborative_analysis_workspace.workspace.name, + ) + ) + self.assertIn("errors_table", response.context_data) + table = response.context_data["errors_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + self.collaborative_analysis_workspace, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), group) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + +class WorkspaceAuditAllTest(TestCase): + """Tests for the CollaborativeAnalysisWorkspaceAuditAllTest view.""" + + def setUp(self): + """Set up test class.""" + self.factory = RequestFactory() + # Create a user with both view and edit permission. + self.user = User.objects.create_user(username="test", password="test") + self.user.user_permissions.add( + Permission.objects.get( + codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME + ) + ) + + def get_url(self, *args): + """Get the url for the view being tested.""" + return reverse( + "collaborative_analysis:workspaces:audit_all", + args=args, + ) + + def get_view(self): + """Return the view being tested.""" + return views.WorkspaceAuditAll.as_view() + + def test_view_redirect_not_logged_in(self): + "View redirects to login view when user is not logged in." + # Need a client for redirects. + response = self.client.get(self.get_url()) + self.assertRedirects( + response, + resolve_url(settings.LOGIN_URL) + "?next=" + self.get_url(), + ) + + def test_status_code_with_user_permission_view(self): + """Returns successful response code if the user has view permission.""" + request = self.factory.get(self.get_url()) + request.user = self.user + response = self.get_view()(request) + self.assertEqual(response.status_code, 200) + + def test_access_without_user_permission(self): + """Raises permission denied if user has no permissions.""" + user_no_perms = User.objects.create_user( + username="test-none", password="test-none" + ) + request = self.factory.get(self.get_url()) + request.user = user_no_perms + with self.assertRaises(PermissionDenied): + self.get_view()(request) + + def test_context_data_access_audit_no_workspaces(self): + """The data_access_audit exists in the context.""" + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("data_access_audit", response.context_data) + self.assertIsInstance( + response.context_data["data_access_audit"], + audit.CollaborativeAnalysisWorkspaceAccessAudit, + ) + self.assertTrue(response.context_data["data_access_audit"].completed) + qs = response.context_data["data_access_audit"].queryset + self.assertEqual(len(qs), 0) + + def test_context_data_access_audit_one_workspace(self): + """The data_access_audit exists in the context.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("data_access_audit", response.context_data) + self.assertIsInstance( + response.context_data["data_access_audit"], + audit.CollaborativeAnalysisWorkspaceAccessAudit, + ) + self.assertTrue(response.context_data["data_access_audit"].completed) + qs = response.context_data["data_access_audit"].queryset + self.assertEqual(len(qs), 1) + self.assertIn(instance, qs) + + def test_context_data_access_audit_two_workspaces(self): + """The data_access_audit exists in the context.""" + instance_1 = factories.CollaborativeAnalysisWorkspaceFactory.create() + instance_2 = factories.CollaborativeAnalysisWorkspaceFactory.create() + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("data_access_audit", response.context_data) + self.assertIsInstance( + response.context_data["data_access_audit"], + audit.CollaborativeAnalysisWorkspaceAccessAudit, + ) + self.assertTrue(response.context_data["data_access_audit"].completed) + qs = response.context_data["data_access_audit"].queryset + self.assertEqual(len(qs), 2) + self.assertIn(instance_1, qs) + self.assertIn(instance_2, qs) + + def test_context_verified_table_access(self): + """verified_table shows a record when audit has one account with verified access.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=instance.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=instance.workspace.authorization_domains.first(), + account=account, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + instance, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_verified_table_no_access(self): + """verified_table shows a record when audit has one account with verified no access.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=instance.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=instance.workspace.authorization_domains.first(), account=account + # ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("verified_table", response.context_data) + table = response.context_data["verified_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + instance, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_grant(self): + """needs_action_table shows a record when audit finds that access needs to be granted.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=instance.analyst_group, account=account + ) + # Source workspace auth domains membership. + GroupAccountMembershipFactory.create( + group=source_workspace.workspace.authorization_domains.first(), + account=account, + ) + # CollaborativeAnalysisWorkspace auth domain membership. + # GroupAccountMembershipFactory.create( + # group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), account=account + # ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + instance, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_needs_action_table_remove(self): + """needs_action_table shows a record when audit finds that access needs to be removed.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create accounts. + account = AccountFactory.create() + # Set up source workspaces. + source_workspace = dbGaPWorkspaceFactory.create() + instance.source_workspaces.add(source_workspace.workspace) + # Analyst group membership. + GroupAccountMembershipFactory.create( + group=instance.analyst_group, account=account + ) + # Source workspace auth domains membership. + # GroupAccountMembershipFactory.create( + # group=source_workspace.workspace.authorization_domains.first(), + # account=account, + # ) + # CollaborativeAnalysisWorkspace auth domain membership. + GroupAccountMembershipFactory.create( + group=instance.workspace.authorization_domains.first(), + account=account, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("needs_action_table", response.context_data) + table = response.context_data["needs_action_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + instance, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), account) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) + + def test_context_error_table_group_in_auth_domain(self): + """error shows a record when audit finds a group in the auth domain.""" + instance = factories.CollaborativeAnalysisWorkspaceFactory.create() + # Create accounts. + group = ManagedGroupFactory.create() + GroupGroupMembershipFactory.create( + parent_group=instance.workspace.authorization_domains.first(), + child_group=group, + ) + # Check the table in the context. + self.client.force_login(self.user) + response = self.client.get(self.get_url()) + self.assertIn("errors_table", response.context_data) + table = response.context_data["errors_table"] + self.assertIsInstance( + table, + audit.AccessAuditResultsTable, + ) + self.assertEqual(len(table.rows), 1) + self.assertEqual( + table.rows[0].get_cell_value("workspace"), + instance, + ) + self.assertEqual(table.rows[0].get_cell_value("member"), group) + self.assertEqual( + table.rows[0].get_cell_value("note"), + audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS, + ) + self.assertIsNotNone(table.rows[0].get_cell_value("action")) diff --git a/primed/collaborative_analysis/urls.py b/primed/collaborative_analysis/urls.py new file mode 100644 index 00000000..e53dcef9 --- /dev/null +++ b/primed/collaborative_analysis/urls.py @@ -0,0 +1,26 @@ +from django.urls import include, path + +from . import views + +app_name = "collaborative_analysis" + + +collaborative_analysis_workspace_patterns = ( + [ + path( + "audit/", + views.WorkspaceAuditAll.as_view(), + name="audit_all", + ), + path( + "//audit/", + views.WorkspaceAudit.as_view(), + name="audit", + ), + ], + "workspaces", +) + +urlpatterns = [ + path("workspaces/", include(collaborative_analysis_workspace_patterns)), +] diff --git a/primed/collaborative_analysis/views.py b/primed/collaborative_analysis/views.py new file mode 100644 index 00000000..a678e81b --- /dev/null +++ b/primed/collaborative_analysis/views.py @@ -0,0 +1,69 @@ +from anvil_consortium_manager.auth import AnVILConsortiumManagerStaffViewRequired +from django.http import Http404 +from django.utils.translation import gettext_lazy as _ +from django.views.generic import DetailView, TemplateView + +from . import audit, models + + +# Create your views here. +class WorkspaceAudit(AnVILConsortiumManagerStaffViewRequired, DetailView): + """View to show audit results for a `CollaborativeAnalysisWorkspace`.""" + + model = models.CollaborativeAnalysisWorkspace + template_name = "collaborative_analysis/collaborativeanalysisworkspace_audit.html" + + def get_object(self, queryset=None): + """Return the object the view is displaying.""" + if queryset is None: + queryset = self.get_queryset() + # Filter the queryset based on kwargs. + billing_project_slug = self.kwargs.get("billing_project_slug", None) + workspace_slug = self.kwargs.get("workspace_slug", None) + queryset = queryset.filter( + workspace__billing_project__name=billing_project_slug, + workspace__name=workspace_slug, + ) + try: + # Get the single item from the filtered queryset + obj = queryset.get() + except queryset.model.DoesNotExist: + raise Http404( + _("No %(verbose_name)s found matching the query") + % {"verbose_name": queryset.model._meta.verbose_name} + ) + return obj + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + # Run the audit + data_access_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit( + queryset=[self.object] + ) + data_access_audit.run_audit() + context["verified_table"] = data_access_audit.get_verified_table() + context["errors_table"] = data_access_audit.get_errors_table() + context["needs_action_table"] = data_access_audit.get_needs_action_table() + context["data_access_audit"] = data_access_audit + return context + + +class WorkspaceAuditAll(AnVILConsortiumManagerStaffViewRequired, TemplateView): + """View to show audit results for all `CollaborativeAnalysisWorkspace` objects.""" + + template_name = ( + "collaborative_analysis/collaborativeanalysisworkspace_audit_all.html" + ) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + # Run the audit + data_access_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit( + queryset=models.CollaborativeAnalysisWorkspace.objects.all() + ) + data_access_audit.run_audit() + context["verified_table"] = data_access_audit.get_verified_table() + context["errors_table"] = data_access_audit.get_errors_table() + context["needs_action_table"] = data_access_audit.get_needs_action_table() + context["data_access_audit"] = data_access_audit + return context diff --git a/primed/templates/anvil_consortium_manager/navbar.html b/primed/templates/anvil_consortium_manager/navbar.html index 6dc96662..5b880f57 100644 --- a/primed/templates/anvil_consortium_manager/navbar.html +++ b/primed/templates/anvil_consortium_manager/navbar.html @@ -45,6 +45,7 @@ {% include 'cdsa/nav_items.html' %} + {% include 'collaborative_analysis/nav_items.html' %} {{ block.super }} diff --git a/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit.html b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit.html new file mode 100644 index 00000000..8c64ca86 --- /dev/null +++ b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit.html @@ -0,0 +1,53 @@ +{% extends "anvil_consortium_manager/base.html" %} + +{% block title %}Collaborative Analysis Workspace audit{% endblock %} + + +{% block content %} + +

Collaborative analysis workspace audit

+ +
+ +
+ +

Audit results

+ +
+

+ To have access to a Collaborative Analysis Workspace, an account must meet both of the following criteria: + +

    +
  • Be in the analyst group associated with the workspace
  • +
  • Be in the auth domain for all source workspaces
  • +
+

+

The audit result categories are explained below. +

    + +
  • Verified includes the following:
  • +
      +
    • An account in the analyst group is in the auth domain for this workspace and is in all auth domains for all source workspaces.
    • +
    • An account in the analyst group is not in the auth domain for this workspace and is not in all auth domains for all source workspaces.
    • +
    + +
  • Needs action includes the following:
  • +
      +
    • An account in the analyst group is not in the auth domain for this workspace and is in all auth domains for all source workspaces.
    • +
    • An account in the analyst group is in the auth domain for this workspace and is not in all auth domains for all source workspaces.
    • +
    + +
  • Errors
  • +
      +
    • A group is unexpectedly in the auth domain.
    • +
    +
+

+

Any errors should be reported!

+
+ +{% include "__audit_tables.html" with verified_table=verified_table needs_action_table=needs_action_table errors_table=errors_table %} + +{% endblock content %} diff --git a/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit_all.html b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit_all.html new file mode 100644 index 00000000..40730a51 --- /dev/null +++ b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_audit_all.html @@ -0,0 +1,52 @@ +{% extends "anvil_consortium_manager/base.html" %} + +{% block title %}Collaborative analysis workspace audit{% endblock %} + + +{% block content %} + +

Collaborative analysis workspace audit

+ +

Audit results

+ +
+ This page shows the audit results for all Collaborative Analysis Workspaces in this app. + +

+ To have access to a Collaborative Analysis Workspace, an account must meet both of the following criteria: + +

    +
  • Be in the analyst group associated with the workspace
  • +
  • Be in the auth domain for all source workspaces
  • +
+

+

The audit result categories are explained below. +

    + +
  • Verified includes the following:
  • +
      +
    • An account in the analyst group is in the auth domain for this workspace and is in all auth domains for all source workspaces.
    • +
    • An account in the analyst group is not in the auth domain for this workspace and is not in all auth domains for all source workspaces.
    • +
    • The PRIMED_CC_WRITERS group is in the auth domain for this workspace.
    • +
    + +
  • Needs action includes the following:
  • +
      +
    • An account in the analyst group is not in the auth domain for this workspace and is in all auth domains for all source workspaces.
    • +
    • An account in the analyst group is in the auth domain for this workspace and is not in all auth domains for all source workspaces.
    • +
    • An account is not in the analyst group but is in the auth domain of this workspace.
    • +
    • The PRIMED_CC_WRITERS group needs to be added to the auth domain for this workspace.
    • +
    + +
  • Errors
  • +
      +
    • A group is unexpectedly in the auth domain.
    • +
    +
+

+

Any errors should be reported!

+
+ +{% include "__audit_tables.html" with verified_table=verified_table needs_action_table=needs_action_table errors_table=errors_table %} + +{% endblock content %} diff --git a/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_detail.html b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_detail.html new file mode 100644 index 00000000..e8d77c71 --- /dev/null +++ b/primed/templates/collaborative_analysis/collaborativeanalysisworkspace_detail.html @@ -0,0 +1,35 @@ +{% extends "anvil_consortium_manager/workspace_detail.html" %} + +{% block workspace_data %} +
+
+
Custodian
+ + {{ workspace_data_object.custodian.name }} + +
+ +
Source workspaces
+ {% for workspace in workspace_data_object.source_workspaces.all %} +

{{ workspace }}

+ {% endfor %} +
+ {% if perms.anvil_consortium_manager.anvil_consortium_manager_staff_view %} +
Analyst group
+ + {{ workspace_data_object.analyst_group.name }} + +
+ {% endif %} +
+{% endblock workspace_data %} + + +{% block action_buttons %} +{% if perms.anvil_consortium_manager.anvil_consortium_manager_staff_view %} +

+ Audit workspace access +

+{% endif %} +{{block.super}} +{% endblock action_buttons %} diff --git a/primed/templates/collaborative_analysis/nav_items.html b/primed/templates/collaborative_analysis/nav_items.html new file mode 100644 index 00000000..b12d240d --- /dev/null +++ b/primed/templates/collaborative_analysis/nav_items.html @@ -0,0 +1,13 @@ +