Skip to content

Commit

Permalink
Use quickcache. Prefilter enabled domains.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaapstorm committed Aug 29, 2024
1 parent d8d9642 commit 48c3d7c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 28 deletions.
1 change: 1 addition & 0 deletions corehq/motech/repeaters/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,7 @@ def domain_can_forward(domain):
)


@quickcache(['domain'], timeout=60)
def domain_can_forward_now(domain):
return (
domain_can_forward(domain)
Expand Down
16 changes: 12 additions & 4 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ def iter_ready_repeater_ids_forever():
while True:
yielded = False
for domain, repeater_id in iter_ready_repeater_ids_once():
if not toggles.PROCESS_REPEATERS.enabled(domain, toggles.NAMESPACE_DOMAIN):
continue
if not domain_can_forward_now(domain):
continue
if rate_limit_repeater(domain, repeater_id):
Expand Down Expand Up @@ -298,7 +296,7 @@ def iter_domain_repeaters(dom):
else:
yield rep_id

repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain()
repeater_ids_by_domain = get_repeater_ids_by_domain()
while True:
if not repeater_ids_by_domain:
return
Expand All @@ -321,6 +319,16 @@ def get_repeater_lock(repeater_id):
return MeteredLock(lock, name)


def get_repeater_ids_by_domain():
repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain()
enabled_domains = set(toggles.PROCESS_REPEATERS.get_enabled_domains())
return {
domain: repeater_ids
for domain, repeater_ids in repeater_ids_by_domain.items()
if domain in enabled_domains
}


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def process_repeater(domain, repeater_id, lock_token):

Expand Down Expand Up @@ -384,7 +392,7 @@ def is_repeat_record_ready(repeat_record):
# being processed
return (
not repeat_record.repeater.is_paused
and not toggles.PAUSE_DATA_FORWARDING.enabled(repeat_record.domain)
and domain_can_forward_now(repeat_record.domain)
and not rate_limit_repeater(
repeat_record.domain,
repeat_record.repeater.repeater_id
Expand Down
106 changes: 82 additions & 24 deletions corehq/motech/repeaters/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from corehq.motech.repeaters.tasks import (
_process_repeat_record,
delete_old_request_logs,
get_repeater_ids_by_domain,
iter_ready_repeater_ids_forever,
iter_ready_repeater_ids_once,
process_repeater,
Expand Down Expand Up @@ -275,34 +276,34 @@ def all_ready_ids_by_domain():

def test_no_ready_repeaters(self):
with (
patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once',
return_value=[]), # <--
patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
return_value={}), # <--
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled',
return_value=True)
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain1', 'domain2', 'domain3']),
):
self.assertFalse(next(iter_ready_repeater_ids_forever(), False))

def test_domain_cant_forward_now(self):
with (
patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once',
return_value=[(DOMAIN, 'abc123')]),
patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
side_effect=self.all_ready_ids_by_domain()),
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=False), # <--
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled',
return_value=True)
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain1', 'domain2', 'domain3']),
):
self.assertFalse(next(iter_ready_repeater_ids_forever(), False))

def test_process_repeaters_not_enabled(self):
with (
patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once',
return_value=[(DOMAIN, 'abc123')]),
patch('corehq.motech.repeaters.models.domain_can_forward',
patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
side_effect=self.all_ready_ids_by_domain()),
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled',
return_value=False) # <--
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=[]), # <--
):
self.assertFalse(next(iter_ready_repeater_ids_forever(), False))

Expand All @@ -312,8 +313,8 @@ def test_successive_loops(self):
side_effect=self.all_ready_ids_by_domain()),
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain1', 'domain2', 'domain3']),
patch('corehq.motech.repeaters.tasks.rate_limit_repeater',
return_value=False),
patch('corehq.motech.repeaters.tasks.get_repeater_lock'),
Expand Down Expand Up @@ -342,8 +343,8 @@ def test_rate_limit(self):
side_effect=self.all_ready_ids_by_domain()),
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain1', 'domain2', 'domain3']),
patch('corehq.motech.repeaters.tasks.rate_limit_repeater',
side_effect=lambda dom, rep: dom == 'domain2' and rep == 'repeater_id4'),
patch('corehq.motech.repeaters.tasks.get_repeater_lock'),
Expand All @@ -361,15 +362,43 @@ def test_rate_limit(self):
('domain1', 'repeater_id2'),
])

def test_disabled_domains_excluded(self):
with (
patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
side_effect=self.all_ready_ids_by_domain()),
patch('corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True),
patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain2', 'domain3']), # <--
patch('corehq.motech.repeaters.tasks.rate_limit_repeater',
return_value=False),
patch('corehq.motech.repeaters.tasks.get_repeater_lock'),
):
repeaters = list(iter_ready_repeater_ids_forever())
self.assertEqual(len(repeaters), 4)
repeater_ids = [(r[0], r[1]) for r in repeaters]
self.assertEqual(repeater_ids, [
('domain2', 'repeater_id4'),
('domain3', 'repeater_id6'),
('domain2', 'repeater_id5'),
('domain2', 'repeater_id4'),
])


def test_iter_ready_repeater_ids_once():
with patch(
'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
return_value={
'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'],
'domain2': ['repeater_id4', 'repeater_id5'],
'domain3': ['repeater_id6'],
}
with (
patch(
'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
return_value={
'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'],
'domain2': ['repeater_id4', 'repeater_id5'],
'domain3': ['repeater_id6'],
}
),
patch(
'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain1', 'domain2', 'domain3'],
),
):
pairs = list(iter_ready_repeater_ids_once())
assert_equal(pairs, [
Expand All @@ -387,13 +416,42 @@ def test_iter_ready_repeater_ids_once():
])


def test_get_repeater_ids_by_domain():
with (
patch(
'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain',
return_value={
'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'],
'domain2': ['repeater_id4', 'repeater_id5'],
'domain3': ['repeater_id6'],
}
),
patch(
'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains',
return_value=['domain2', 'domain3', 'domain4'],
),
):
repeater_ids_by_domain = get_repeater_ids_by_domain()
assert_equal(repeater_ids_by_domain, {
'domain2': ['repeater_id4', 'repeater_id5'],
'domain3': ['repeater_id6'],
})


@flag_enabled('PROCESS_REPEATERS')
class TestProcessRepeater(TestCase):

@classmethod
def setUpClass(cls):
super().setUpClass()

can_forward_now_patch = patch(
'corehq.motech.repeaters.tasks.domain_can_forward_now',
return_value=True,
)
can_forward_now_patch = can_forward_now_patch.start()
cls.addClassCleanup(can_forward_now_patch.stop)

cls.set_backoff_patch = patch.object(FormRepeater, 'set_backoff')
cls.set_backoff_patch.start()
cls.addClassCleanup(cls.set_backoff_patch.stop)
Expand Down

0 comments on commit 48c3d7c

Please sign in to comment.