Skip to content

Commit

Permalink
Merge pull request #554 from UW-GAC/feature/ignore-audit-errors
Browse files Browse the repository at this point in the history
Allow audit errors to be ignored
  • Loading branch information
amstilp authored Dec 19, 2024
2 parents 6a3eedd + 13b2bd9 commit 123b8f4
Show file tree
Hide file tree
Showing 88 changed files with 11,147 additions and 8,125 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change log

## Devel

* Restructure audits by moving them to their own sub-app (`anvil_consortium_manager.auditor`)
* Allow users to ignore a specific ManagedGroupMembershipAudit "not in app" record

## 0.27.0 (2024-12-06)

* Allow a user to link an Account that is not or has never been linked to another user.
Expand Down
2 changes: 1 addition & 1 deletion anvil_consortium_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.27.0"
__version__ = "0.28.0"
485 changes: 0 additions & 485 deletions anvil_consortium_manager/audit/audit.py

This file was deleted.

File renamed without changes.
22 changes: 22 additions & 0 deletions anvil_consortium_manager/auditor/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Admin classes for the anvil_consortium_manager.auditor app."""

from django.contrib import admin
from simple_history.admin import SimpleHistoryAdmin

from . import models


@admin.register(models.IgnoredManagedGroupMembership)
class IgnoredManagedGroupMembershipAdmin(SimpleHistoryAdmin):
"""Admin class for the IgnoredManagedGroupMembership model."""

list_display = (
"pk",
"group",
"ignored_email",
"added_by",
)
search_fields = (
"group",
"ignored_email",
)
6 changes: 6 additions & 0 deletions anvil_consortium_manager/auditor/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class AuditorConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "anvil_consortium_manager.auditor"
Empty file.
18 changes: 18 additions & 0 deletions anvil_consortium_manager/auditor/audit/accounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from anvil_consortium_manager.models import Account

from .base import AnVILAudit, ModelInstanceResult


class AccountAudit(AnVILAudit):
"""Class that runs an audit for Account instances."""

ERROR_NOT_IN_ANVIL = "Not in AnVIL"
"""Error when the Account does not exist in AnVIL."""

def run_audit(self):
# Only checks active accounts.
for account in Account.objects.active():
model_instance_result = ModelInstanceResult(account)
if not account.anvil_exists():
model_instance_result.add_error(self.ERROR_NOT_IN_ANVIL)
self.add_result(model_instance_result)
201 changes: 201 additions & 0 deletions anvil_consortium_manager/auditor/audit/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from abc import ABC

import django_tables2 as tables


# Audit classes for individual model instances:
class ModelInstanceResult:
"""Class to hold an audit result for a specific instance of a model."""

def __init__(self, model_instance):
self.model_instance = model_instance
self.errors = set()

def __eq__(self, other):
return self.model_instance == other.model_instance and self.errors == other.errors

def __str__(self):
return str(self.model_instance)

def add_error(self, error):
"""Add an error to the audit result for this model instance."""
self.errors.add(error)

def ok(self):
"""Check whether an audit result has errors."""

if self.errors:
return False
else:
return True


class NotInAppResult:
"""Class to hold an audit result for a record that is not present in the app."""

def __init__(self, record):
self.record = record

def __str__(self):
return self.record

def __eq__(self, other):
return self.record == other.record


class IgnoredResult:
"""Class to hold an audit result for a specific record in an Ignore table."""

def __init__(self, model_instance, record=None):
self.record = record
self.model_instance = model_instance

def __eq__(self, other):
return self.model_instance == other.model_instance and self.record == other.record

def __str__(self):
return str(self.record)


# Tables for reporting audit results:
class VerifiedTable(tables.Table):
"""Table for verified results."""

model_instance = tables.columns.Column(linkify=True, orderable=False)


# Tables for reporting audit results:
class ErrorTable(tables.Table):
"""Table for results with errors."""

model_instance = tables.columns.Column(linkify=True, orderable=False)
errors = tables.columns.Column(orderable=False)

def render_errors(self, record):
return ", ".join(sorted(record.errors))


class NotInAppTable(tables.Table):
record = tables.columns.Column(orderable=False, empty_values=())


class IgnoredTable(tables.Table):
model_instance = tables.columns.Column(orderable=False, verbose_name="Details")
record = tables.columns.Column(orderable=False)

def render_model_instance(self, record):
return "See details"


# Audit classes for object classes:
class AnVILAudit(ABC):
"""Abstract base class for AnVIL audit results."""

verified_table_class = VerifiedTable
error_table_class = ErrorTable
not_in_app_table_class = NotInAppTable
ignored_table_class = IgnoredTable

def __init__(self):
self._model_instance_results = []
self._not_in_app_results = []
self._ignored_results = []

def ok(self):
model_instances_ok = all([x.ok() for x in self._model_instance_results])
not_in_app_ok = len(self._not_in_app_results) == 0
return model_instances_ok and not_in_app_ok

def run_audit(self):
raise NotImplementedError("Define a run_audit method.")

def add_result(self, result):
if isinstance(result, NotInAppResult):
self._add_not_in_app_result(result)
elif isinstance(result, IgnoredResult):
self._add_ignored_result(result)
elif isinstance(result, ModelInstanceResult):
self._add_model_instance_result(result)
else:
raise ValueError("result must be ModelInstanceResult, NotInAppResult or IgnoredResult.")

def _add_not_in_app_result(self, result):
# Check that it hasn't been added yet.
check = [x for x in self._not_in_app_results if x == result]
if len(check) > 0:
raise ValueError("Already added a result for {}.".format(result.record))
self._not_in_app_results.append(result)

def _add_model_instance_result(self, result):
check = [x for x in self._model_instance_results if x.model_instance == result.model_instance]
if len(check) > 0:
raise ValueError("Already added a result for {}.".format(result.model_instance))
self._model_instance_results.append(result)

def _add_ignored_result(self, result):
check = [x for x in self._ignored_results if x.model_instance == result.model_instance]
if len(check) > 0:
raise ValueError("Already added a result for {}.".format(result.model_instance))
self._ignored_results.append(result)

def get_result_for_model_instance(self, model_instance):
results = [x for x in self._model_instance_results if x.model_instance == model_instance]
if len(results) != 1:
raise ValueError("model_instance is not in the results.")
return results[0]

def get_verified_results(self):
return [x for x in self._model_instance_results if x.ok()]

def get_error_results(self):
return [x for x in self._model_instance_results if not x.ok()]

def get_ignored_results(self):
return self._ignored_results

def get_not_in_app_results(self):
return self._not_in_app_results

def get_verified_table(self):
return self.verified_table_class(self.get_verified_results())

def get_error_table(self):
return self.error_table_class(self.get_error_results())

def get_not_in_app_table(self):
return self.not_in_app_table_class(self.get_not_in_app_results())

def get_ignored_table(self):
return self.ignored_table_class(self.get_ignored_results())

def export(
self,
include_verified=True,
include_errors=True,
include_not_in_app=True,
include_ignored=True,
):
"""Return a dictionary representation of the audit results."""
exported_results = {}
if include_verified:
exported_results["verified"] = [
{"id": result.model_instance.pk, "instance": result.model_instance}
for result in self.get_verified_results()
]
if include_errors:
exported_results["errors"] = [
{
"id": result.model_instance.pk,
"instance": result.model_instance,
"errors": list(result.errors),
}
for result in self.get_error_results()
]
if include_not_in_app:
exported_results["not_in_app"] = list(sorted([x.record for x in self.get_not_in_app_results()]))
if include_ignored:
exported_results["ignored"] = [
{"id": result.model_instance.pk, "instance": result.model_instance, "record": result.record}
for result in self.get_ignored_results()
]
return exported_results
18 changes: 18 additions & 0 deletions anvil_consortium_manager/auditor/audit/billing_projects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from anvil_consortium_manager.models import BillingProject

from .base import AnVILAudit, ModelInstanceResult


class BillingProjectAudit(AnVILAudit):
"""Class that runs an audit for BillingProject instances."""

ERROR_NOT_IN_ANVIL = "Not in AnVIL"
"""Error when a BillingProject in the app does not exist in AnVIL."""

def run_audit(self):
# Check that all billing projects exist.
for billing_project in BillingProject.objects.filter(has_app_as_user=True).all():
model_instance_result = ModelInstanceResult(billing_project)
if not billing_project.anvil_exists():
model_instance_result.add_error(self.ERROR_NOT_IN_ANVIL)
self.add_result(model_instance_result)
Loading

0 comments on commit 123b8f4

Please sign in to comment.