Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/es/location-owners' into autosta…
Browse files Browse the repository at this point in the history
…ging
  • Loading branch information
orangejenny committed Mar 27, 2024
2 parents 4e67880 + 599b268 commit 08a3a9d
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 66 deletions.
2 changes: 1 addition & 1 deletion corehq/apps/callcenter/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def get_call_center_cases(domain_name, case_type, user=None):

if user:
case_ids = CommCareCase.objects.get_open_case_ids_in_domain_by_type(
domain_name, case_type=case_type, owner_ids=user.get_owner_ids())
domain_name, case_type=case_type, owner_ids=user.get_owner_ids(domain_name))
else:
case_ids = CommCareCase.objects.get_open_case_ids_in_domain_by_type(
domain_name, case_type=case_type)
Expand Down
18 changes: 0 additions & 18 deletions corehq/apps/locations/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,23 +779,5 @@ def for_domain(cls, domain):
return cls(domain=domain)


def get_case_sharing_groups_for_locations(locations, for_user_id=None):
# safety check to make sure all locations belong to same domain
assert len({location.domain for location in locations}) <= 1

for location in locations:
if location.location_type.shares_cases:
yield location.case_sharing_group_object(for_user_id)

location_ids = [location.pk for location in locations if location.location_type.view_descendants]
descendants = []
if location_ids:
where = Q(domain=locations[0].domain, parent_id__in=location_ids)
descendants = SQLLocation.objects.get_queryset_descendants(where).filter(
location_type__shares_cases=True, is_archived=False)
for loc in descendants:
yield loc.case_sharing_group_object(for_user_id)


def get_domain_locations(domain):
return SQLLocation.active_objects.filter(domain=domain)
4 changes: 2 additions & 2 deletions corehq/apps/locations/tests/test_location_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_cant_save_wont_save(self):
def test_get_owner_ids(self):
loc_type = self.loc.location_type
self.assertFalse(loc_type.shares_cases)
owner_ids = self.user.get_owner_ids()
owner_ids = self.user.get_owner_ids(self.domain.name)
self.assertEqual(1, len(owner_ids))
self.assertEqual(self.user._id, owner_ids[0])

Expand All @@ -129,7 +129,7 @@ def test_get_owner_ids(self):
loc_type.save()
# we have to re-create the user object because various things are cached
user = CommCareUser.wrap(self.user.to_json())
owner_ids = user.get_owner_ids()
owner_ids = user.get_owner_ids(self.domain.name)
self.assertEqual(2, len(owner_ids))
self.assertEqual(self.loc.location_id, owner_ids[1])

Expand Down
4 changes: 1 addition & 3 deletions corehq/apps/locations/tests/test_location_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,13 @@ def setUp(self):
first_name='Location types',
last_name='Tester',
)
self.addCleanup(self.user.delete, self.domain, deleted_by=None)

@classmethod
def tearDownClass(cls):
cls.project.delete()
super(TestLocationTypeOwnership, cls).tearDownClass()

def tearDown(self):
self.user.delete(self.domain, deleted_by=None)

def test_no_case_sharing(self):
no_case_sharing_type = make_loc_type('no-case-sharing', domain=self.domain)
location = make_loc('loc', type=no_case_sharing_type.name, domain=self.domain)
Expand Down
50 changes: 24 additions & 26 deletions corehq/apps/users/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@
from .user_data import SQLUserData # noqa
from corehq import toggles, privileges
from corehq.apps.accounting.utils import domain_has_privilege
from corehq.apps.locations.models import (
get_case_sharing_groups_for_locations,
)

WEB_USER = 'web'
COMMCARE_USER = 'commcare'
Expand Down Expand Up @@ -1135,6 +1132,19 @@ def get_user_session_data(self, domain):
})
return session_data

def _get_case_owning_locations(self, domain):
"""
:return: queryset of case-owning locations either directly assigned to the
user or descendant from an assigned location that views descendants
"""
from corehq.apps.locations.models import SQLLocation

yield from self.get_sql_locations(domain).filter(location_type__shares_cases=True)

yield from SQLLocation.objects.get_queryset_descendants(
self.get_sql_locations(domain).filter(location_type__view_descendants=True)
).filter(location_type__shares_cases=True, is_archived=False)

def delete(self, deleted_by_domain, deleted_by, deleted_via=None):
from corehq.apps.users.model_log import UserModelAction

Expand Down Expand Up @@ -1805,9 +1815,9 @@ def _get_deleted_form_ids(self):
def _get_deleted_case_ids(self):
return CommCareCase.objects.get_deleted_case_ids_by_owner(self.domain, self.user_id)

def get_owner_ids(self, domain=None):
def get_owner_ids(self, domain):
owner_ids = [self.user_id]
owner_ids.extend([g._id for g in self.get_case_sharing_groups()])
owner_ids.extend(g._id for g in self.get_case_sharing_groups())
return owner_ids

def unretire(self, unretired_by_domain, unretired_by, unretired_via=None):
Expand Down Expand Up @@ -1915,10 +1925,10 @@ def get_case_sharing_groups(self):
)

# get faked location group objects
groups = list(get_case_sharing_groups_for_locations(
self.get_sql_locations(self.domain),
self._id
))
groups = [
location.case_sharing_group_object(self._id)
for location in self._get_case_owning_locations(self.domain)
]
groups += [group for group in Group.by_user_id(self._id) if group.case_sharing]

has_at_privilege = domain_has_privilege(self.domain, privileges.ATTENDANCE_TRACKING)
Expand All @@ -1932,23 +1942,6 @@ def get_reporting_groups(self):
from corehq.apps.groups.models import Group
return [group for group in Group.by_user_id(self._id) if group.reporting]

@classmethod
def cannot_share(cls, domain, limit=None, skip=0):
users_checked = list(cls.by_domain(domain, limit=limit, skip=skip))
if not users_checked:
# stop fetching when you come back with none
return []
users = [user for user in users_checked if len(user.get_case_sharing_groups()) != 1]
if limit is not None:
total = cls.total_by_domain(domain)
max_limit = min(total - skip, limit)
if len(users) < max_limit:
new_limit = max_limit - len(users_checked)
new_skip = skip + len(users_checked)
users.extend(cls.cannot_share(domain, new_limit, new_skip))
return users
return users

def get_group_ids(self):
from corehq.apps.groups.models import Group
return Group.by_user_id(self._id, wrap=False)
Expand Down Expand Up @@ -2430,6 +2423,11 @@ def to_ota_restore_user(self, domain, request_user=None):
request_user=request_user
)

def get_owner_ids(self, domain):
owner_ids = [self.user_id]
owner_ids.extend(loc.location_id for loc in self._get_case_owning_locations(domain))
return owner_ids

@quickcache(['self._id', 'domain'], lambda _: settings.UNIT_TESTING)
def get_usercase_id(self, domain):
case = self.get_usercase_by_domain(domain)
Expand Down
80 changes: 71 additions & 9 deletions corehq/apps/users/tests/test_get_owner_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,103 @@

from corehq.apps.domain.models import Domain
from corehq.apps.groups.models import Group
from corehq.apps.users.models import CommCareUser
from corehq.apps.locations.tests.util import LocationHierarchyTestCase
from corehq.apps.users.models import CommCareUser, WebUser


class OwnerIDTestCase(TestCase):
domain = 'OwnerIDTestCase'

@staticmethod
def _mock_user(id):
@classmethod
def _mock_user(cls, id):
class FakeUser(CommCareUser):

@property
def project(self):
return Domain()

user = FakeUser(_id=id, domain='test-domain')
user = FakeUser(_id=id, domain=cls.domain)
return user

def test_get_owner_id_no_groups(self):
user = self._mock_user('test-user-1')
ids = user.get_owner_ids()
ids = user.get_owner_ids(self.domain)
self.assertEqual(1, len(ids))
self.assertEqual(user._id, ids[0])

def test_case_sharing_groups_included(self):
user = self._mock_user('test-user-2')
group = Group(domain='test-domain', users=['test-user-2'], case_sharing=True)
group = Group(domain=self.domain, users=['test-user-2'], case_sharing=True)
group.save()
ids = user.get_owner_ids()
ids = user.get_owner_ids(self.domain)
self.assertEqual(2, len(ids))
self.assertEqual(user._id, ids[0])
self.assertEqual(group._id, ids[1])

def test_non_case_sharing_groups_not_included(self):
user = self._mock_user('test-user-3')
group = Group(domain='test-domain', users=['test-user-3'], case_sharing=False)
group = Group(domain=self.domain, users=['test-user-3'], case_sharing=False)
group.save()
ids = user.get_owner_ids()
ids = user.get_owner_ids(self.domain)
self.assertEqual(1, len(ids))
self.assertEqual(user._id, ids[0])


class LocationOwnerIdTests(LocationHierarchyTestCase):
domain = 'LocationOwnerIdTests'
location_type_names = ['state', 'county', 'city']
location_structure = [
('Massachusetts', [
('Middlesex', [
('Cambridge', []),
('Somerville', []),
]),
('Suffolk', [
('Boston', []),
('Revere', []),
])
]),
('New York', [
('New York City', [
('Manhattan', []),
('Brooklyn', []),
('Queens', []),
]),
]),
]

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.location_types['state'].view_descendants = True
cls.location_types['state'].save()
cls.location_types['city'].shares_cases = True
cls.location_types['city'].save()

def test_hierarchical_ownership(self):
user = CommCareUser.create(self.domain, 'username', 'password', None, None)
user.set_location(self.locations['New York'])
user.add_to_assigned_locations(self.locations['Suffolk'])
user.add_to_assigned_locations(self.locations['Somerville'])
user.save()
self.addCleanup(user.delete, self.domain, deleted_by=None)

# Only city locations share cases, and only state cases view descendants,
# so the cities in New York state appear, but not those in Suffolk county
# Somerville appears to, as it's directly assigned
self.assertItemsEqual(
user.get_owner_ids(self.domain),
[user.user_id] + [self.locations[loc].location_id for loc in
['Manhattan', 'Brooklyn', 'Queens', 'Somerville']]
)

def test_web_user(self):
user = WebUser.create(self.domain, 'username', 'password', None, None)
user.set_location(self.domain, self.locations['New York'])
user.save()
self.addCleanup(user.delete, self.domain, deleted_by=None)

self.assertItemsEqual(
user.get_owner_ids(self.domain),
[user.user_id] + [self.locations[loc].location_id for loc in ['Manhattan', 'Brooklyn', 'Queens']]
)
8 changes: 1 addition & 7 deletions corehq/ex-submodules/casexml/apps/phone/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def get_commtrack_location_id(self):
raise NotImplementedError()

def get_owner_ids(self):
raise NotImplementedError()
return self._couch_user.get_owner_ids(self.domain)

def get_call_center_indicators(self, config):
raise NotImplementedError()
Expand Down Expand Up @@ -175,9 +175,6 @@ def get_fixture_data_items(self):
def get_commtrack_location_id(self):
return None

def get_owner_ids(self):
return [self.user_id]

def get_call_center_indicators(self, config):
return None

Expand Down Expand Up @@ -215,9 +212,6 @@ def get_commtrack_location_id(self):

return get_commtrack_location_id(self._couch_user, self.project)

def get_owner_ids(self):
return self._couch_user.get_owner_ids(self.domain)

def get_call_center_indicators(self, config):
from corehq.apps.callcenter.indicator_sets import CallCenterIndicators

Expand Down

0 comments on commit 08a3a9d

Please sign in to comment.