diff --git a/corehq/apps/api/resources/v1_0.py b/corehq/apps/api/resources/v1_0.py index 5d5f7d909fb4..9296141b2ad6 100644 --- a/corehq/apps/api/resources/v1_0.py +++ b/corehq/apps/api/resources/v1_0.py @@ -1,4 +1,8 @@ +from datetime import datetime + +from django.http import JsonResponse from django.urls import re_path as url + from tastypie import fields from tastypie.exceptions import ImmediateHttpResponse from tastypie.http import HttpNotFound @@ -7,10 +11,21 @@ DomainSpecificResourceMixin, HqBaseResource, ) +from corehq.apps.api.resources.auth import RequirePermissionAuthentication from corehq.apps.api.resources.meta import CustomResourceMeta from corehq.apps.users.role_utils import get_commcare_analytics_access_for_user_domain from corehq import toggles -from corehq.apps.users.models import CouchUser +from corehq.apps.locations.models import SQLLocation +from corehq.apps.users.models import ( + CouchUser, + HqPermissions, + Invitation, +) +from corehq.apps.reports.util import ( + get_tableau_group_ids_by_names, + get_tableau_groups_by_ids, +) +from corehq.apps.api.validation import WebUserResourceValidator, WebUserSpec class CommCareAnalyticsUserResource(CouchResourceMixin, HqBaseResource, DomainSpecificResourceMixin): @@ -49,3 +64,85 @@ def prepend_urls(self): return [ url(r"^$", self.wrap_view('dispatch_detail'), name='api_dispatch_detail'), ] + + +class InvitationResource(HqBaseResource, DomainSpecificResourceMixin): + id = fields.CharField(attribute='uuid', readonly=True, unique=True) + email = fields.CharField(attribute='email') + role = fields.CharField() + primary_location_id = fields.CharField(attribute='primary_location_id', null=True) + assigned_location_ids = fields.ListField(null=True) + profile = fields.CharField(null=True) + custom_user_data = fields.DictField(attribute='custom_user_data') + tableau_role = fields.CharField(attribute='tableau_role', null=True) + tableau_groups = fields.ListField(null=True) + + class Meta(CustomResourceMeta): + resource_name = "invitation" + authentication = RequirePermissionAuthentication(HqPermissions.edit_web_users) + allowed_methods = ['post'] + always_return_data = True + + def dehydrate_role(self, bundle): + return bundle.obj.get_role_name() + + def dehydrate_assigned_location_ids(self, bundle): + return list(bundle.obj.assigned_locations.values_list('location_id', flat=True)) + + def dehydrate_tableau_groups(self, bundle): + return [group.name for group in get_tableau_groups_by_ids(bundle.obj.tableau_group_ids, + bundle.request.domain)] + + def dehydrate_profile(self, bundle): + if bundle.obj.profile: + return bundle.obj.profile.name + + def obj_create(self, bundle, **kwargs): + domain = kwargs['domain'] + validator = WebUserResourceValidator(domain, bundle.request.couch_user) + spec = WebUserSpec( + email=bundle.data.pop('email'), + role=bundle.data.pop('role'), + primary_location_id=bundle.data.pop('primary_location_id', None), + assigned_location_ids=bundle.data.pop('assigned_location_ids', None), + profile=bundle.data.pop('profile', None), + custom_user_data=bundle.data.pop('custom_user_data', None), + tableau_role=bundle.data.pop('tableau_role', None), + tableau_groups=bundle.data.pop('tableau_groups', None), + unhandled_data=bundle.data, + ) + errors = validator.is_valid(spec, True) + if errors: + raise ImmediateHttpResponse(JsonResponse({"errors": errors}, status=400)) + + profile = validator.profiles_by_name.get(spec.profile) + role_id = validator.roles_by_name.get(spec.role) + tableau_group_ids = get_tableau_group_ids_by_names(spec.tableau_groups or [], domain) + + primary_loc_id = None + assigned_locs = [] + if spec.assigned_location_ids: + primary_loc_id = spec.primary_location_id + assigned_locs = SQLLocation.active_objects.filter( + location_id__in=spec.assigned_location_ids, domain=domain) + real_ids = [loc.location_id for loc in assigned_locs] + + if missing_ids := set(spec.assigned_location_ids) - set(real_ids): + raise ImmediateHttpResponse(JsonResponse( + {"error": f"Could not find location ids: {', '.join(missing_ids)}."}, status=400)) + + invite = Invitation.objects.create( + domain=domain, + email=spec.email.lower(), + role=role_id, + primary_location_id=primary_loc_id, + profile=profile, + custom_user_data=spec.custom_user_data or {}, + tableau_role=spec.tableau_role, + tableau_group_ids=tableau_group_ids, + invited_by=bundle.request.couch_user.user_id, + invited_on=datetime.utcnow(), + ) + invite.assigned_locations.set(assigned_locs) + bundle.obj = invite + return bundle diff --git a/corehq/apps/api/tests/test_user_resources.py b/corehq/apps/api/tests/test_user_resources.py index d9d135f42058..378b64f9d831 100644 --- a/corehq/apps/api/tests/test_user_resources.py +++ b/corehq/apps/api/tests/test_user_resources.py @@ -29,6 +29,7 @@ UserHistory, UserRole, WebUser, + Invitation, ) from corehq.apps.users.role_utils import ( UserRolePresets, @@ -745,3 +746,92 @@ def test_user_roles_returned(self): 'roles': ['sql_lab', 'dataset_editor'] } self.assertEqual(response.json(), expected_response_obj) + + +class TestInvitationResource(APIResourceTest): + resource = v1_0.InvitationResource + api_name = 'v1' + + @classmethod + def setUpClass(cls): + super().setUpClass() + initialize_domain_with_default_roles(cls.domain.name) + cls.definition = CustomDataFieldsDefinition(domain=cls.domain.name, + field_type=UserFieldsView.field_type) + cls.definition.save() + cls.definition.set_fields([ + Field( + slug='imaginary', + label='Imaginary Person', + choices=['yes', 'no'], + ), + ]) + cls.definition.save() + cls.profile = CustomDataFieldsProfile( + name='character', + fields={'imaginary': 'yes'}, + definition=cls.definition, + ) + cls.profile.save() + cls.loc_type = LocationType.objects.create(domain=cls.domain.name, name='loc_type') + cls.loc1 = SQLLocation.objects.create( + location_id='loc1', location_type=cls.loc_type, domain=cls.domain.name) + cls.loc2 = SQLLocation.objects.create( + location_id='loc2', location_type=cls.loc_type, domain=cls.domain.name) + + @classmethod + def tearDownClass(cls): + cls.definition.delete() + super().tearDownClass() + + @patch('corehq.apps.api.validation.WebUserResourceValidator.is_valid') + @patch('corehq.apps.api.resources.v1_0.get_tableau_group_ids_by_names') + @patch('corehq.apps.api.resources.v1_0.get_tableau_groups_by_ids') + def test_create(self, mock_get_tableau_groups_by_ids, + mock_get_tableau_group_ids_by_names, mock_is_valid): + mock_is_valid.return_value = [] + mock_get_tableau_group_ids_by_names.return_value = ["123", "456"] + from corehq.apps.reports.util import TableauGroupTuple + mock_get_tableau_groups_by_ids.return_value = [TableauGroupTuple("group1", "123")] + self.assertEqual(0, Invitation.objects.all().count()) + + user_json = { + "email": "jdoe1@example.org", + "role": "App Editor", + } + + response = self._assert_auth_post_resource(self.list_endpoint, + json.dumps(user_json), + content_type='application/json') + self.assertEqual(response.status_code, 201) + invitation = Invitation.objects.get(email="jdoe1@example.org") + self.addCleanup(invitation.delete) + self.assertEqual(invitation.get_role_name(), "App Editor") + + user_json = { + "email": "jdoe2@example.org", + "role": "App Editor", + "primary_location_id": "loc1", + "assigned_location_ids": ["loc1", "loc2"], + "profile": "character", + "custom_user_data": { + "favorite_subject": "math", + }, + "tableau_role": "Viewer", + "tableau_groups": ["group1", "group2"] + } + response = self._assert_auth_post_resource(self.list_endpoint, + json.dumps(user_json), + content_type='application/json') + self.assertEqual(response.status_code, 201) + self.assertEqual(mock_is_valid.call_count, 2) + + invitation = Invitation.objects.get(email="jdoe2@example.org") + self.addCleanup(invitation.delete) + self.assertEqual(invitation.get_role_name(), "App Editor") + self.assertEqual(invitation.primary_location, self.loc1) + self.assertEqual(list(invitation.assigned_locations.all()), [self.loc1, self.loc2]) + self.assertEqual(invitation.profile, self.profile) + self.assertEqual(invitation.custom_user_data["favorite_subject"], "math") + self.assertEqual(invitation.tableau_role, "Viewer") + self.assertEqual(invitation.tableau_group_ids, ["123", "456"]) diff --git a/corehq/apps/api/tests/test_validation.py b/corehq/apps/api/tests/test_validation.py index 46f20537b696..c7c0fdd8cd38 100644 --- a/corehq/apps/api/tests/test_validation.py +++ b/corehq/apps/api/tests/test_validation.py @@ -1,10 +1,17 @@ from django.test import TestCase from unittest.mock import patch -from corehq.apps.api.validation import WebUserResourceValidator +from corehq.apps.api.validation import WebUserResourceValidator, WebUserSpec +from corehq.apps.custom_data_fields.models import ( + CustomDataFieldsDefinition, + CustomDataFieldsProfile, + Field, +) from corehq.apps.domain.models import Domain from corehq.apps.users.models import WebUser +from corehq.apps.users.views.mobile.custom_data_fields import UserFieldsView from corehq.util.test_utils import flag_enabled, flag_disabled +from corehq.apps.users.role_utils import initialize_domain_with_default_roles class TestWebUserResourceValidator(TestCase): @@ -15,61 +22,113 @@ def setUpClass(cls): cls.domain = Domain(name="test-domain", is_active=True) cls.domain.save() cls.addClassCleanup(cls.domain.delete) + initialize_domain_with_default_roles(cls.domain.name) cls.requesting_user = WebUser.create(cls.domain.name, "test@example.com", "123", None, None) + cls.requesting_user.set_role(cls.domain.name, 'admin') + cls.requesting_user.save() cls.validator = WebUserResourceValidator(cls.domain.name, cls.requesting_user) + cls.definition = CustomDataFieldsDefinition.get_or_create(cls.domain.name, UserFieldsView.field_type) + cls.definition.save() + cls.definition.set_fields([ + Field( + slug='imaginary', + label='Imaginary Person', + choices=['yes', 'no'], + ), + ]) + cls.definition.save() + cls.profile = CustomDataFieldsProfile( + name='character', + fields={'imaginary': 'yes'}, + definition=cls.definition, + ) + cls.profile.save() + @classmethod def tearDownClass(cls): cls.requesting_user.delete(None, None) super().tearDownClass() + def test_simple_is_valid(self): + spec = WebUserSpec(email="newtest@example.com", + role="App Editor") + self.assertEqual(self.validator.is_valid(spec, True), []) + def test_validate_parameters(self): params = {"email": "test@example.com", "role": "Admin"} - self.assertIsNone(self.validator.validate_parameters(params)) + self.assertEqual(self.validator.validate_parameters(params, True), []) + + params = {"email": "test@example.com", "role": "Admin"} + self.assertEqual(self.validator.validate_parameters(params, False), ["Invalid parameter(s): email"]) invalid_params = {"invalid_param": "value"} - self.assertEqual(self.validator.validate_parameters(invalid_params), "Invalid parameter(s): invalid_param") + self.assertEqual(self.validator.validate_parameters(invalid_params, True), + ["Invalid parameter(s): invalid_param"]) + self.assertEqual(self.validator.validate_parameters(invalid_params, False), + ["Invalid parameter(s): invalid_param"]) @flag_enabled('TABLEAU_USER_SYNCING') @patch('corehq.apps.users.models.WebUser.has_permission', return_value=True) def test_validate_parameters_with_tableau_edit_permission(self, mock_has_permission): params = {"email": "test@example.com", "role": "Admin", "tableau_role": "Viewer"} - self.assertIsNone(self.validator.validate_parameters(params)) + self.assertEqual(self.validator.validate_parameters(params, True), []) @flag_disabled('TABLEAU_USER_SYNCING') @patch('corehq.apps.users.models.WebUser.has_permission', return_value=False) def test_validate_parameters_without_tableau_edit_permission(self, mock_has_permission): params = {"email": "test@example.com", "role": "Admin", "tableau_role": "Viewer"} - self.assertEqual(self.validator.validate_parameters(params), - "You do not have permission to edit Tableau Configuration.") + self.assertEqual(self.validator.validate_parameters(params, True), + ["You do not have permission to edit Tableau Configuration."]) - @patch('corehq.apps.registration.validation.domain_has_privilege', return_value=True) + @patch('corehq.apps.api.validation.domain_has_privilege', return_value=True) def test_validate_parameters_with_profile_permission(self, mock_domain_has_privilege): params = {"email": "test@example.com", "role": "Admin", "profile": "some_profile"} - self.assertIsNone(self.validator.validate_parameters(params)) + self.assertEqual(self.validator.validate_parameters(params, True), []) - @patch('corehq.apps.registration.validation.domain_has_privilege', return_value=False) + @patch('corehq.apps.api.validation.domain_has_privilege', return_value=False) def test_validate_parameters_without_profile_permission(self, mock_domain_has_privilege): params = {"email": "test@example.com", "role": "Admin", "profile": "some_profile"} - self.assertEqual(self.validator.validate_parameters(params), - "This domain does not have user profile privileges.") + self.assertEqual(self.validator.validate_parameters(params, True), + ["This domain does not have user profile privileges."]) - @patch('corehq.apps.registration.validation.domain_has_privilege', return_value=True) + @patch('corehq.apps.api.validation.domain_has_privilege', return_value=True) def test_validate_parameters_with_location_privilege(self, mock_domain_has_privilege): - params = {"email": "test@example.com", "role": "Admin", "primary_location": "some_location"} - self.assertIsNone(self.validator.validate_parameters(params)) - params = {"email": "test@example.com", "role": "Admin", "assigned_locations": "some_location"} - self.assertIsNone(self.validator.validate_parameters(params)) + params = {"email": "test@example.com", "role": "Admin", "primary_location_id": "some_location"} + self.assertEqual(self.validator.validate_parameters(params, True), []) + params = {"email": "test@example.com", "role": "Admin", "assigned_location_ids": "some_location"} + self.assertEqual(self.validator.validate_parameters(params, True), []) - @patch('corehq.apps.registration.validation.domain_has_privilege', return_value=False) + @patch('corehq.apps.api.validation.domain_has_privilege', return_value=False) def test_validate_parameters_without_location_privilege(self, mock_domain_has_privilege): - params = {"email": "test@example.com", "role": "Admin", "primary_location": "some_location"} - self.assertEqual(self.validator.validate_parameters(params), - "This domain does not have locations privileges.") - - params = {"email": "test@example.com", "role": "Admin", "assigned_locations": "some_location"} - self.assertEqual(self.validator.validate_parameters(params), - "This domain does not have locations privileges.") + params = {"email": "test@example.com", "role": "Admin", "primary_location_id": "some_location"} + self.assertEqual(self.validator.validate_parameters(params, True), + ["This domain does not have locations privileges."]) + + params = {"email": "test@example.com", "role": "Admin", "assigned_location_ids": "some_location"} + self.assertEqual(self.validator.validate_parameters(params, True), + ["This domain does not have locations privileges."]) + + def test_validate_role(self): + self.assertIsNone(self.validator.validate_role("App Editor")) + self.assertEqual(self.validator.validate_role("Fake Role"), + "Role 'Fake Role' does not exist or you do not have permission to access it") + + def test_validate_role_with_no_role_input(self): + self.assertIsNone(self.validator.validate_role(None)) + + def test_validate_profile_with_no_profile_input(self): + self.definition.profile_required_for_user_type = [UserFieldsView.WEB_USER] + self.definition.save() + self.assertIsNone(self.validator.validate_profile(None, False)) + self.assertEqual(self.validator.validate_profile(None, True), + "A profile must be assigned to users of the following type(s): Web Users") + self.definition.profile_required_for_user_type = [] + self.definition.save() + + def test_validate_profile_with_conflicting_user_data(self): + self.assertEqual(self.validator.validate_custom_data_with_profile({'imaginary': 'yes'}, 'character'), + ["'imaginary' is defined by the profile so cannot be set directly"]) def test_validate_email(self): self.assertIsNone(self.validator.validate_email("newtest@example.com", True)) @@ -84,15 +143,32 @@ def test_validate_email(self): self.assertEqual(self.validator.validate_email("deactivated@example.com", True), "A user with this email address is deactivated. ") + def test_validate_email_with_no_email_input(self): + self.assertIsNone(self.validator.validate_email(None, True)) + def test_validate_locations(self): - with patch('corehq.apps.user_importer.validation.LocationValidator.validate_spec') as mock_validate_spec: - mock_validate_spec.return_value = None + self.assertIsNone(self.validator.validate_locations(self.requesting_user.username, + None, None)) + self.assertEqual( + self.validator.validate_locations(self.requesting_user.username, ["loc1", "loc2"], None), + "Both primary_location and locations must be provided together." + ) + self.assertEqual( + self.validator.validate_locations(self.requesting_user.username, None, 'loc1'), + "Both primary_location and locations must be provided together." + ) + + with patch( + 'corehq.apps.user_importer.validation.LocationValidator.validate_location_ids' + ) as mock_validate_location_ids: + mock_validate_location_ids.return_value = None self.assertIsNone(self.validator.validate_locations(self.requesting_user.username, ["loc1", "loc2"], "loc1")) - actual_spec = mock_validate_spec.call_args[0][0] - self.assertEqual(actual_spec['username'], self.requesting_user.username) - self.assertCountEqual(actual_spec['location_code'], ["loc1", "loc2"]) + user_result = mock_validate_location_ids.call_args[0][0] + self.assertEqual(user_result.editable_user.username, self.requesting_user.username) + location_ids = mock_validate_location_ids.call_args[0][1] + self.assertCountEqual(location_ids, ["loc1", "loc2"]) self.assertEqual( self.validator.validate_locations(self.requesting_user.username, ["loc1", "loc2"], "loc3"), diff --git a/corehq/apps/api/urls.py b/corehq/apps/api/urls.py index 5900994641bc..7a30a6cec112 100644 --- a/corehq/apps/api/urls.py +++ b/corehq/apps/api/urls.py @@ -182,6 +182,7 @@ def versioned_apis(api_list): fixtures.v0_6.LookupTableItemResource.get_urlpattern('v2'), v0_5.NavigationEventAuditResource.get_urlpattern('v1'), v1_0.CommCareAnalyticsUserResource.get_urlpattern('v1'), + v1_0.InvitationResource.get_urlpattern('v1'), ] diff --git a/corehq/apps/api/validation.py b/corehq/apps/api/validation.py index e0a1dad0deec..f11675472c85 100644 --- a/corehq/apps/api/validation.py +++ b/corehq/apps/api/validation.py @@ -1,8 +1,17 @@ from memoized import memoized +from dataclasses import dataclass, asdict +from typing import List -from corehq.apps.custom_data_fields.models import CustomDataFieldsDefinition +from django.utils.translation import gettext as _ + +from corehq import privileges +from corehq.apps.accounting.utils import domain_has_privilege +from corehq.apps.custom_data_fields.models import ( + CustomDataFieldsDefinition, + PROFILE_SLUG, +) from corehq.apps.reports.util import get_allowed_tableau_groups_for_domain -from corehq.apps.user_importer.importer import SiteCodeToLocationCache +from corehq.apps.users.models import CouchUser, Invitation from corehq.apps.user_importer.validation import ( RoleValidator, ProfileValidator, @@ -11,9 +20,32 @@ TableauRoleValidator, CustomDataValidator, EmailValidator, + UserAccessValidator, + UserRetrievalResult, ) from corehq.apps.users.validation import validate_primary_location_assignment from corehq.apps.registration.validation import AdminInvitesUserFormValidator +from corehq.toggles import TABLEAU_USER_SYNCING + + +@dataclass +class WebUserSpec: + email: str + role: str = None + primary_location_id: str = None + assigned_location_ids: List[str] = None + profile: str = None + custom_user_data: dict = None + tableau_role: str = None + tableau_groups: List[str] = None + unhandled_data: dict = None + + def get_full_spec(self): + spec_dict = {k: v for k, v in asdict(self).items() + if k != 'unhandled_data' and v is not None} + if self.unhandled_data: + spec_dict.update(self.unhandled_data) + return spec_dict class WebUserResourceValidator(): @@ -21,6 +53,31 @@ def __init__(self, domain, requesting_user): self.domain = domain self.requesting_user = requesting_user + def is_valid(self, spec: WebUserSpec, is_post): + errors = [] + validators = [ + (self.validate_parameters, [set(spec.get_full_spec().keys()), is_post]), + (self.validate_required_fields, [spec, is_post]), + (self.validate_role, [spec.role]), + (self.validate_profile, [spec.profile, is_post]), + (self.validate_custom_data, [spec.custom_user_data, spec.profile]), + (self.validate_custom_data_with_profile, [spec.custom_user_data, spec.profile]), + (self.validate_email, [spec.email, is_post]), + (self.validate_locations, [spec.email, spec.assigned_location_ids, spec.primary_location_id]), + (self.validate_user_access, [spec.email]), + (self.validate_tableau_group, [spec.tableau_groups]), + (self.validate_tableau_role, [spec.tableau_role]), + ] + + for validator, args in validators: + error = validator(*args) + if isinstance(error, list): + errors += error + elif error: + errors.append(error) + + return errors + @property def roles_by_name(self): from corehq.apps.users.views.utils import get_editable_role_choices @@ -33,34 +90,76 @@ def profiles_by_name(self): from corehq.apps.users.views.mobile.custom_data_fields import UserFieldsView return CustomDataFieldsDefinition.get_profiles_by_name(self.domain, UserFieldsView.field_type) - @property - def location_cache(self): - return SiteCodeToLocationCache(self.domain) - - def validate_parameters(self, parameters): - allowed_params = ['email', 'role', 'primary_location', 'assigned_locations', + def validate_parameters(self, parameters, is_post): + errors = [] + allowed_params = ['role', 'primary_location_id', 'assigned_location_ids', 'profile', 'custom_user_data', 'tableau_role', 'tableau_groups'] + if is_post: + allowed_params.append('email') invalid_params = [param for param in parameters if param not in allowed_params] if invalid_params: - return f"Invalid parameter(s): {', '.join(invalid_params)}" - return AdminInvitesUserFormValidator.validate_parameters(self.domain, self.requesting_user, parameters) + errors.append(f"Invalid parameter(s): {', '.join(invalid_params)}") + + if 'tableau_role' in parameters or 'tableau_groups' in parameters: + can_edit_tableau_config = ( + self.requesting_user.has_permission(self.domain, 'edit_user_tableau_config') + and TABLEAU_USER_SYNCING.enabled(self.domain) + ) + if not can_edit_tableau_config: + errors.append(_("You do not have permission to edit Tableau Configuration.")) + + if 'profile' in parameters and not domain_has_privilege(self.domain, privileges.APP_USER_PROFILES): + errors.append(_("This domain does not have user profile privileges.")) + + if (('primary_location_id' in parameters or 'assigned_location_ids' in parameters) + and not domain_has_privilege(self.domain, privileges.LOCATIONS)): + errors.append(_("This domain does not have locations privileges.")) + + return errors + + def validate_required_fields(self, spec: WebUserSpec, is_post): + email = spec.email + role = spec.role + if is_post: + if not email or not role: + return _("'email' and 'role' are required for each user") + else: + if role == '': + return _("'role' is required for each user") def validate_role(self, role): spec = {'role': role} - return RoleValidator(self.domain, self.roles_by_name()).validate_spec(spec) + return RoleValidator(self.domain, self.roles_by_name).validate_spec(spec) - def validate_profile(self, new_profile_name): - profile_validator = ProfileValidator(self.domain, self.requesting_user, True, self.profiles_by_name()) + def validate_profile(self, new_profile_name, is_post): + if not is_post and new_profile_name is None: + return None + profile_validator = ProfileValidator(self.domain, self.requesting_user, True, self.profiles_by_name) spec = {'user_profile': new_profile_name} return profile_validator.validate_spec(spec) def validate_custom_data(self, custom_data, profile_name): - custom_data_validator = CustomDataValidator(self.domain, self.profiles_by_name()) + custom_data_validator = CustomDataValidator(self.domain, self.profiles_by_name, True) spec = {'data': custom_data, 'user_profile': profile_name} return custom_data_validator.validate_spec(spec) + def validate_custom_data_with_profile(self, custom_data, profile_name): + if custom_data is None or profile_name is None: + return + + errors = [] + profile = self.profiles_by_name.get(profile_name) + + system_fields = set(profile.fields.keys()) if profile else set() + system_fields.add(PROFILE_SLUG) + + for key in custom_data.keys(): + if key in system_fields: + errors.append(_("'{}' is defined by the profile so cannot be set directly").format(key)) + return errors + def validate_email(self, email, is_post): - if is_post: + if is_post and email is not None: error = AdminInvitesUserFormValidator.validate_email(self.domain, email) if error: return error @@ -68,20 +167,41 @@ def validate_email(self, email, is_post): spec = {'email': email} return email_validator.validate_spec(spec) - def validate_locations(self, editable_user, assigned_location_codes, primary_location_code): - error = validate_primary_location_assignment(primary_location_code, assigned_location_codes) + def validate_locations(self, editable_user, assigned_location_ids, primary_location_id): + if assigned_location_ids is None and primary_location_id is None: + return + if ((assigned_location_ids is not None and primary_location_id is None) + or (assigned_location_ids is None and primary_location_id is not None)): + return _('Both primary_location and locations must be provided together.') + + error = validate_primary_location_assignment(primary_location_id, assigned_location_ids) if error: return error + location_validator = LocationValidator(self.domain, self.requesting_user, None, True) + user_result = self._get_invitation_or_editable_user(editable_user, self.domain) + return location_validator.validate_location_ids(user_result, assigned_location_ids) - location_validator = LocationValidator(self.domain, self.requesting_user, self.location_cache, True) - location_codes = list(set(assigned_location_codes + [primary_location_code])) - spec = {'location_code': location_codes, - 'username': editable_user} - return location_validator.validate_spec(spec) + def validate_user_access(self, editable_user): + user_access_validator = UserAccessValidator(self.domain, self.requesting_user, True) + spec = {'username': editable_user} + return user_access_validator.validate_spec(spec) def validate_tableau_group(self, tableau_groups): + if tableau_groups is None: + return allowed_groups_for_domain = get_allowed_tableau_groups_for_domain(self.domain) or [] return TableauGroupsValidator.validate_tableau_groups(allowed_groups_for_domain, tableau_groups) def validate_tableau_role(self, tableau_role): + if tableau_role is None: + return return TableauRoleValidator.validate_tableau_role(tableau_role) + + def _get_invitation_or_editable_user(self, username_or_email, domain) -> UserRetrievalResult: + editable_user = None + try: + invitation = Invitation.objects.get(domain=domain, email=username_or_email, is_accepted=False) + return UserRetrievalResult(invitation=invitation) + except Invitation.DoesNotExist: + editable_user = CouchUser.get_by_username(username_or_email, strict=True) + return UserRetrievalResult(editable_user=editable_user) diff --git a/corehq/apps/reports/util.py b/corehq/apps/reports/util.py index 6d78637dd774..8e5d4701b344 100644 --- a/corehq/apps/reports/util.py +++ b/corehq/apps/reports/util.py @@ -486,24 +486,27 @@ def _notify_tableau_exception(e, domain): def get_tableau_groups_by_ids(interested_group_ids: List, domain: str, session: TableauAPISession = None) -> List[TableauGroupTuple]: - session = session or TableauAPISession.create_session_for_domain(domain) - group_json = session.query_groups() + group_json = get_tableau_group_json(domain, session) filtered_group_json = [group for group in group_json if group['id'] in interested_group_ids] return _group_json_to_tuples(filtered_group_json) -@quickcache(['domain'], timeout=2 * 60) def get_tableau_group_ids_by_names(group_names: List, domain: str, session: TableauAPISession = None) -> List[str]: ''' Returns a list of all Tableau group ids on the site derived from tableau group names passed in. ''' - session = session or TableauAPISession.create_session_for_domain(domain) - group_json = session.query_groups() + group_json = get_tableau_group_json(domain, session) filtered_group_json = [group for group in group_json if group['name'] in group_names] return [tup.id for tup in _group_json_to_tuples(filtered_group_json)] +@quickcache(['domain'], timeout=2 * 60) +def get_tableau_group_json(domain: str, session: TableauAPISession = None): + session = session or TableauAPISession.create_session_for_domain(domain) + return session.query_groups() + + def get_matching_tableau_users_from_other_domains(user): return list(TableauUser.objects.filter( username=user.username, diff --git a/corehq/apps/user_importer/importer.py b/corehq/apps/user_importer/importer.py index 4974c5d96c64..55516b7f7f4e 100644 --- a/corehq/apps/user_importer/importer.py +++ b/corehq/apps/user_importer/importer.py @@ -334,14 +334,14 @@ def create_or_update_web_user_invite(email, domain, role_qualified_id, upload_us email=email, domain=domain, is_accepted=False, - tableau_role=tableau_role, - tableau_group_ids=tableau_group_ids, defaults={ 'invited_by': upload_user.user_id, 'invited_on': datetime.utcnow(), + 'tableau_role': tableau_role, + 'tableau_group_ids': tableau_group_ids, 'primary_location': SQLLocation.by_location_id(primary_location_id), 'role': role_qualified_id, - 'profile': profile + 'profile': profile, }, ) assigned_locations = [SQLLocation.by_location_id(assigned_location_id) diff --git a/corehq/apps/user_importer/tests/test_validators.py b/corehq/apps/user_importer/tests/test_validators.py index 8342cfb68e62..e2e617117635 100644 --- a/corehq/apps/user_importer/tests/test_validators.py +++ b/corehq/apps/user_importer/tests/test_validators.py @@ -30,6 +30,7 @@ CustomDataValidator, TableauRoleValidator, TableauGroupsValidator, + UserAccessValidator, ) from corehq.apps.users.dbaccessors import delete_all_users from corehq.apps.users.models import CommCareUser, HqPermissions, Invitation, WebUser @@ -338,6 +339,7 @@ def setUpClass(cls): cls.editable_user = WebUser.create(cls.domain, 'editable-user', 'password', None, None) cls.validator = LocationValidator(cls.domain, cls.upload_user, SiteCodeToLocationCache(cls.domain), True) + cls.user_access_validator = UserAccessValidator(cls.domain, cls.upload_user, True) def test_success(self): self.editable_user.reset_locations(self.domain, [self.locations['Cambridge'].location_id]) @@ -352,27 +354,26 @@ def test_cant_edit_web_user(self): user_spec = {'username': self.editable_user.username, 'location_code': [self.locations['Middlesex'].site_code, self.locations['Cambridge'].site_code]} - validation_result = self.validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + validation_result = self.user_access_validator.validate_spec(user_spec) + assert validation_result == self.user_access_validator.error_message_user_access user_spec = {'username': self.editable_user.username} - validation_result = self.validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + validation_result = self.user_access_validator.validate_spec(user_spec) + assert validation_result == self.user_access_validator.error_message_user_access def test_cant_edit_commcare_user(self): - self.cc_user_validator = LocationValidator(self.domain, self.upload_user, - SiteCodeToLocationCache(self.domain), False) + self.cc_user_validator = UserAccessValidator(self.domain, self.upload_user, False) self.editable_cc_user = CommCareUser.create(self.domain, 'cc-username', 'password', None, None) self.editable_cc_user.reset_locations([self.locations['Suffolk'].location_id]) user_spec = {'user_id': self.editable_cc_user._id, 'location_code': [self.locations['Middlesex'].site_code, self.locations['Cambridge'].site_code]} validation_result = self.cc_user_validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + assert validation_result == self.user_access_validator.error_message_user_access user_spec = {'user_id': self.editable_cc_user._id} validation_result = self.cc_user_validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + assert validation_result == self.user_access_validator.error_message_user_access def test_cant_edit_invitation(self): self.invitation = Invitation.objects.create( @@ -385,12 +386,12 @@ def test_cant_edit_invitation(self): user_spec = {'username': self.invitation.email, 'location_code': [self.locations['Middlesex'].site_code, self.locations['Cambridge'].site_code]} - validation_result = self.validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + validation_result = self.user_access_validator.validate_spec(user_spec) + assert validation_result == self.user_access_validator.error_message_user_access user_spec = {'username': self.invitation.email} - validation_result = self.validator.validate_spec(user_spec) - assert validation_result == self.validator.error_message_user_access + validation_result = self.user_access_validator.validate_spec(user_spec) + assert validation_result == self.user_access_validator.error_message_user_access def test_cant_add_location(self): self.editable_user.reset_locations(self.domain, [self.locations['Cambridge'].location_id]) diff --git a/corehq/apps/user_importer/validation.py b/corehq/apps/user_importer/validation.py index 0cb2c2bc1ac2..bfbe78e63e56 100644 --- a/corehq/apps/user_importer/validation.py +++ b/corehq/apps/user_importer/validation.py @@ -49,7 +49,8 @@ def get_user_import_validators(domain_obj, all_specs, is_web_user_import, all_us ExistingUserValidator(domain, all_specs), TargetDomainValidator(upload_domain), ProfileValidator(domain, upload_user, is_web_user_import, all_user_profiles_by_name), - LocationValidator(domain, upload_user, location_cache, is_web_user_import) + LocationValidator(domain, upload_user, location_cache, is_web_user_import), + UserAccessValidator(domain, upload_user, is_web_user_import) ] if is_web_user_import: return validators + [RequiredWebFieldsValidator(domain), DuplicateValidator(domain, 'email', all_specs), @@ -501,15 +502,13 @@ def validate_spec(self, spec): return self.error_existing_user.format(self.confirmation_sms_header, errors_formatted) -class LocationValidator(ImportValidator): +class UserAccessValidator(ImportValidator): error_message_user_access = _("Based on your locations you do not have permission to edit this user or user " "invitation") - error_message_location_access = _("You do not have permission to assign or remove these locations: {}") - def __init__(self, domain, upload_user, location_cache, is_web_user_import): + def __init__(self, domain, upload_user, is_web_user_import): super().__init__(domain) self.upload_user = upload_user - self.location_cache = location_cache self.is_web_user_import = is_web_user_import def validate_spec(self, spec): @@ -518,15 +517,41 @@ def validate_spec(self, spec): if user_access_error: return user_access_error + def _validate_uploading_user_access_to_editable_user_or_invitation(self, user_result): + # Get current locations for editable user or user invitation and ensure uploading user + # can access those locations + if user_result.invitation: + if not user_can_access_invite(self.domain, self.upload_user, user_result.invitation): + return self.error_message_user_access.format(user_result.invitation.email) + elif user_result.editable_user: + if not user_can_access_other_user(self.domain, self.upload_user, user_result.editable_user): + return self.error_message_user_access.format(user_result.editable_user.username) + + +class LocationValidator(ImportValidator): + error_message_location_access = _("You do not have permission to assign or remove these locations: {}") + + def __init__(self, domain, upload_user, location_cache, is_web_user_import): + super().__init__(domain) + self.upload_user = upload_user + self.location_cache = location_cache + self.is_web_user_import = is_web_user_import + + def validate_spec(self, spec): if 'location_code' in spec: + user_result = _get_invitation_or_editable_user(spec, self.is_web_user_import, self.domain) locs_being_assigned = self._get_locs_ids_being_assigned(spec) - current_locs = self._get_current_locs(user_result) + return self.validate_location_ids(user_result, locs_being_assigned) - user_location_access_error = self._validate_user_location_permission(current_locs, locs_being_assigned) - location_cannot_have_users_error = None - if toggles.USH_RESTORE_FILE_LOCATION_CASE_SYNC_RESTRICTION.enabled(self.domain): - location_cannot_have_users_error = self._validate_locations_allow_users(locs_being_assigned) - return user_location_access_error or location_cannot_have_users_error + def validate_location_ids(self, user_result, location_ids_being_assigned): + current_loc_ids = self._get_current_loc_ids(user_result) + + user_location_access_error = self._validate_user_location_permission(current_loc_ids, + location_ids_being_assigned) + location_cannot_have_users_error = None + if toggles.USH_RESTORE_FILE_LOCATION_CASE_SYNC_RESTRICTION.enabled(self.domain): + location_cannot_have_users_error = self._validate_locations_allow_users(location_ids_being_assigned) + return user_location_access_error or location_cannot_have_users_error def _get_locs_ids_being_assigned(self, spec): from corehq.apps.user_importer.importer import find_location_id @@ -535,29 +560,19 @@ def _get_locs_ids_being_assigned(self, spec): locs_ids_being_assigned = find_location_id(location_codes, self.location_cache) return locs_ids_being_assigned - def _get_current_locs(self, user_result): - current_locs = [] - if user_result.invitation: - current_locs = user_result.invitation.assigned_locations.all() - elif user_result.editable_user: - current_locs = user_result.editable_user.get_location_ids(self.domain) - return current_locs - - def _validate_uploading_user_access_to_editable_user_or_invitation(self, user_result): - # Get current locations for editable user or user invitation and ensure uploading user - # can access those locations + def _get_current_loc_ids(self, user_result): + current_loc_ids = [] if user_result.invitation: - if not user_can_access_invite(self.domain, self.upload_user, user_result.invitation): - return self.error_message_user_access.format(user_result.invitation.email) + current_loc_ids = [loc.location_id for loc in user_result.invitation.assigned_locations.all()] elif user_result.editable_user: - if not user_can_access_other_user(self.domain, self.upload_user, user_result.editable_user): - return self.error_message_user_access.format(user_result.editable_user.username) + current_loc_ids = user_result.editable_user.get_location_ids(self.domain) + return current_loc_ids - def _validate_user_location_permission(self, current_locs, locs_ids_being_assigned): + def _validate_user_location_permission(self, current_loc_ids, locs_ids_being_assigned): # Ensure the uploading user is only adding the user to/removing from *new locations* that # the uploading user has permission to access. problem_location_ids = user_can_change_locations(self.domain, self.upload_user, - current_locs, locs_ids_being_assigned) + current_loc_ids, locs_ids_being_assigned) if problem_location_ids: return self.error_message_location_access.format( ', '.join(SQLLocation.objects.filter(