From 94ab00e9bbf97f4b707dd225067fc577b936d3a4 Mon Sep 17 00:00:00 2001 From: Quentin BEY Date: Tue, 17 Dec 2024 18:21:24 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(teams)=20return=20parent=20teams=20in?= =?UTF-8?q?=20resource=20server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also return the parent teams in the user's team endpoints. --- .../core/api/resource_server/viewsets.py | 41 +++++- .../resource_server_api/teams/test_list.py | 81 ++++++++++- .../teams/test_retrieve.py | 128 +++++++++++++++++- .../resource_server_api/teams/test_update.py | 92 +++++++++++++ 4 files changed, 337 insertions(+), 5 deletions(-) diff --git a/src/backend/core/api/resource_server/viewsets.py b/src/backend/core/api/resource_server/viewsets.py index 075a16c76..c14e4fb11 100644 --- a/src/backend/core/api/resource_server/viewsets.py +++ b/src/backend/core/api/resource_server/viewsets.py @@ -1,6 +1,10 @@ """Resource server API endpoints""" -from django.db.models import OuterRef, Prefetch, Subquery +import operator +from functools import reduce + +from django.db.models import OuterRef, Prefetch, Q, Subquery, Value +from django.db.models.functions import Coalesce from rest_framework import ( filters, @@ -56,6 +60,14 @@ class TeamViewSet( # pylint: disable=too-many-ancestors def get_queryset(self): """Custom queryset to get user related teams.""" + teams_queryset = models.Team.objects.filter( + accesses__user=self.request.user, + ) + depth_path = teams_queryset.values("depth", "path") + + if not depth_path: + return models.Team.objects.none() + user_role_query = models.TeamAccess.objects.filter( user=self.request.user, team=OuterRef("pk") ).values("role")[:1] @@ -74,10 +86,33 @@ def get_queryset(self): service_provider_prefetch, ) .filter( - accesses__user=self.request.user, + reduce( + operator.or_, + ( + Q( + # The team the user has access to + depth=d["depth"], + path=d["path"], + ) + | Q( + # The parent team the user has access to + depth__lt=d["depth"], + path__startswith=d["path"][: models.Team.steplen], + organization_id=self.request.user.organization_id, + ) + for d in depth_path + ), + ), service_providers__audience_id=service_provider_audience, ) - .annotate(user_role=Subquery(user_role_query)) + # Abilities are computed based on logged-in user's role for the team + # and if the user does not have access, it's ok to consider them as a member + # because it's a parent team. + .annotate( + user_role=Coalesce( + Subquery(user_role_query), Value(models.RoleChoices.MEMBER.value) + ) + ) ) def perform_create(self, serializer): diff --git a/src/backend/core/tests/resource_server_api/teams/test_list.py b/src/backend/core/tests/resource_server_api/teams/test_list.py index 121b334df..e72904130 100644 --- a/src/backend/core/tests/resource_server_api/teams/test_list.py +++ b/src/backend/core/tests/resource_server_api/teams/test_list.py @@ -64,7 +64,8 @@ def test_api_teams_list_authenticated( # pylint: disable=too-many-locals # Authenticate using the resource server, ie via the Authorization header with force_login_via_resource_server(client, user, service_provider.audience_id): - with django_assert_num_queries(4): # Count, Team, ServiceProvider, TeamAccess + with django_assert_num_queries(5): + # queries: Team path, Count, Team, ServiceProvider, TeamAccess response = client.get( "/resource-server/v1.0/teams/?ordering=created_at", format="json", @@ -182,3 +183,81 @@ def test_api_teams_order_param(client, force_login_via_resource_server): assert ( response_team_ids == team_ids ), "created_at values are not sorted from oldest to newest" + + +def test_api_teams_list_with_parent_teams(client, force_login_via_resource_server): + """ + Authenticated users should be able to list teams including parent teams. + Parent teams should not be listed if they don't have the service provider. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + root_team = factories.TeamFactory(name="Root", service_providers=[service_provider]) + first_team = factories.TeamFactory(name="First", parent_id=root_team.pk) + second_team = factories.TeamFactory( + name="Second", parent_id=first_team.pk, service_providers=[service_provider] + ) + + factories.TeamAccessFactory(user=user, team=second_team, role="member") + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + "/resource-server/v1.0/teams/", + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_200_OK + response_data = response.json() + assert response_data["count"] == 2 + + team_ids = [team["id"] for team in response_data["results"]] + assert len(team_ids) == 2 + assert set(team_ids) == {str(root_team.id), str(second_team.id)} + + +def test_api_teams_list_with_parent_teams_other_organization( + client, force_login_via_resource_server +): + """ + Authenticated users should be able to list teams including parent teams. + Parent teams should not be listed if they don't have the service provider + or if the user does not belong to the organization. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + other_organization = factories.OrganizationFactory(with_registration_id=True) + root_team = factories.TeamFactory( + name="Root", + service_providers=[service_provider], + organization=other_organization, + ) + first_team = factories.TeamFactory( + name="First", parent_id=root_team.pk, organization=other_organization + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=other_organization, + ) + + factories.TeamAccessFactory(user=user, team=second_team, role="member") + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + "/resource-server/v1.0/teams/", + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_200_OK + response_data = response.json() + assert response_data["count"] == 1 + + team_ids = [team["id"] for team in response_data["results"]] + assert len(team_ids) == 1 + assert set(team_ids) == {str(second_team.id)} diff --git a/src/backend/core/tests/resource_server_api/teams/test_retrieve.py b/src/backend/core/tests/resource_server_api/teams/test_retrieve.py index 058936097..cdf0e1ba7 100644 --- a/src/backend/core/tests/resource_server_api/teams/test_retrieve.py +++ b/src/backend/core/tests/resource_server_api/teams/test_retrieve.py @@ -8,7 +8,7 @@ from rest_framework.test import APIClient from core import factories -from core.factories import UserFactory +from core.factories import TeamAccessFactory, UserFactory pytestmark = pytest.mark.django_db @@ -97,3 +97,129 @@ def test_api_teams_retrieve_authenticated_other_service_provider( ) assert response.status_code == HTTP_404_NOT_FOUND + + +def test_api_teams_retrieve_authenticated_related_parent_same_organization( + client, force_login_via_resource_server +): + """ + Authenticated users should be allowed to retrieve a parent team + of a team to which they are related, only if they belong to the + same organization. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + root_team = factories.TeamFactory( + name="Root", + organization=organization, + ) + first_team = factories.TeamFactory( + name="First", + parent_id=root_team.pk, + organization=organization, + service_providers=[service_provider], + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=organization, + ) + TeamAccessFactory(user=user, team=second_team, role="member") + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{first_team.pk}/", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "id": str(first_team.pk), + "name": first_team.name, + "created_at": first_team.created_at.isoformat().replace("+00:00", "Z"), + "updated_at": first_team.updated_at.isoformat().replace("+00:00", "Z"), + } + + +def test_api_teams_retrieve_authenticated_related_parent_other_organization( + client, force_login_via_resource_server +): + """ + Authenticated users should be allowed to retrieve a parent team + of a team to which they are related, only if they belong to the + same organization. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + other_organization = factories.OrganizationFactory(with_registration_id=True) + root_team = factories.TeamFactory( + name="Root", + organization=other_organization, + ) + first_team = factories.TeamFactory( + name="First", + parent_id=root_team.pk, + organization=other_organization, + service_providers=[service_provider], + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=other_organization, + ) + TeamAccessFactory(user=user, team=second_team, role="member") + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{first_team.pk}/", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + +def test_api_teams_retrieve_authenticated_related_child_same_organization( + client, force_login_via_resource_server +): + """ + Authenticated users should NOT be allowed to retrieve a child team + of a team to which they are related, even if they belong to the + same organization. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + root_team = factories.TeamFactory( + name="Root", + organization=organization, + ) + first_team = factories.TeamFactory( + name="First", + parent_id=root_team.pk, + organization=organization, + service_providers=[service_provider], + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=organization, + ) + TeamAccessFactory(user=user, team=first_team, role="member") + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{second_team.pk}/", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} diff --git a/src/backend/core/tests/resource_server_api/teams/test_update.py b/src/backend/core/tests/resource_server_api/teams/test_update.py index 852f7d689..467c95a49 100644 --- a/src/backend/core/tests/resource_server_api/teams/test_update.py +++ b/src/backend/core/tests/resource_server_api/teams/test_update.py @@ -244,3 +244,95 @@ def test_api_teams_update_administrator_or_owner_of_another( team.refresh_from_db() team_values = serializers.TeamSerializer(instance=team).data assert team_values == old_team_values + + +@pytest.mark.parametrize("role", ["administrator", "member", "owner"]) +def test_api_teams_update_parent_team(client, force_login_via_resource_server, role): + """ + Belonging to a team should NOT grant authorization to update + another parent team. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + root_team = factories.TeamFactory( + name="Root", + organization=organization, + ) + first_team = factories.TeamFactory( + name="First", + parent_id=root_team.pk, + organization=organization, + service_providers=[service_provider], + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=organization, + ) + factories.TeamAccessFactory(user=user, team=second_team, role=role) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.patch( + f"/resource-server/v1.0/teams/{first_team.pk}/", + { + "name": "New name", + }, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_403_FORBIDDEN + assert response.json() == { + "detail": "You do not have permission to perform this action." + } + + first_team.refresh_from_db() + assert first_team.name == "First" + + +@pytest.mark.parametrize("role", ["administrator", "member", "owner"]) +def test_api_teams_update_child_team(client, force_login_via_resource_server, role): + """ + Belonging to a team should NOT grant authorization to update + another child team. + """ + organization = factories.OrganizationFactory(with_registration_id=True) + user = factories.UserFactory(organization=organization) + service_provider = factories.ServiceProviderFactory() + + root_team = factories.TeamFactory( + name="Root", + organization=organization, + ) + first_team = factories.TeamFactory( + name="First", + parent_id=root_team.pk, + organization=organization, + service_providers=[service_provider], + ) + second_team = factories.TeamFactory( + name="Second", + parent_id=first_team.pk, + service_providers=[service_provider], + organization=organization, + ) + factories.TeamAccessFactory(user=user, team=first_team, role=role) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.patch( + f"/resource-server/v1.0/teams/{second_team.pk}/", + { + "name": "New name", + }, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + second_team.refresh_from_db() + assert second_team.name == "Second"