Skip to content

Commit

Permalink
Add generic filtering which respects permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
gregorjerse committed Dec 2, 2024
1 parent 21bef85 commit 31c9773
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 59 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Added
-----
- Expose ``status`` on ``collection`` and ``entity`` viewset and allow
filtering and sorting by it
- Add generic filtering by related objects which respects permissions

Changed
-------
Expand Down
160 changes: 101 additions & 59 deletions resolwe/flow/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import types
from copy import deepcopy
from functools import partial
from typing import Callable, Union
from typing import Optional, Union

from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.contrib.postgres.search import SearchQuery, SearchRank
from django.core.exceptions import ValidationError
from django.db.models import Count, F, ForeignKey, Q, Subquery
from django.db.models import Count, F, ForeignKey, Model, Q, Subquery
from django.db.models.query import QuerySet
from django_filters import rest_framework as filters
from django_filters.constants import EMPTY_VALUES
Expand Down Expand Up @@ -251,6 +251,100 @@ def __new__(mcs, name, bases, namespace):
return super().__new__(mcs, name, bases, namespace)


class FilterWithPermissionsMeta(ResolweFilterMetaclass):
"""Add filters for related objects with permissions.
The base class must define the permission_filter class attribute in the format:
permission_filters = [
("prefix", RelatedModel, RelatedModelFilter, ["field1", "field2"]),
]
This will add filters prefix_field1 and prefix_field2 and it will respect
permissions on the RelatedModel objects.
When fields list is None all the filters from the RelatedModelFilter are added.
"""

def __new__(mcs, name, bases, namespace):
"""Inject filters with permissions."""

def _add_filters_with_permissions(
prefix: str,
related_path: str,
BaseModel: Model,
BaseFilter: filters.FilterSet,
include_fields: Optional[list[str]],
):
"""Add filters on related objects with permissions.
Consider all filters from BaseFilter filterset and add the one for fields
in include_fields list.
"""

def filter_permissions(
self,
qs: QuerySet,
value: str,
original_filter: filters.Filter,
original_model: Model,
):
"""Respect permissions on entities."""

# Do not filter when value is empty. At least one of the values must be
# non-empty since form in the AnnotationValueFilter class requires it.
if value in EMPTY_VALUES:
return qs

visible_objects = list(
original_filter.filter(original_model.objects.all(), value)
.filter_for_user(self.parent.request.user)
.values_list("pk", flat=True)
)
return qs.filter(**{f"{related_path}__in": visible_objects})

# Add all filters from EntityFilter to namespaces before creating class.
for filter_name, filter in BaseFilter.get_filters().items():
if (
include_fields is not None
and filter.field_name not in include_fields
):
continue

new_filter_name = f"{prefix}__{filter_name}"
if filter_name == "id" or filter_name.startswith("id__"):
new_filter_name = f"{prefix}{filter_name[2:]}"

filter_copy = deepcopy(filter)
filter_copy.field_name = f"{prefix}__{filter_copy.field_name}"
filter_method = partial(
filter_permissions, original_filter=filter, original_model=BaseModel
)
# Bind the new_filter to filter instance and set it as new filter.
filter_copy.filter = types.MethodType(filter_method, filter_copy)
namespace[new_filter_name] = filter_copy
# If filter uses a method, add it to the namespace as well.
if filter_copy.method is not None:
namespace[filter_copy.method] = deepcopy(
getattr(BaseFilter, filter_copy.method)
)

for (
prefix,
related_path,
base_model,
base_filter,
included_fields,
) in namespace.get("permission_filters", []):
_add_filters_with_permissions(
prefix, related_path, base_model, base_filter, included_fields
)

# Create class with added filters.
klass = super().__new__(mcs, name, bases, namespace)
return klass


class BaseResolweFilter(
CheckQueryParamsMixin, filters.FilterSet, metaclass=ResolweFilterMetaclass
):
Expand Down Expand Up @@ -551,64 +645,11 @@ def get_ordering(self, request, queryset, view):
return self.get_default_ordering(view)


class AnnotationValueFieldMetaclass(ResolweFilterMetaclass):
"""Add all entity filters prefixed with 'entity'."""

def __new__(mcs, name, bases, namespace):
"""Inject extensions into the filter."""

def filter_permissions(
self,
qs: QuerySet,
value: str,
original_entity_filter: Callable[[QuerySet, str], QuerySet],
):
"""Respect permissions on entities."""
# Do not filter when value is empty. At least one of the values must be
# non-empty since form in the AnnotationValueFilter class requires it.
if value in EMPTY_VALUES:
return qs

# Filter the entities using the original entity filter and permissions.
visible_entities = list(
original_entity_filter(Entity.objects.all(), value)
.filter_for_user(self.parent.request.user)
.values_list("pk", flat=True)
)
return qs.filter(**{f"{entity_path}__in": visible_entities})

entity_path = {
"AnnotationValueFilter": "entity",
"AnnotationFieldFilter": "values__entity",
}[name]
# Add all filters from EntityFilter to namespaces before creating class.
for filter_name, filter in EntityFilter.get_filters().items():
new_name = f"entity__{filter_name}"
if filter_name == "id" or filter_name.startswith("id__"):
new_name = "entity" + filter_name[2:]
filter_copy = deepcopy(filter)
filter_copy.field_name = f"{entity_path}__{filter_copy.field_name}"
filter_method = partial(
filter_permissions,
original_entity_filter=filter.filter,
)
# Bind the new_filter to filter instance and set it as new filter.
filter_copy.filter = types.MethodType(filter_method, filter_copy)
namespace[new_name] = filter_copy
# If filter uses a method, add it to the namespace as well.
if filter_copy.method is not None:
namespace[filter_copy.method] = deepcopy(
getattr(EntityFilter, filter_copy.method)
)

# Create class with added filters.
klass = ResolweFilterMetaclass.__new__(mcs, name, bases, namespace)
return klass


class AnnotationFieldFilter(BaseResolweFilter, metaclass=AnnotationValueFieldMetaclass):
class AnnotationFieldFilter(BaseResolweFilter, metaclass=FilterWithPermissionsMeta):
"""Filter the AnnotationField endpoint."""

permission_filters = [("entity", "values__entity", Entity, EntityFilter, None)]

@classmethod
def filter_for_field(cls, field, field_name, lookup_expr=None):
"""Add permission check for collections lookups.
Expand Down Expand Up @@ -688,9 +729,10 @@ class Meta(BaseResolweFilter.Meta):
}


class AnnotationValueFilter(BaseResolweFilter, metaclass=AnnotationValueFieldMetaclass):
class AnnotationValueFilter(BaseResolweFilter, metaclass=FilterWithPermissionsMeta):
"""Filter the AnnotationValue endpoint."""

permission_filters = [("entity", "entity", Entity, EntityFilter, None)]
label = filters.CharFilter(method="filter_by_label")

def filter_by_label(self, queryset: QuerySet, name: str, value: str):
Expand Down

0 comments on commit 31c9773

Please sign in to comment.