diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index 0b683cb37..567905497 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -14,6 +14,7 @@ Added ----- - Expose ``status`` on ``collection`` and ``entity`` viewset and allow filtering and sorting by it +- Add generic filtering by related objects which respects permissions Changed ------- diff --git a/resolwe/flow/filters.py b/resolwe/flow/filters.py index 7cd2d60f2..f93997e46 100644 --- a/resolwe/flow/filters.py +++ b/resolwe/flow/filters.py @@ -10,13 +10,13 @@ import types from copy import deepcopy from functools import partial -from typing import Callable, Union +from typing import Optional, Union from django.contrib.auth import get_user_model from django.contrib.auth.models import Group from django.contrib.postgres.search import SearchQuery, SearchRank from django.core.exceptions import ValidationError -from django.db.models import Count, F, ForeignKey, Q, Subquery +from django.db.models import Count, F, ForeignKey, Model, Q, Subquery from django.db.models.query import QuerySet from django_filters import rest_framework as filters from django_filters.constants import EMPTY_VALUES @@ -251,6 +251,95 @@ def __new__(mcs, name, bases, namespace): return super().__new__(mcs, name, bases, namespace) +class FilterWithPermissionsMeta(ResolweFilterMetaclass): + """Add filters for related objects with permissions. + + The base class must define the permission_filter class attribute in the format: + + permission_filters = [ + ("prefix", RelatedModel, RelatedModelFilter, ["field1", "field2"]), + ] + + This will add filters prefix_field1 and prefix_field2 and it will respect + permissions on the RelatedModel objects. + + When fields list is None all the filters from the RelatedModelFilter are added. + """ + + def __new__(mcs, name, bases, namespace): + """Inject filters with permissions.""" + + def _add_filters_with_permissions( + prefix: str, + BaseModel: Model, + BaseFilter: filters.FilterSet, + include_fields: Optional[list[str]], + ): + """Add filters on related objects with permissions. + + Consider all filters from BaseFilter filterset and add the one for fields + in include_fields list. + """ + + def filter_permissions( + self, + qs: QuerySet, + value: str, + original_filter: filters.Filter, + original_model: Model, + ): + """Respect permissions on entities.""" + + # Do not filter when value is empty. At least one of the values must be + # non-empty since form in the AnnotationValueFilter class requires it. + if value in EMPTY_VALUES: + return qs + + visible_objects = list( + original_filter.filter(original_model.objects.all(), value) + .filter_for_user(self.parent.request.user) + .values_list("pk", flat=True) + ) + return qs.filter(**{f"{prefix}__in": visible_objects}) + + # Add all filters from EntityFilter to namespaces before creating class. + for filter_name, filter in BaseFilter.get_filters().items(): + if ( + include_fields is not None + and filter.field_name not in include_fields + ): + continue + + new_filter_name = f"{prefix}__{filter_name}" + if filter_name == "id" or filter_name.startswith("id__"): + new_filter_name = f"{prefix}{filter_name[2:]}" + + filter_copy = deepcopy(filter) + filter_copy.field_name = f"{prefix}__{filter_copy.field_name}" + filter_method = partial( + filter_permissions, original_filter=filter, original_model=BaseModel + ) + # Bind the new_filter to filter instance and set it as new filter. + filter_copy.filter = types.MethodType(filter_method, filter_copy) + namespace[new_filter_name] = filter_copy + # If filter uses a method, add it to the namespace as well. + if filter_copy.method is not None: + namespace[filter_copy.method] = deepcopy( + getattr(BaseFilter, filter_copy.method) + ) + + for prefix, base_model, base_filter, included_fields in namespace.get( + "permission_filters", [] + ): + _add_filters_with_permissions( + prefix, base_model, base_filter, included_fields + ) + + # Create class with added filters. + klass = super().__new__(mcs, name, bases, namespace) + return klass + + class BaseResolweFilter( CheckQueryParamsMixin, filters.FilterSet, metaclass=ResolweFilterMetaclass ): @@ -551,64 +640,11 @@ def get_ordering(self, request, queryset, view): return self.get_default_ordering(view) -class AnnotationValueFieldMetaclass(ResolweFilterMetaclass): - """Add all entity filters prefixed with 'entity'.""" - - def __new__(mcs, name, bases, namespace): - """Inject extensions into the filter.""" - - def filter_permissions( - self, - qs: QuerySet, - value: str, - original_entity_filter: Callable[[QuerySet, str], QuerySet], - ): - """Respect permissions on entities.""" - # Do not filter when value is empty. At least one of the values must be - # non-empty since form in the AnnotationValueFilter class requires it. - if value in EMPTY_VALUES: - return qs - - # Filter the entities using the original entity filter and permissions. - visible_entities = list( - original_entity_filter(Entity.objects.all(), value) - .filter_for_user(self.parent.request.user) - .values_list("pk", flat=True) - ) - return qs.filter(**{f"{entity_path}__in": visible_entities}) - - entity_path = { - "AnnotationValueFilter": "entity", - "AnnotationFieldFilter": "values__entity", - }[name] - # Add all filters from EntityFilter to namespaces before creating class. - for filter_name, filter in EntityFilter.get_filters().items(): - new_name = f"entity__{filter_name}" - if filter_name == "id" or filter_name.startswith("id__"): - new_name = "entity" + filter_name[2:] - filter_copy = deepcopy(filter) - filter_copy.field_name = f"{entity_path}__{filter_copy.field_name}" - filter_method = partial( - filter_permissions, - original_entity_filter=filter.filter, - ) - # Bind the new_filter to filter instance and set it as new filter. - filter_copy.filter = types.MethodType(filter_method, filter_copy) - namespace[new_name] = filter_copy - # If filter uses a method, add it to the namespace as well. - if filter_copy.method is not None: - namespace[filter_copy.method] = deepcopy( - getattr(EntityFilter, filter_copy.method) - ) - - # Create class with added filters. - klass = ResolweFilterMetaclass.__new__(mcs, name, bases, namespace) - return klass - - -class AnnotationFieldFilter(BaseResolweFilter, metaclass=AnnotationValueFieldMetaclass): +class AnnotationFieldFilter(BaseResolweFilter, metaclass=FilterWithPermissionsMeta): """Filter the AnnotationField endpoint.""" + permission_filters = [("values__entity", Entity, EntityFilter, None)] + @classmethod def filter_for_field(cls, field, field_name, lookup_expr=None): """Add permission check for collections lookups. @@ -688,9 +724,10 @@ class Meta(BaseResolweFilter.Meta): } -class AnnotationValueFilter(BaseResolweFilter, metaclass=AnnotationValueFieldMetaclass): +class AnnotationValueFilter(BaseResolweFilter, metaclass=FilterWithPermissionsMeta): """Filter the AnnotationValue endpoint.""" + permission_filters = [("entity", Entity, EntityFilter, None)] label = filters.CharFilter(method="filter_by_label") def filter_by_label(self, queryset: QuerySet, name: str, value: str):