Skip to content

Commit

Permalink
Run the collaborator audit as part of the run_dbgap_audit command
Browse files Browse the repository at this point in the history
  • Loading branch information
amstilp committed Jun 14, 2024
1 parent e2a3eab commit cbd3f05
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 50 deletions.
36 changes: 24 additions & 12 deletions primed/dbgap/management/commands/run_dbgap_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.template.loader import render_to_string
from django.urls import reverse

from ...audit import access_audit
from ...audit import access_audit, collaborator_audit


class Command(BaseCommand):
Expand All @@ -17,36 +17,44 @@ def add_arguments(self, parser):
help="""Email to which to send access reports that need action or have errors.""",
)

def handle(self, *args, **options):
def run_access_audit(self, *args, **options):
self.stdout.write("Running dbGaP access audit... ", ending="")
data_access_audit = access_audit.dbGaPAccessAudit()
data_access_audit.run_audit()
audit = access_audit.dbGaPAccessAudit()
audit.run_audit()
self._handle_audit_results(audit, reverse("dbgap:audit:access:all"), **options)

def run_collaborator_audit(self, *args, **options):
self.stdout.write("Running dbGaP collaborator audit... ", ending="")
audit = collaborator_audit.dbGaPCollaboratorAudit()
audit.run_audit()
self._handle_audit_results(audit, reverse("dbgap:audit:collaborators:all"), **options)

def _handle_audit_results(self, audit, url, **options):
# Report errors and needs access.
audit_ok = data_access_audit.ok()
audit_ok = audit.ok()
# Construct the url for handling errors.
url = "https://" + Site.objects.get_current().domain + reverse("dbgap:audit:access:all")
url = "https://" + Site.objects.get_current().domain + url
if audit_ok:
self.stdout.write(self.style.SUCCESS("ok!"))
else:
self.stdout.write(self.style.ERROR("problems found."))

# Print results
self.stdout.write("* Verified: {}".format(len(data_access_audit.verified)))
self.stdout.write("* Needs action: {}".format(len(data_access_audit.needs_action)))
self.stdout.write("* Errors: {}".format(len(data_access_audit.errors)))
self.stdout.write("* Verified: {}".format(len(audit.verified)))
self.stdout.write("* Needs action: {}".format(len(audit.needs_action)))
self.stdout.write("* Errors: {}".format(len(audit.errors)))

if not audit_ok:
self.stdout.write(self.style.ERROR(f"Please visit {url} to resolve these issues."))

# Send email if requested and there are problems.
email = options["email"]
subject = "dbGaP access audit - problems found"
subject = "{} - problems found".format(audit.__class__.__name__)
html_body = render_to_string(
"primed_anvil/email_audit_report.html",
context={
"title": "dbGaP access audit",
"data_access_audit": data_access_audit,
"title": "dbGaP collaborator audit",
"data_access_audit": audit,
"url": url,
},
)
Expand All @@ -58,3 +66,7 @@ def handle(self, *args, **options):
fail_silently=False,
html_message=html_body,
)

def handle(self, *args, **options):
self.run_access_audit(*args, **options)
self.run_collaborator_audit(*args, **options)
238 changes: 200 additions & 38 deletions primed/dbgap/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from io import StringIO

from anvil_consortium_manager.tests.factories import GroupGroupMembershipFactory
from anvil_consortium_manager.tests.factories import (
AccountFactory,
GroupGroupMembershipFactory,
)
from django.contrib.sites.models import Site
from django.core import mail
from django.core.management import call_command
Expand All @@ -14,46 +17,70 @@
class RunDbGaPAuditTest(TestCase):
"""Tests for the run_dbgap_audit command"""

def test_command_output_no_records(self):
def test_no_dbgap_applications(self):
"""Test command output."""
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
self.assertIn("Running dbGaP access audit... ok!", out.getvalue())
self.assertIn("* Verified: 0", out.getvalue())
self.assertIn("* Needs action: 0", out.getvalue())
self.assertIn("* Errors: 0", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... ok!",
"* Verified: 0",
"* Needs action: 0",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... ok!",
"* Verified: 0",
"* Needs action: 0",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_command_run_audit_one_instance_verified(self):
def test_access_audit_one_instance_verified(self):
"""Test command output with one verified instance."""
# Create a workspace and matching DAR.
factories.dbGaPWorkspaceFactory.create()
factories.dbGaPApplicationFactory.create()
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
self.assertIn("Running dbGaP access audit... ok!", out.getvalue())
self.assertIn("* Verified: 1", out.getvalue())
self.assertIn("* Needs action: 0", out.getvalue())
self.assertIn("* Errors: 0", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... ok!",
"* Verified: 1",
"* Needs action: 0",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_command_run_audit_one_instance_needs_action(self):
def test_access_audit_one_instance_needs_action(self):
"""Test command output with one needs_action instance."""
# Create a workspace and matching DAR.
dbgap_workspace = factories.dbGaPWorkspaceFactory.create()
factories.dbGaPDataAccessRequestForWorkspaceFactory.create(dbgap_workspace=dbgap_workspace)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
self.assertIn("Running dbGaP access audit... problems found.", out.getvalue())
self.assertIn("* Verified: 0", out.getvalue())
self.assertIn("* Needs action: 1", out.getvalue())
self.assertIn("* Errors: 0", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... problems found.",
"* Verified: 0",
"* Needs action: 1",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_command_run_audit_one_instance_error(self):
def test_access_audit_one_instance_error(self):
"""Test command output with one error instance."""
# Create a workspace and matching DAR.
dbgap_workspace = factories.dbGaPWorkspaceFactory.create()
Expand All @@ -64,14 +91,19 @@ def test_command_run_audit_one_instance_error(self):
)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
self.assertIn("Running dbGaP access audit... problems found.", out.getvalue())
self.assertIn("* Verified: 0", out.getvalue())
self.assertIn("* Needs action: 0", out.getvalue())
self.assertIn("* Errors: 1", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... problems found.",
"* Verified: 0",
"* Needs action: 0",
"* Errors: 1",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_command_run_audit_one_instance_verified_email(self):
def test_access_audit_one_instance_verified_email(self):
"""No email is sent when there are no errors."""
# Create a workspace and matching DAR.
factories.dbGaPWorkspaceFactory.create()
Expand All @@ -82,24 +114,29 @@ def test_command_run_audit_one_instance_verified_email(self):
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_command_run_audit_one_instance_needs_action_email(self):
def test_access_audit_one_instance_needs_action_email(self):
"""Email is sent for one needs_action instance."""
# Create a workspace and matching DAR.
dbgap_workspace = factories.dbGaPWorkspaceFactory.create()
factories.dbGaPDataAccessRequestForWorkspaceFactory.create(dbgap_workspace=dbgap_workspace)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", email="[email protected]", stdout=out)
self.assertIn("Running dbGaP access audit... problems found.", out.getvalue())
self.assertIn("* Verified: 0", out.getvalue())
self.assertIn("* Needs action: 1", out.getvalue())
self.assertIn("* Errors: 0", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... problems found.",
"* Verified: 0",
"* Needs action: 1",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# One message has been sent by default.
self.assertEqual(len(mail.outbox), 1)
email = mail.outbox[0]
self.assertEqual(email.to, ["[email protected]"])
self.assertEqual(email.subject, "dbGaP access audit - problems found")
self.assertEqual(email.subject, "dbGaPAccessAudit - problems found")

def test_command_run_audit_one_instance_error_email(self):
def test_access_audit_one_instance_error_email(self):
"""Test command output with one error instance."""
# Create a workspace and matching DAR.
dbgap_workspace = factories.dbGaPWorkspaceFactory.create()
Expand All @@ -110,17 +147,22 @@ def test_command_run_audit_one_instance_error_email(self):
)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", email="[email protected]", stdout=out)
self.assertIn("Running dbGaP access audit... problems found.", out.getvalue())
self.assertIn("* Verified: 0", out.getvalue())
self.assertIn("* Needs action: 0", out.getvalue())
self.assertIn("* Errors: 1", out.getvalue())
expected_string = "\n".join(
[
"Running dbGaP access audit... problems found.",
"* Verified: 0",
"* Needs action: 0",
"* Errors: 1",
]
)
self.assertIn(expected_string, out.getvalue())
# One message has been sent by default.
self.assertEqual(len(mail.outbox), 1)
email = mail.outbox[0]
self.assertEqual(email.to, ["[email protected]"])
self.assertEqual(email.subject, "dbGaP access audit - problems found")
self.assertEqual(email.subject, "dbGaPAccessAudit - problems found")

def test_different_domain(self):
def test_access_audit_different_domain(self):
"""Test command output when a different domain is specified."""
site = Site.objects.create(domain="foobar.com", name="test")
site.save()
Expand All @@ -132,6 +174,126 @@ def test_different_domain(self):
self.assertIn("Running dbGaP access audit... problems found.", out.getvalue())
self.assertIn("https://foobar.com", out.getvalue())

def test_command_collaborator_audit(self):
"""run_dbgap_audit runs an audit on collaborators."""
self.fail()
def test_collaborator_audit_one_instance_verified(self):
"""Test command output with one verified instance."""
# Create a workspace and matching DAR.
factories.dbGaPApplicationFactory.create()
# Verified no access for PI.
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... ok!",
"* Verified: 1",
"* Needs action: 0",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_collaborator_audit_one_instance_needs_action(self):
"""Test command output with one needs_action instance."""
application = factories.dbGaPApplicationFactory.create()
AccountFactory.create(user=application.principal_investigator)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... problems found.",
"* Verified: 0",
"* Needs action: 1",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_collaborator_audit_one_instance_error(self):
"""Test command output with one error instance."""
application = factories.dbGaPApplicationFactory.create()
GroupGroupMembershipFactory(
parent_group=application.anvil_access_group,
)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... problems found.",
"* Verified: 1", # PI - no linked account, verified no access.
"* Needs action: 0",
"* Errors: 1",
]
)
self.assertIn(expected_string, out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_collaborator_audit_one_instance_verified_email(self):
"""No email is sent when there are no errors."""
factories.dbGaPApplicationFactory.create()
# Verified no access for PI.
out = StringIO()
call_command("run_dbgap_audit", "--no-color", email="[email protected]", stdout=out)
self.assertIn("Running dbGaP collaborator audit... ok!", out.getvalue())
# Zero messages have been sent by default.
self.assertEqual(len(mail.outbox), 0)

def test_collaborator_audit_one_instance_needs_action_email(self):
"""Email is sent for one needs_action instance."""
# Create a workspace and matching DAR.
application = factories.dbGaPApplicationFactory.create()
AccountFactory.create(user=application.principal_investigator)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", email="[email protected]", stdout=out)
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... problems found.",
"* Verified: 0",
"* Needs action: 1",
"* Errors: 0",
]
)
self.assertIn(expected_string, out.getvalue())
# One message has been sent by default.
self.assertEqual(len(mail.outbox), 1)
email = mail.outbox[0]
self.assertEqual(email.to, ["[email protected]"])
self.assertEqual(email.subject, "dbGaPCollaboratorAudit - problems found")

def test_collaborator_audit_one_instance_error_email(self):
"""Test command output with one error instance."""
application = factories.dbGaPApplicationFactory.create()
GroupGroupMembershipFactory(
parent_group=application.anvil_access_group,
)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", email="[email protected]", stdout=out)
expected_string = "\n".join(
[
"Running dbGaP collaborator audit... problems found.",
"* Verified: 1", # PI - no linked account, verified no access.
"* Needs action: 0",
"* Errors: 1",
]
)
self.assertIn(expected_string, out.getvalue())
# One message has been sent by default.
self.assertEqual(len(mail.outbox), 1)
email = mail.outbox[0]
self.assertEqual(email.to, ["[email protected]"])
self.assertEqual(email.subject, "dbGaPCollaboratorAudit - problems found")

def test_collaborator_audit_different_domain(self):
"""Test command output when a different domain is specified."""
site = Site.objects.create(domain="foobar.com", name="test")
site.save()
with self.settings(SITE_ID=site.id):
application = factories.dbGaPApplicationFactory.create()
AccountFactory.create(user=application.principal_investigator)
out = StringIO()
call_command("run_dbgap_audit", "--no-color", stdout=out)
self.assertIn("Running dbGaP collaborator audit... problems found.", out.getvalue())
self.assertIn("https://foobar.com", out.getvalue())

0 comments on commit cbd3f05

Please sign in to comment.