Skip to content

Commit

Permalink
Merge pull request #35448 from dimagi/jt/create-user-api-validation
Browse files Browse the repository at this point in the history
Create Invitation API [Validation]
  • Loading branch information
Jtang-1 authored Dec 6, 2024
2 parents 6e52bf2 + 3dfa157 commit cbac4e4
Show file tree
Hide file tree
Showing 17 changed files with 590 additions and 179 deletions.
105 changes: 105 additions & 0 deletions corehq/apps/api/tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from django.test import TestCase
from unittest.mock import patch

from corehq.apps.api.validation import WebUserResourceValidator
from corehq.apps.domain.models import Domain
from corehq.apps.users.models import WebUser
from corehq.util.test_utils import flag_enabled, flag_disabled


class TestWebUserResourceValidator(TestCase):

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.domain = Domain(name="test-domain", is_active=True)
cls.domain.save()
cls.addClassCleanup(cls.domain.delete)
cls.requesting_user = WebUser.create(cls.domain.name, "[email protected]", "123", None, None)
cls.validator = WebUserResourceValidator(cls.domain.name, cls.requesting_user)

@classmethod
def tearDownClass(cls):
cls.requesting_user.delete(None, None)
super().tearDownClass()

def test_validate_parameters(self):
params = {"email": "[email protected]", "role": "Admin"}
self.assertIsNone(self.validator.validate_parameters(params))

invalid_params = {"invalid_param": "value"}
self.assertEqual(self.validator.validate_parameters(invalid_params), "Invalid parameter(s): invalid_param")

@flag_enabled('TABLEAU_USER_SYNCING')
@patch('corehq.apps.users.models.WebUser.has_permission', return_value=True)
def test_validate_parameters_with_tableau_edit_permission(self, mock_has_permission):
params = {"email": "[email protected]", "role": "Admin", "tableau_role": "Viewer"}
self.assertIsNone(self.validator.validate_parameters(params))

@flag_disabled('TABLEAU_USER_SYNCING')
@patch('corehq.apps.users.models.WebUser.has_permission', return_value=False)
def test_validate_parameters_without_tableau_edit_permission(self, mock_has_permission):
params = {"email": "[email protected]", "role": "Admin", "tableau_role": "Viewer"}
self.assertEqual(self.validator.validate_parameters(params),
"You do not have permission to edit Tableau Configuration.")

@patch('corehq.apps.registration.validation.domain_has_privilege', return_value=True)
def test_validate_parameters_with_profile_permission(self, mock_domain_has_privilege):
params = {"email": "[email protected]", "role": "Admin", "profile": "some_profile"}
self.assertIsNone(self.validator.validate_parameters(params))

@patch('corehq.apps.registration.validation.domain_has_privilege', return_value=False)
def test_validate_parameters_without_profile_permission(self, mock_domain_has_privilege):
params = {"email": "[email protected]", "role": "Admin", "profile": "some_profile"}
self.assertEqual(self.validator.validate_parameters(params),
"This domain does not have user profile privileges.")

@patch('corehq.apps.registration.validation.domain_has_privilege', return_value=True)
def test_validate_parameters_with_location_privilege(self, mock_domain_has_privilege):
params = {"email": "[email protected]", "role": "Admin", "primary_location": "some_location"}
self.assertIsNone(self.validator.validate_parameters(params))
params = {"email": "[email protected]", "role": "Admin", "assigned_locations": "some_location"}
self.assertIsNone(self.validator.validate_parameters(params))

@patch('corehq.apps.registration.validation.domain_has_privilege', return_value=False)
def test_validate_parameters_without_location_privilege(self, mock_domain_has_privilege):
params = {"email": "[email protected]", "role": "Admin", "primary_location": "some_location"}
self.assertEqual(self.validator.validate_parameters(params),
"This domain does not have locations privileges.")

params = {"email": "[email protected]", "role": "Admin", "assigned_locations": "some_location"}
self.assertEqual(self.validator.validate_parameters(params),
"This domain does not have locations privileges.")

def test_validate_email(self):
self.assertIsNone(self.validator.validate_email("[email protected]", True))

self.assertEqual(self.validator.validate_email("[email protected]", True),
"A user with this email address is already in "
"this project or has a pending invitation.")

deactivated_user = WebUser.create(self.domain.name, "[email protected]", "123", None, None)
deactivated_user.is_active = False
deactivated_user.save()
self.assertEqual(self.validator.validate_email("[email protected]", True),
"A user with this email address is deactivated. ")

def test_validate_locations(self):
with patch('corehq.apps.user_importer.validation.LocationValidator.validate_spec') as mock_validate_spec:
mock_validate_spec.return_value = None
self.assertIsNone(self.validator.validate_locations(self.requesting_user.username,
["loc1", "loc2"], "loc1"))

actual_spec = mock_validate_spec.call_args[0][0]
self.assertEqual(actual_spec['username'], self.requesting_user.username)
self.assertCountEqual(actual_spec['location_code'], ["loc1", "loc2"])

self.assertEqual(
self.validator.validate_locations(self.requesting_user.username, ["loc1", "loc2"], "loc3"),
"Primary location must be one of the user's locations"
)

self.assertEqual(
self.validator.validate_locations(self.requesting_user.username, ["loc1", "loc2"], ""),
"Primary location can't be empty if the user has any locations set"
)
87 changes: 87 additions & 0 deletions corehq/apps/api/validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from memoized import memoized

from corehq.apps.custom_data_fields.models import CustomDataFieldsDefinition
from corehq.apps.reports.util import get_allowed_tableau_groups_for_domain
from corehq.apps.user_importer.importer import SiteCodeToLocationCache
from corehq.apps.user_importer.validation import (
RoleValidator,
ProfileValidator,
LocationValidator,
TableauGroupsValidator,
TableauRoleValidator,
CustomDataValidator,
EmailValidator,
)
from corehq.apps.users.validation import validate_primary_location_assignment
from corehq.apps.registration.validation import AdminInvitesUserFormValidator


class WebUserResourceValidator():
def __init__(self, domain, requesting_user):
self.domain = domain
self.requesting_user = requesting_user

@property
def roles_by_name(self):
from corehq.apps.users.views.utils import get_editable_role_choices
return {role[1]: role[0] for role in get_editable_role_choices(self.domain, self.requesting_user,
allow_admin_role=True)}

@property
@memoized
def profiles_by_name(self):
from corehq.apps.users.views.mobile.custom_data_fields import UserFieldsView
return CustomDataFieldsDefinition.get_profiles_by_name(self.domain, UserFieldsView.field_type)

@property
def location_cache(self):
return SiteCodeToLocationCache(self.domain)

def validate_parameters(self, parameters):
allowed_params = ['email', 'role', 'primary_location', 'assigned_locations',
'profile', 'custom_user_data', 'tableau_role', 'tableau_groups']
invalid_params = [param for param in parameters if param not in allowed_params]
if invalid_params:
return f"Invalid parameter(s): {', '.join(invalid_params)}"
return AdminInvitesUserFormValidator.validate_parameters(self.domain, self.requesting_user, parameters)

def validate_role(self, role):
spec = {'role': role}
return RoleValidator(self.domain, self.roles_by_name()).validate_spec(spec)

def validate_profile(self, new_profile_name):
profile_validator = ProfileValidator(self.domain, self.requesting_user, True, self.profiles_by_name())
spec = {'user_profile': new_profile_name}
return profile_validator.validate_spec(spec)

def validate_custom_data(self, custom_data, profile_name):
custom_data_validator = CustomDataValidator(self.domain, self.profiles_by_name())
spec = {'data': custom_data, 'user_profile': profile_name}
return custom_data_validator.validate_spec(spec)

def validate_email(self, email, is_post):
if is_post:
error = AdminInvitesUserFormValidator.validate_email(self.domain, email)
if error:
return error
email_validator = EmailValidator(self.domain, 'email')
spec = {'email': email}
return email_validator.validate_spec(spec)

def validate_locations(self, editable_user, assigned_location_codes, primary_location_code):
error = validate_primary_location_assignment(primary_location_code, assigned_location_codes)
if error:
return error

location_validator = LocationValidator(self.domain, self.requesting_user, self.location_cache, True)
location_codes = list(set(assigned_location_codes + [primary_location_code]))
spec = {'location_code': location_codes,
'username': editable_user}
return location_validator.validate_spec(spec)

def validate_tableau_group(self, tableau_groups):
allowed_groups_for_domain = get_allowed_tableau_groups_for_domain(self.domain) or []
return TableauGroupsValidator.validate_tableau_groups(allowed_groups_for_domain, tableau_groups)

def validate_tableau_role(self, tableau_role):
return TableauRoleValidator.validate_tableau_role(tableau_role)
12 changes: 12 additions & 0 deletions corehq/apps/custom_data_fields/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ def get_profile_required_for_user_type_list(cls, domain, field_type):
return profile_required_for_user_type_list
return None

@classmethod
def get_profiles_by_name(cls, domain, field_type):
definition = cls.get(domain, field_type)
if definition:
profiles = definition.get_profiles()
return {
profile.name: profile
for profile in profiles
}
else:
return {}

class FieldFilterConfig:
def __init__(self, required_only=False, is_required_check_func=None):
self.required_only = required_only
Expand Down
43 changes: 19 additions & 24 deletions corehq/apps/registration/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from corehq.apps.programs.models import Program
from corehq.toggles import WEB_USER_INVITE_ADDITIONAL_FIELDS
from corehq.apps.users.forms import SelectUserLocationForm, BaseTableauUserForm
from corehq.apps.users.models import CouchUser, WebUser
from corehq.apps.users.models import CouchUser


class RegisterWebUserForm(forms.Form):
Expand Down Expand Up @@ -433,7 +433,7 @@ def clean_eula_confirmed(self):
return data


class WebUserInvitationForm(BaseUserInvitationForm):
class AcceptedWebUserInvitationForm(BaseUserInvitationForm):
"""
Form for a brand new user, before they've created a domain or done anything on CommCare HQ.
"""
Expand Down Expand Up @@ -492,7 +492,7 @@ class AdminInvitesUserForm(SelectUserLocationForm):
max_length=User._meta.get_field('email').max_length)
role = forms.ChoiceField(choices=(), label="Project Role")

def __init__(self, data=None, excluded_emails=None, is_add_user=None,
def __init__(self, data=None, is_add_user=None,
role_choices=(), should_show_location=False, can_edit_tableau_config=False,
custom_data=None, *, domain, **kwargs):
self.custom_data = custom_data
Expand Down Expand Up @@ -520,8 +520,6 @@ def __init__(self, data=None, excluded_emails=None, is_add_user=None,
choices = [('', '')] + list((prog.get_id, prog.name) for prog in programs)
self.fields['program'].choices = choices

self.excluded_emails = [x.lower() for x in excluded_emails] if excluded_emails else []

if self.can_edit_tableau_config:
self._initialize_tableau_fields(data, domain)

Expand Down Expand Up @@ -554,6 +552,9 @@ def __init__(self, data=None, excluded_emails=None, is_add_user=None,
'primary_location',
)
)
else:
self.fields.pop('assigned_locations', None)
self.fields.pop('primary_location', None)
if self.can_edit_tableau_config:
fields.append(
crispy.Fieldset(
Expand Down Expand Up @@ -589,30 +590,17 @@ def __init__(self, data=None, excluded_emails=None, is_add_user=None,
),
)

def _validate_profile(self, profile_id):
valid_profile_ids = {choice[0] for choice in self.custom_data.form.fields[PROFILE_SLUG].widget.choices}
if profile_id and profile_id not in valid_profile_ids:
raise forms.ValidationError(
_('Invalid profile selected. Please select a valid profile.'),
)

def clean_email(self):
email = self.cleaned_data['email'].strip()
if email.lower() in self.excluded_emails:
raise forms.ValidationError(_("A user with this email address is already in "
"this project or has a pending invitation."))
web_user = WebUser.get_by_username(email)
if web_user and not web_user.is_active:
raise forms.ValidationError(_("A user with this email address is deactivated. "))

from corehq.apps.registration.validation import AdminInvitesUserFormValidator
error = AdminInvitesUserFormValidator.validate_email(self.domain, email)
if error:
raise forms.ValidationError(error)
return email

def clean(self):
cleaned_data = super(AdminInvitesUserForm, self).clean()

if (('tableau_role' in cleaned_data or 'tableau_group_indices' in cleaned_data)
and not self.can_edit_tableau_config):
raise forms.ValidationError(_("You do not have permission to edit Tableau Configuraion."))

if 'tableau_group_indices' in cleaned_data:
cleaned_data['tableau_group_ids'] = [
self.tableau_form.allowed_tableau_groups[int(i)].id
Expand All @@ -631,10 +619,17 @@ def clean(self):

if prefixed_profile_key in custom_user_data:
profile_id = custom_user_data.pop(prefixed_profile_key)
self._validate_profile(profile_id)
cleaned_data['profile'] = profile_id
cleaned_data['custom_user_data'] = get_prefixed(custom_user_data, self.custom_data.prefix)

from corehq.apps.registration.validation import AdminInvitesUserFormValidator
error = AdminInvitesUserFormValidator.validate_parameters(
self.domain,
self.request.couch_user,
cleaned_data.keys()
)
if error:
raise forms.ValidationError(error)
return cleaned_data

def _initialize_tableau_fields(self, data, domain):
Expand Down
Loading

0 comments on commit cbac4e4

Please sign in to comment.