From 50c848af71ae20956fc06a2e339ea20196dcdea9 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 23 Aug 2024 19:34:08 +0100 Subject: [PATCH 01/34] Add `PROCESS_REPEATERS` toggle --- corehq/toggles/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/corehq/toggles/__init__.py b/corehq/toggles/__init__.py index 1de3ab96ad02..4d67b6719528 100644 --- a/corehq/toggles/__init__.py +++ b/corehq/toggles/__init__.py @@ -1956,6 +1956,18 @@ def _commtrackify(domain_name, toggle_is_enabled): [NAMESPACE_DOMAIN] ) +PROCESS_REPEATERS = FeatureRelease( + 'process_repeaters', + 'Process repeaters instead of processing repeat records independently', + TAG_INTERNAL, + [NAMESPACE_DOMAIN], + owner='Norman Hooper', + description=""" + Manages repeat records through their repeater in order to make + smarter decisions about remote endpoints. + """ +) + DO_NOT_RATE_LIMIT_SUBMISSIONS = StaticToggle( 'do_not_rate_limit_submissions', 'Do not rate limit submissions for this project, on a temporary basis.', From 0db6a6c4f4ca90d3650f7d7beaf753042b97df8f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 23 Aug 2024 20:51:51 +0100 Subject: [PATCH 02/34] `process_repeaters()` task --- corehq/motech/repeaters/const.py | 2 + corehq/motech/repeaters/models.py | 10 +++ corehq/motech/repeaters/tasks.py | 67 +++++++++++++++++++-- corehq/motech/repeaters/tests/test_tasks.py | 59 +++++++++++++++++- 4 files changed, 131 insertions(+), 7 deletions(-) diff --git a/corehq/motech/repeaters/const.py b/corehq/motech/repeaters/const.py index cec83aa1d125..669dd1306965 100644 --- a/corehq/motech/repeaters/const.py +++ b/corehq/motech/repeaters/const.py @@ -13,6 +13,8 @@ CHECK_REPEATERS_INTERVAL = timedelta(minutes=5) CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT CHECK_REPEATERS_KEY = 'check-repeaters-key' +PROCESS_REPEATERS_INTERVAL = timedelta(minutes=1) +PROCESS_REPEATERS_KEY = 'process-repeaters-key' ENDPOINT_TIMER = 'endpoint_timer' # Number of attempts to an online endpoint before cancelling payload MAX_ATTEMPTS = 3 diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 8e693a81b529..c5d5c2386198 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1171,6 +1171,9 @@ def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): retry_process_datasource_repeat_record, ) + if toggles.PROCESS_REPEATERS.enabled(self.domain, toggles.NAMESPACE_DOMAIN): + return + if self.next_check is None or self.next_check > datetime.utcnow(): return @@ -1327,3 +1330,10 @@ def domain_can_forward(domain): domain_has_privilege(domain, ZAPIER_INTEGRATION) or domain_has_privilege(domain, DATA_FORWARDING) ) + + +def domain_can_forward_now(domain): + return ( + domain_can_forward(domain) + and not toggles.PAUSE_DATA_FORWARDING.enabled(domain) + ) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index eb6e5403594a..0ae80661456d 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -11,6 +11,11 @@ from corehq import toggles from corehq.apps.celery import periodic_task, task from corehq.motech.models import RequestLog +from corehq.motech.rate_limiter import ( + rate_limit_repeater, + report_repeater_attempt, + report_repeater_usage, +) from corehq.util.metrics import ( make_buckets_from_timedeltas, metrics_counter, @@ -27,15 +32,16 @@ CHECK_REPEATERS_PARTITION_COUNT, ENDPOINT_TIMER, MAX_RETRY_WAIT, + PROCESS_REPEATERS_INTERVAL, + PROCESS_REPEATERS_KEY, 
RATE_LIMITER_DELAY_RANGE, State, ) -from .models import RepeatRecord, domain_can_forward - -from ..rate_limiter import ( - rate_limit_repeater, - report_repeater_attempt, - report_repeater_usage, +from .models import ( + Repeater, + RepeatRecord, + domain_can_forward, + domain_can_forward_now, ) _check_repeaters_buckets = make_buckets_from_timedeltas( @@ -215,6 +221,55 @@ def _process_repeat_record(repeat_record): ) +@periodic_task( + run_every=PROCESS_REPEATERS_INTERVAL, + queue=settings.CELERY_PERIODIC_QUEUE, +) +def process_repeaters(): + """ + Processes repeaters, instead of processing repeat records + independently the way that ``check_repeaters()`` does. + """ + process_repeater_lock = get_redis_lock( + PROCESS_REPEATERS_KEY, + timeout=24 * 60 * 60, + name=PROCESS_REPEATERS_KEY, + ) + if not process_repeater_lock.acquire(blocking=False): + return + try: + for domain, repeater_id in iter_ready_repeater_ids_forever(): + process_repeater.delay(domain, repeater_id) + finally: + process_repeater_lock.release() + + +def iter_ready_repeater_ids_forever(): + """ + Cycles through repeaters (repeatedly ;) ) until there are no more + repeat records ready to be sent. + """ + while True: + yielded = False + for repeater in Repeater.objects.all_ready(): + if not toggles.PROCESS_REPEATERS.enabled(repeater.domain): + continue + if not domain_can_forward_now(repeater.domain): + continue + yielded = True + yield repeater.domain, repeater.repeater_id + + if not yielded: + # No repeaters are ready, or their domains can't forward or + # are paused. + return + + +@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) +def process_repeater(domain, repeater_id): + ... + + metrics_gauge_task( 'commcare.repeaters.overdue', RepeatRecord.objects.count_overdue, diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 455d28142945..4552429016ed 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -2,7 +2,7 @@ from datetime import datetime, timedelta from unittest.mock import patch -from django.test import TestCase +from django.test import SimpleTestCase, TestCase from corehq.apps.receiverwrapper.util import submit_form_locally from corehq.form_processor.models import XFormInstance @@ -15,6 +15,7 @@ from corehq.motech.repeaters.tasks import ( _process_repeat_record, delete_old_request_logs, + iter_ready_repeater_ids_forever, ) from ..const import State @@ -241,3 +242,59 @@ def patch(self): self.mock_domain_can_forward = patch_domain_can_forward.start() self.mock_domain_can_forward.return_value = True self.addCleanup(patch_domain_can_forward.stop) + + +class TestIterReadyRepeaterIDsForever(SimpleTestCase): + + def test_no_ready_repeaters(self): + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', + return_value=[]), # <-- + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=True) + ): + self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) + + def test_domain_cant_forward_now(self): + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', + return_value=[Repeater()]), + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=False), # <-- + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=True) + ): + self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) + + def 
test_process_repeaters_not_enabled(self): + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', + return_value=[Repeater()]), + patch('corehq.motech.repeaters.models.domain_can_forward', + return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=False) # <-- + ): + self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) + + def test_successive_loops(self): + repeater_1 = Repeater(domain=DOMAIN, name='foo') + repeater_2 = Repeater(domain=DOMAIN, name='bar') + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', + side_effect=[[repeater_1, repeater_2], [repeater_1], []]), + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=True) + ): + repeaters = list(iter_ready_repeater_ids_forever()) + self.assertEqual(len(repeaters), 3) + repeater_ids = [r[1] for r in repeaters] + self.assertEqual(repeater_ids, [ + repeater_1.repeater_id, + repeater_2.repeater_id, + repeater_1.repeater_id, + ]) From e36296c833ccd97263a7e2fad226256e4df65196 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 23 Aug 2024 21:10:04 +0100 Subject: [PATCH 03/34] `get_repeater_lock()` --- corehq/motech/repeaters/tasks.py | 26 +++++++++++++++++---- corehq/motech/repeaters/tests/test_tasks.py | 3 ++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 0ae80661456d..f16b8b7c907e 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -1,10 +1,13 @@ import random +import uuid from datetime import datetime, timedelta from django.conf import settings from celery.schedules import crontab from celery.utils.log import get_task_logger +from django_redis import get_redis_connection +from redis.lock import Lock from dimagi.utils.couch import get_redis_lock @@ -24,6 +27,7 @@ metrics_histogram_timer, ) from corehq.util.metrics.const import MPM_MAX +from corehq.util.metrics.lockmeter import MeteredLock from corehq.util.timer import TimingContext from .const import ( @@ -238,8 +242,8 @@ def process_repeaters(): if not process_repeater_lock.acquire(blocking=False): return try: - for domain, repeater_id in iter_ready_repeater_ids_forever(): - process_repeater.delay(domain, repeater_id) + for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever(): + process_repeater.delay(domain, repeater_id, lock_token) finally: process_repeater_lock.release() @@ -256,8 +260,12 @@ def iter_ready_repeater_ids_forever(): continue if not domain_can_forward_now(repeater.domain): continue - yielded = True - yield repeater.domain, repeater.repeater_id + + lock = get_repeater_lock(repeater.repeater_id) + lock_token = uuid.uuid1().hex # The same way Lock does it + if lock.acquire(blocking=False, token=lock_token): + yielded = True + yield repeater.domain, repeater.repeater_id, lock_token if not yielded: # No repeaters are ready, or their domains can't forward or @@ -265,8 +273,16 @@ def iter_ready_repeater_ids_forever(): return +def get_repeater_lock(repeater_id): + redis = get_redis_connection() + name = f'process_repeater_{repeater_id}' + three_hours = 3 * 60 * 60 + lock = Lock(redis, name, timeout=three_hours) + return MeteredLock(lock, name) + + @task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) -def process_repeater(domain, repeater_id): +def process_repeater(domain, repeater_id, lock_token): ... 
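The hand-off set up above -- `iter_ready_repeater_ids_forever()` acquires the per-repeater lock with an explicit token and passes that token along so that a different process can release the lock later -- depends on redis-py allowing a lock to be released by token rather than only by the object that acquired it. The following is a minimal sketch of that pattern against a plain redis-py `Lock`; the Redis connection and lock name are illustrative, the `MeteredLock` wrapper used by `get_repeater_lock()` is omitted, and PATCH 14 later in this series adds an equivalent test through CommCare HQ's `get_redis_lock()` helper::

    import uuid

    from redis import Redis
    from redis.lock import Lock

    redis = Redis()  # assumes a locally reachable Redis server
    name = 'process_repeater_abc123'  # illustrative lock name
    three_hours = 3 * 60 * 60

    # Process A: acquire the lock with an explicit token, the same way
    # iter_ready_repeater_ids_forever() does.
    token = uuid.uuid1().hex
    lock = Lock(redis, name, timeout=three_hours)
    assert lock.acquire(blocking=False, token=token)

    # Process B (e.g. a later Celery task): rebuild a Lock with the same
    # name, adopt the token, and release the lock. Because the token
    # matches, this does not raise LockNotOwnedError.
    lock_2 = Lock(redis, name, timeout=three_hours)
    lock_2.local.token = token
    lock_2.release()
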
diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 4552429016ed..7b3454ea32d9 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -288,7 +288,8 @@ def test_successive_loops(self): patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - return_value=True) + return_value=True), + patch('corehq.motech.repeaters.tasks.get_repeater_lock'), ): repeaters = list(iter_ready_repeater_ids_forever()) self.assertEqual(len(repeaters), 3) From aeb10baf021dcdf2e88a35f243233bd2a3078b2a Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 23 Aug 2024 23:22:37 +0100 Subject: [PATCH 04/34] `iter_ready_repeater_ids_once()` --- corehq/motech/repeaters/models.py | 7 ++ corehq/motech/repeaters/tasks.py | 47 ++++++++++-- corehq/motech/repeaters/tests/test_models.py | 8 ++ corehq/motech/repeaters/tests/test_tasks.py | 77 ++++++++++++++++---- 4 files changed, 120 insertions(+), 19 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index c5d5c2386198..8ccc929abcb6 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -235,6 +235,13 @@ def all_ready(self): .filter(next_attempt_not_in_the_future) .filter(repeat_records_ready_to_send)) + def get_all_ready_ids_by_domain(self): + results = defaultdict(list) + query = self.all_ready().values_list('domain', 'id') + for (domain, id_uuid) in query.all(): + results[domain].append(id_uuid.hex) + return results + def get_queryset(self): repeater_obj = self.model() if type(repeater_obj).__name__ == "Repeater": diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index f16b8b7c907e..9a04a7a5eba5 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -255,17 +255,17 @@ def iter_ready_repeater_ids_forever(): """ while True: yielded = False - for repeater in Repeater.objects.all_ready(): - if not toggles.PROCESS_REPEATERS.enabled(repeater.domain): + for domain, repeater_id in iter_ready_repeater_ids_once(): + if not toggles.PROCESS_REPEATERS.enabled(domain, toggles.NAMESPACE_DOMAIN): continue - if not domain_can_forward_now(repeater.domain): + if not domain_can_forward_now(domain): continue - lock = get_repeater_lock(repeater.repeater_id) + lock = get_repeater_lock(repeater_id) lock_token = uuid.uuid1().hex # The same way Lock does it if lock.acquire(blocking=False, token=lock_token): yielded = True - yield repeater.domain, repeater.repeater_id, lock_token + yield domain, repeater_id, lock_token if not yielded: # No repeaters are ready, or their domains can't forward or @@ -273,6 +273,43 @@ def iter_ready_repeater_ids_forever(): return +def iter_ready_repeater_ids_once(): + """ + Yields domain-repeater_id tuples in a round-robin fashion. + + e.g. :: + ('domain1', 'repeater_id1'), + ('domain2', 'repeater_id2'), + ('domain3', 'repeater_id3'), + ('domain1', 'repeater_id4'), + ('domain2', 'repeater_id5'), + ... 
+ + """ + + def iter_domain_repeaters(dom): + try: + rep_id = repeater_ids_by_domain[dom].pop(0) + except IndexError: + return + else: + yield rep_id + + repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain() + while True: + if not repeater_ids_by_domain: + return + for domain in list(repeater_ids_by_domain.keys()): + try: + repeater_id = next(iter_domain_repeaters(domain)) + except StopIteration: + # We've exhausted the repeaters for this domain + del repeater_ids_by_domain[domain] + continue + else: + yield domain, repeater_id + + def get_repeater_lock(repeater_id): redis = get_redis_connection() name = f'process_repeater_{repeater_id}' diff --git a/corehq/motech/repeaters/tests/test_models.py b/corehq/motech/repeaters/tests/test_models.py index 75905c7231d5..80ff8046bfd1 100644 --- a/corehq/motech/repeaters/tests/test_models.py +++ b/corehq/motech/repeaters/tests/test_models.py @@ -193,6 +193,14 @@ def test_all_ready_next_past(self): self.assertEqual(len(repeaters), 1) self.assertEqual(repeaters[0].id, self.repeater.id) + def test_all_ready_ids(self): + with make_repeat_record(self.repeater, RECORD_PENDING_STATE): + repeater_ids = Repeater.objects.get_all_ready_ids_by_domain() + self.assertEqual( + dict(repeater_ids), + {self.repeater.domain: [self.repeater.repeater_id]} + ) + @contextmanager def make_repeat_record(repeater, state): diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 7b3454ea32d9..901e515ae39b 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -4,6 +4,8 @@ from django.test import SimpleTestCase, TestCase +from nose.tools import assert_equal + from corehq.apps.receiverwrapper.util import submit_form_locally from corehq.form_processor.models import XFormInstance from corehq.form_processor.utils.xform import ( @@ -16,6 +18,7 @@ _process_repeat_record, delete_old_request_logs, iter_ready_repeater_ids_forever, + iter_ready_repeater_ids_once, ) from ..const import State @@ -248,7 +251,7 @@ class TestIterReadyRepeaterIDsForever(SimpleTestCase): def test_no_ready_repeaters(self): with ( - patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', + patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', return_value=[]), # <-- patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), @@ -259,8 +262,8 @@ def test_no_ready_repeaters(self): def test_domain_cant_forward_now(self): with ( - patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', - return_value=[Repeater()]), + patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', + return_value=[(DOMAIN, 'abc123')]), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=False), # <-- patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', @@ -270,8 +273,8 @@ def test_domain_cant_forward_now(self): def test_process_repeaters_not_enabled(self): with ( - patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', - return_value=[Repeater()]), + patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', + return_value=[(DOMAIN, 'abc123')]), patch('corehq.motech.repeaters.models.domain_can_forward', return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', @@ -280,11 +283,23 @@ def test_process_repeaters_not_enabled(self): self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) def test_successive_loops(self): - repeater_1 = 
Repeater(domain=DOMAIN, name='foo') - repeater_2 = Repeater(domain=DOMAIN, name='bar') with ( - patch('corehq.motech.repeaters.tasks.Repeater.objects.all_ready', - side_effect=[[repeater_1, repeater_2], [repeater_1], []]), + patch( + 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=[ + { + # See test_iter_ready_repeater_ids_once() + 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], + 'domain2': ['repeater_id4', 'repeater_id5'], + 'domain3': ['repeater_id6'], + }, + { + 'domain1': ['repeater_id1', 'repeater_id2'], + 'domain2': ['repeater_id4'] + }, + {}, + ] + ), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', @@ -292,10 +307,44 @@ def test_successive_loops(self): patch('corehq.motech.repeaters.tasks.get_repeater_lock'), ): repeaters = list(iter_ready_repeater_ids_forever()) - self.assertEqual(len(repeaters), 3) - repeater_ids = [r[1] for r in repeaters] + self.assertEqual(len(repeaters), 9) + repeater_ids = [(r[0], r[1]) for r in repeaters] self.assertEqual(repeater_ids, [ - repeater_1.repeater_id, - repeater_2.repeater_id, - repeater_1.repeater_id, + # First loop + ('domain1', 'repeater_id1'), + ('domain2', 'repeater_id4'), + ('domain3', 'repeater_id6'), + ('domain1', 'repeater_id2'), + ('domain2', 'repeater_id5'), + ('domain1', 'repeater_id3'), + + # Second loop + ('domain1', 'repeater_id1'), + ('domain2', 'repeater_id4'), + ('domain1', 'repeater_id2'), ]) + + +def test_iter_ready_repeater_ids_once(): + with patch( + 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + return_value={ + 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], + 'domain2': ['repeater_id4', 'repeater_id5'], + 'domain3': ['repeater_id6'], + } + ): + pairs = list(iter_ready_repeater_ids_once()) + assert_equal(pairs, [ + # First round of domains + ('domain1', 'repeater_id1'), + ('domain2', 'repeater_id4'), + ('domain3', 'repeater_id6'), + + # Second round + ('domain1', 'repeater_id2'), + ('domain2', 'repeater_id5'), + + # Third round + ('domain1', 'repeater_id3'), + ]) From 01e4bc788c606a9e72707c4c761b943fa04f3d5f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 23 Aug 2024 23:51:02 +0100 Subject: [PATCH 05/34] Skip rate-limited repeaters --- corehq/motech/repeaters/tasks.py | 2 + corehq/motech/repeaters/tests/test_tasks.py | 61 +++++++++++++++------ 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 9a04a7a5eba5..95cd4da6f098 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -260,6 +260,8 @@ def iter_ready_repeater_ids_forever(): continue if not domain_can_forward_now(domain): continue + if rate_limit_repeater(domain, repeater_id): + continue lock = get_repeater_lock(repeater_id) lock_token = uuid.uuid1().hex # The same way Lock does it diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 901e515ae39b..a1889033b5fd 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -249,6 +249,22 @@ def patch(self): class TestIterReadyRepeaterIDsForever(SimpleTestCase): + @staticmethod + def all_ready_ids_by_domain(): + return [ + { + # See test_iter_ready_repeater_ids_once() + 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], + 'domain2': ['repeater_id4', 'repeater_id5'], + 
'domain3': ['repeater_id6'], + }, + { + 'domain1': ['repeater_id1', 'repeater_id2'], + 'domain2': ['repeater_id4'] + }, + {}, + ] + def test_no_ready_repeaters(self): with ( patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', @@ -284,26 +300,14 @@ def test_process_repeaters_not_enabled(self): def test_successive_loops(self): with ( - patch( - 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', - side_effect=[ - { - # See test_iter_ready_repeater_ids_once() - 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], - 'domain2': ['repeater_id4', 'repeater_id5'], - 'domain3': ['repeater_id6'], - }, - { - 'domain1': ['repeater_id1', 'repeater_id2'], - 'domain2': ['repeater_id4'] - }, - {}, - ] - ), + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=self.all_ready_ids_by_domain()), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', return_value=True), + patch('corehq.motech.repeaters.tasks.rate_limit_repeater', + return_value=False), patch('corehq.motech.repeaters.tasks.get_repeater_lock'), ): repeaters = list(iter_ready_repeater_ids_forever()) @@ -324,6 +328,31 @@ def test_successive_loops(self): ('domain1', 'repeater_id2'), ]) + def test_rate_limit(self): + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=self.all_ready_ids_by_domain()), + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=True), + patch('corehq.motech.repeaters.tasks.rate_limit_repeater', + side_effect=lambda dom, rep: dom == 'domain2' and rep == 'repeater_id4'), + patch('corehq.motech.repeaters.tasks.get_repeater_lock'), + ): + repeaters = list(iter_ready_repeater_ids_forever()) + self.assertEqual(len(repeaters), 7) + repeater_ids = [(r[0], r[1]) for r in repeaters] + self.assertEqual(repeater_ids, [ + ('domain1', 'repeater_id1'), + ('domain3', 'repeater_id6'), + ('domain1', 'repeater_id2'), + ('domain2', 'repeater_id5'), + ('domain1', 'repeater_id3'), + ('domain1', 'repeater_id1'), + ('domain1', 'repeater_id2'), + ]) + def test_iter_ready_repeater_ids_once(): with patch( From db2fec2159cf88bee2d92ae312abdf197d8a5356 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 24 Aug 2024 00:51:59 +0100 Subject: [PATCH 06/34] `process_repeater()` task --- corehq/motech/repeaters/models.py | 23 ++++- corehq/motech/repeaters/tasks.py | 95 ++++++++++++++++++- .../motech/repeaters/tests/test_repeater.py | 8 +- corehq/util/metrics/lockmeter.py | 6 ++ corehq/util/metrics/tests/test_lockmeter.py | 20 +++- settings.py | 4 + 6 files changed, 146 insertions(+), 10 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 8ccc929abcb6..0e03cc29fdd0 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -73,6 +73,7 @@ from http import HTTPStatus from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse +from django.conf import settings from django.db import models, router from django.db.models.base import Deferred from django.dispatch import receiver @@ -357,9 +358,21 @@ def _repeater_type(cls): @property def repeat_records_ready(self): - return self.repeat_records.filter(state__in=(State.Pending, State.Fail)) + """ + A QuerySet of repeat records in the Pending or Fail state in 
the + order in which they were registered + """ + return ( + self.repeat_records + .filter(state__in=(State.Pending, State.Fail)) + .order_by('registered_at') + ) - def set_next_attempt(self): + @property + def num_workers(self): + return settings.DEFAULT_REPEATER_WORKERS + + def set_backoff(self): now = datetime.utcnow() interval = _get_retry_interval(self.last_attempt_at, now) self.last_attempt_at = now @@ -372,7 +385,7 @@ def set_next_attempt(self): next_attempt_at=now + interval, ) - def reset_next_attempt(self): + def reset_backoff(self): if self.last_attempt_at or self.next_attempt_at: self.last_attempt_at = None self.next_attempt_at = None @@ -1168,7 +1181,9 @@ def fire(self, force_send=False, timing_context=None): self.repeater.fire_for_record(self, timing_context=timing_context) except Exception as e: self.handle_payload_error(str(e), traceback_str=traceback.format_exc()) - raise + finally: + return self.state + return None def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): from corehq.motech.repeaters.tasks import ( diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 95cd4da6f098..63103ffdfe1b 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -4,6 +4,7 @@ from django.conf import settings +from celery import chord from celery.schedules import crontab from celery.utils.log import get_task_logger from django_redis import get_redis_connection @@ -322,7 +323,99 @@ def get_repeater_lock(repeater_id): @task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) def process_repeater(domain, repeater_id, lock_token): - ... + + def get_task_signature(repeat_record): + task_ = { + State.Pending: process_pending_repeat_record, + State.Fail: process_failed_repeat_record, + }[repeat_record.state] + return task_.s(repeat_record.id, repeat_record.domain) + + repeater = Repeater.objects.get(domain=domain, id=repeater_id) + repeat_records = repeater.repeat_records_ready[:repeater.num_workers] + header_tasks = [get_task_signature(rr) for rr in repeat_records] + chord(header_tasks)(update_repeater.s(repeater_id, lock_token)) + + +@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) +def process_pending_repeat_record(repeat_record_id, domain): + # NOTE: Keep separate from `process_failed_repeat_record()` for + # monitoring purposes. `domain` is for tagging in Datadog + return process_ready_repeat_record(repeat_record_id) + + +@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) +def process_failed_repeat_record(repeat_record_id, domain): + # NOTE: Keep separate from `process_pending_repeat_record()` for + # monitoring purposes. 
`domain` is for tagging in Datadog + return process_ready_repeat_record(repeat_record_id) + + +def process_ready_repeat_record(repeat_record_id): + state_or_none = None + with TimingContext('process_repeat_record') as timer: + try: + repeat_record = ( + RepeatRecord.objects + .prefetch_related('repeater', 'attempt_set') + .get(id=repeat_record_id) + ) + report_repeater_attempt(repeat_record.repeater.repeater_id) + if not is_repeat_record_ready(repeat_record): + return None + with timer('fire_timing') as fire_timer: + state_or_none = repeat_record.fire(timing_context=fire_timer) + report_repeater_usage( + repeat_record.domain, + # round up to the nearest millisecond, meaning always at least 1ms + milliseconds=int(fire_timer.duration * 1000) + 1 + ) + except Exception: + logging.exception(f'Failed to process repeat record {repeat_record_id}') + return state_or_none + + +def is_repeat_record_ready(repeat_record): + # Fail loudly if repeat_record is not ready. + # process_ready_repeat_record() will log an exception. + assert repeat_record.state in (State.Pending, State.Fail) + + # The repeater could have been paused or rate-limited while it was + # being processed + return ( + not repeat_record.repeater.is_paused + and not toggles.PAUSE_DATA_FORWARDING.enabled(repeat_record.domain) + and not rate_limit_repeater( + repeat_record.domain, + repeat_record.repeater.repeater_id + ) + ) + + +@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) +def update_repeater(repeat_record_states, repeater_id, lock_token): + """ + Determines whether the repeater should back off, based on the + results of ``_process_repeat_record()`` tasks. + """ + try: + repeater = Repeater.objects.get(id=repeater_id) + if any(s == State.Success for s in repeat_record_states): + # At least one repeat record was sent successfully. The + # remote endpoint is healthy. + repeater.reset_backoff() + elif all(s in (State.Empty, State.InvalidPayload, None) + for s in repeat_record_states): + # We can't tell anything about the remote endpoint. + # (_process_repeat_record() can return None on an exception.) + pass + else: + # All sent payloads failed. Try again later. 
+ repeater.set_backoff() + finally: + lock = get_repeater_lock(repeater_id) + lock.local.token = lock_token + lock.release() metrics_gauge_task( diff --git a/corehq/motech/repeaters/tests/test_repeater.py b/corehq/motech/repeaters/tests/test_repeater.py index 7115b6133ccc..84a31f2a23c3 100644 --- a/corehq/motech/repeaters/tests/test_repeater.py +++ b/corehq/motech/repeaters/tests/test_repeater.py @@ -690,10 +690,10 @@ def test_payload_exception_on_fire(self): with patch('corehq.motech.repeaters.models.simple_request') as mock_simple_post: mock_simple_post.return_value.status_code = 503 # Fail and retry rr = self.repeater.register(case) - with self.assertRaises(Exception): - with patch.object(CaseRepeater, 'get_payload', side_effect=Exception('Boom!')): - rr.fire() + with patch.object(CaseRepeater, 'get_payload', side_effect=Exception('Boom!')): + state_or_none = rr.fire() + self.assertEqual(state_or_none, State.InvalidPayload) repeat_record = RepeatRecord.objects.get(id=rr.id) self.assertEqual(repeat_record.state, State.InvalidPayload) self.assertEqual(repeat_record.failure_reason, 'Boom!') @@ -1348,7 +1348,7 @@ def test_pause_and_set_next_attempt(self): self.assertIsNone(repeater_a.next_attempt_at) self.assertFalse(repeater_b.is_paused) - repeater_a.set_next_attempt() + repeater_a.set_backoff() repeater_b.pause() repeater_c = Repeater.objects.get(id=self.repeater.repeater_id) diff --git a/corehq/util/metrics/lockmeter.py b/corehq/util/metrics/lockmeter.py index 692793b24863..27b77dbc113f 100644 --- a/corehq/util/metrics/lockmeter.py +++ b/corehq/util/metrics/lockmeter.py @@ -56,6 +56,12 @@ def release(self): self.lock_trace.finish() self.lock_trace = None + @property + def local(self): + # Used for setting lock token + # Raises AttributeError if lock does not have a `local` attribute + return self.lock.local + def __enter__(self): self.acquire(blocking=True) return self diff --git a/corehq/util/metrics/tests/test_lockmeter.py b/corehq/util/metrics/tests/test_lockmeter.py index cd5a96b53aee..e658b2423463 100644 --- a/corehq/util/metrics/tests/test_lockmeter.py +++ b/corehq/util/metrics/tests/test_lockmeter.py @@ -1,9 +1,14 @@ +import threading from unittest import TestCase +from unittest.mock import call, patch +from uuid import uuid1 import attr -from unittest.mock import call, patch +from django_redis import get_redis_connection +from redis.lock import Lock from corehq.util.metrics.tests.utils import capture_metrics + from ..lockmeter import MeteredLock @@ -155,6 +160,19 @@ def test_del_untracked(self): lock.__del__() self.assertListEqual(tracer.mock_calls, []) + def test_local(self): + redis = get_redis_connection() + name = uuid1().hex + with Lock(redis, name, timeout=5) as redis_lock: + lock = MeteredLock(redis_lock, name) + self.assertEqual(type(lock.local), threading.local) + + def test_no_local(self): + fake = FakeLock() + lock = MeteredLock(fake, "test") + with self.assertRaises(AttributeError): + lock.local + @attr.s class FakeLock(object): diff --git a/settings.py b/settings.py index 5131ef9eb623..e9a3331eedcd 100755 --- a/settings.py +++ b/settings.py @@ -628,6 +628,10 @@ "ucr_queue": None, } +# The default number of repeat_record_queue workers that one repeater +# can use to send repeat records at the same time. 
+DEFAULT_REPEATER_WORKERS = 7 + # websockets config WEBSOCKET_URL = '/ws/' WS4REDIS_PREFIX = 'ws' From 85b952e7a0d1fefbf1412ae99a9446f13ee1641f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sun, 4 Aug 2024 04:38:01 +0100 Subject: [PATCH 07/34] Add tests --- corehq/motech/repeaters/tests/test_tasks.py | 120 +++++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index a1889033b5fd..47ab52aa2bf9 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -1,6 +1,8 @@ +from collections import namedtuple from contextlib import contextmanager from datetime import datetime, timedelta -from unittest.mock import patch +from unittest.mock import MagicMock, patch +from uuid import uuid4 from django.test import SimpleTestCase, TestCase @@ -13,13 +15,16 @@ TestFormMetadata, ) from corehq.motech.models import ConnectionSettings, RequestLog -from corehq.motech.repeaters.models import Repeater, RepeatRecord +from corehq.motech.repeaters.models import FormRepeater, Repeater, RepeatRecord from corehq.motech.repeaters.tasks import ( _process_repeat_record, delete_old_request_logs, iter_ready_repeater_ids_forever, iter_ready_repeater_ids_once, + process_repeater, + update_repeater, ) +from corehq.util.test_utils import _create_case, flag_enabled from ..const import State @@ -28,6 +33,9 @@ 'naoi', 'deich'] +ResponseMock = namedtuple('ResponseMock', 'status_code reason') + + class TestDeleteOldRequestLogs(TestCase): def test_raw_delete_logs_old(self): @@ -377,3 +385,111 @@ def test_iter_ready_repeater_ids_once(): # Third round ('domain1', 'repeater_id3'), ]) + + +@flag_enabled('PROCESS_REPEATERS') +class TestProcessRepeater(TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.set_backoff_patch = patch.object(FormRepeater, 'set_backoff') + cls.set_backoff_patch.start() + cls.addClassCleanup(cls.set_backoff_patch.stop) + + connection_settings = ConnectionSettings.objects.create( + domain=DOMAIN, + url='http://www.example.com/api/' + ) + cls.repeater = FormRepeater.objects.create( + domain=DOMAIN, + connection_settings=connection_settings, + ) + + def test_process_repeater_sends_repeat_record(self): + payload, __ = _create_case( + domain=DOMAIN, + case_id=str(uuid4()), + case_type='case', + owner_id='abc123' + ) + self.repeater.register(payload) + + with ( + patch('corehq.motech.repeaters.models.simple_request') as request_mock, + patch('corehq.motech.repeaters.tasks.get_repeater_lock') + ): + request_mock.return_value = ResponseMock(status_code=200, reason='OK') + process_repeater(DOMAIN, self.repeater.repeater_id, 'token') + + request_mock.assert_called_once() + + def test_process_repeater_updates_repeater(self): + payload, __ = _create_case( + domain=DOMAIN, + case_id=str(uuid4()), + case_type='case', + owner_id='abc123' + ) + self.repeater.register(payload) + + with ( + patch('corehq.motech.repeaters.models.simple_request') as request_mock, + patch('corehq.motech.repeaters.tasks.get_repeater_lock') + ): + request_mock.return_value = ResponseMock( + status_code=429, + reason='Too Many Requests', + ) + process_repeater(DOMAIN, self.repeater.repeater_id, 'token') + + self.repeater.set_backoff.assert_called_once() + + +@flag_enabled('PROCESS_REPEATERS') +class TestUpdateRepeater(SimpleTestCase): + + @patch('corehq.motech.repeaters.tasks.get_repeater_lock') + 
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __): + mock_repeater = MagicMock() + mock_get_repeater.return_value = mock_repeater + + update_repeater([State.Success, State.Fail, State.Empty, None], 1, 'token') + + mock_repeater.set_backoff.assert_not_called() + mock_repeater.reset_backoff.assert_called_once() + + @patch('corehq.motech.repeaters.tasks.get_repeater_lock') + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __): + mock_repeater = MagicMock() + mock_get_repeater.return_value = mock_repeater + + update_repeater([State.Fail, State.Empty, None], 1, 'token') + + mock_repeater.set_backoff.assert_called_once() + mock_repeater.reset_backoff.assert_not_called() + + @patch('corehq.motech.repeaters.tasks.get_repeater_lock') + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __): + mock_repeater = MagicMock() + mock_get_repeater.return_value = mock_repeater + + update_repeater([State.Empty], 1, 'token') + + mock_repeater.set_backoff.assert_not_called() + mock_repeater.reset_backoff.assert_not_called() + + @patch('corehq.motech.repeaters.tasks.get_repeater_lock') + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __): + mock_repeater = MagicMock() + mock_get_repeater.return_value = mock_repeater + + update_repeater([None], 1, 'token') + + mock_repeater.set_backoff.assert_not_called() + mock_repeater.reset_backoff.assert_not_called() From c28c11b84578ef5613931fa7012278cf438a07a9 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 24 Aug 2024 01:49:48 +0100 Subject: [PATCH 08/34] `Repeater.max_workers` field --- .../migrations/0014_repeater_max_workers.py | 16 ++++++++++++++++ corehq/motech/repeaters/models.py | 6 +++++- migrations.lock | 1 + settings.py | 5 +++++ 4 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 corehq/motech/repeaters/migrations/0014_repeater_max_workers.py diff --git a/corehq/motech/repeaters/migrations/0014_repeater_max_workers.py b/corehq/motech/repeaters/migrations/0014_repeater_max_workers.py new file mode 100644 index 000000000000..66240342f454 --- /dev/null +++ b/corehq/motech/repeaters/migrations/0014_repeater_max_workers.py @@ -0,0 +1,16 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("repeaters", "0013_alter_repeatrecord_state_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="repeater", + name="max_workers", + field=models.IntegerField(default=0), + ), + ] diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 0e03cc29fdd0..65eee77157d2 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -268,6 +268,7 @@ class Repeater(RepeaterSuperProxy): is_paused = models.BooleanField(default=False) next_attempt_at = models.DateTimeField(null=True, blank=True) last_attempt_at = models.DateTimeField(null=True, blank=True) + max_workers = models.IntegerField(default=0) options = JSONField(default=dict) connection_settings_id = models.IntegerField(db_index=True) is_deleted = models.BooleanField(default=False, db_index=True) @@ -370,7 +371,10 @@ def repeat_records_ready(self): @property def num_workers(self): - return settings.DEFAULT_REPEATER_WORKERS + # If 
num_workers is 1, repeat records are sent in the order in + # which they were registered. + num_workers = self.max_workers or settings.DEFAULT_REPEATER_WORKERS + return min(num_workers, settings.MAX_REPEATER_WORKERS) def set_backoff(self): now = datetime.utcnow() diff --git a/migrations.lock b/migrations.lock index 0e1a62d79cda..74031295af7f 100644 --- a/migrations.lock +++ b/migrations.lock @@ -833,6 +833,7 @@ repeaters 0011_remove_obsolete_entities 0012_formexpressionrepeater_arcgisformexpressionrepeater 0013_alter_repeatrecord_state_and_more + 0014_repeater_max_workers reports 0001_initial 0002_auto_20171121_1803 diff --git a/settings.py b/settings.py index e9a3331eedcd..42e8ce0953f4 100755 --- a/settings.py +++ b/settings.py @@ -631,6 +631,11 @@ # The default number of repeat_record_queue workers that one repeater # can use to send repeat records at the same time. DEFAULT_REPEATER_WORKERS = 7 +# The hard limit for the number of repeat_record_queue workers that one +# repeater can use to send repeat records at the same time. This is a +# guardrail to prevent one repeater from hogging repeat_record_queue +# workers and to ensure that repeaters are iterated fairly. +MAX_REPEATER_WORKERS = 144 # websockets config WEBSOCKET_URL = '/ws/' From d8d9642341d6b91815827008d9c1c69f78485f5e Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 28 Aug 2024 15:41:11 +0100 Subject: [PATCH 09/34] Index fields used by `RepeaterManager.all_ready()` --- .../0015_alter_repeatrecord_state_and_more.py | 35 +++++++++++++++++++ corehq/motech/repeaters/models.py | 33 ++++++++++++----- migrations.lock | 1 + 3 files changed, 60 insertions(+), 9 deletions(-) create mode 100644 corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py diff --git a/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py b/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py new file mode 100644 index 000000000000..83036dc7302c --- /dev/null +++ b/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py @@ -0,0 +1,35 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("repeaters", "0014_repeater_max_workers"), + ] + + operations = [ + migrations.AlterField( + model_name="repeatrecord", + name="state", + field=models.PositiveSmallIntegerField( + choices=[ + (1, "Pending"), + (2, "Failed"), + (4, "Succeeded"), + (8, "Cancelled"), + (16, "Empty"), + (32, "Invalid Payload"), + ], + db_index=True, + default=1, + ), + ), + migrations.AddIndex( + model_name="repeater", + index=models.Index( + condition=models.Q(("is_paused", False)), + fields=["next_attempt_at"], + name="next_attempt_at_not_paused_idx", + ), + ), + ] diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 65eee77157d2..f36a7318b0d7 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -231,10 +231,12 @@ def all_ready(self): repeat_records_ready_to_send = models.Q( repeat_records__state__in=(State.Pending, State.Fail) ) - return (self.get_queryset() - .filter(not_paused) - .filter(next_attempt_not_in_the_future) - .filter(repeat_records_ready_to_send)) + return ( + self.get_queryset() + .filter(not_paused) + .filter(next_attempt_not_in_the_future) + .filter(repeat_records_ready_to_send) + ) def get_all_ready_ids_by_domain(self): results = defaultdict(list) @@ -280,6 +282,13 @@ class Repeater(RepeaterSuperProxy): class Meta: db_table = 'repeaters_repeater' + 
indexes = [ + models.Index( + fields=['next_attempt_at'], + condition=models.Q(is_paused=False), + name='next_attempt_at_not_paused_idx', + ), + ] payload_generator_classes = () @@ -1001,11 +1010,17 @@ def get_domains_with_records(self): class RepeatRecord(models.Model): domain = models.CharField(max_length=126) payload_id = models.CharField(max_length=255) - repeater = models.ForeignKey(Repeater, - on_delete=DB_CASCADE, - db_column="repeater_id_", - related_name='repeat_records') - state = models.PositiveSmallIntegerField(choices=State.choices, default=State.Pending) + repeater = models.ForeignKey( + Repeater, + on_delete=DB_CASCADE, + db_column="repeater_id_", + related_name='repeat_records', + ) + state = models.PositiveSmallIntegerField( + choices=State.choices, + default=State.Pending, + db_index=True, + ) registered_at = models.DateTimeField() next_check = models.DateTimeField(null=True, default=None) max_possible_tries = models.IntegerField(default=MAX_BACKOFF_ATTEMPTS) diff --git a/migrations.lock b/migrations.lock index 74031295af7f..6c2ca3472698 100644 --- a/migrations.lock +++ b/migrations.lock @@ -834,6 +834,7 @@ repeaters 0012_formexpressionrepeater_arcgisformexpressionrepeater 0013_alter_repeatrecord_state_and_more 0014_repeater_max_workers + 0015_alter_repeatrecord_state_and_more reports 0001_initial 0002_auto_20171121_1803 From 48c3d7cc9f5080cf84ae366f4633926fa1dc7f09 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 28 Aug 2024 17:57:46 +0100 Subject: [PATCH 10/34] Use quickcache. Prefilter enabled domains. --- corehq/motech/repeaters/models.py | 1 + corehq/motech/repeaters/tasks.py | 16 ++- corehq/motech/repeaters/tests/test_tasks.py | 106 +++++++++++++++----- 3 files changed, 95 insertions(+), 28 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index f36a7318b0d7..3484dec0b4b7 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1373,6 +1373,7 @@ def domain_can_forward(domain): ) +@quickcache(['domain'], timeout=60) def domain_can_forward_now(domain): return ( domain_can_forward(domain) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 63103ffdfe1b..57fc27835cdc 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -257,8 +257,6 @@ def iter_ready_repeater_ids_forever(): while True: yielded = False for domain, repeater_id in iter_ready_repeater_ids_once(): - if not toggles.PROCESS_REPEATERS.enabled(domain, toggles.NAMESPACE_DOMAIN): - continue if not domain_can_forward_now(domain): continue if rate_limit_repeater(domain, repeater_id): @@ -298,7 +296,7 @@ def iter_domain_repeaters(dom): else: yield rep_id - repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain() + repeater_ids_by_domain = get_repeater_ids_by_domain() while True: if not repeater_ids_by_domain: return @@ -321,6 +319,16 @@ def get_repeater_lock(repeater_id): return MeteredLock(lock, name) +def get_repeater_ids_by_domain(): + repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain() + enabled_domains = set(toggles.PROCESS_REPEATERS.get_enabled_domains()) + return { + domain: repeater_ids + for domain, repeater_ids in repeater_ids_by_domain.items() + if domain in enabled_domains + } + + @task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) def process_repeater(domain, repeater_id, lock_token): @@ -384,7 +392,7 @@ def is_repeat_record_ready(repeat_record): # being processed return ( not repeat_record.repeater.is_paused - and not 
toggles.PAUSE_DATA_FORWARDING.enabled(repeat_record.domain) + and domain_can_forward_now(repeat_record.domain) and not rate_limit_repeater( repeat_record.domain, repeat_record.repeater.repeater_id diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 47ab52aa2bf9..ae3c2629fb09 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -19,6 +19,7 @@ from corehq.motech.repeaters.tasks import ( _process_repeat_record, delete_old_request_logs, + get_repeater_ids_by_domain, iter_ready_repeater_ids_forever, iter_ready_repeater_ids_once, process_repeater, @@ -275,34 +276,34 @@ def all_ready_ids_by_domain(): def test_no_ready_repeaters(self): with ( - patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', - return_value=[]), # <-- + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + return_value={}), # <-- patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), - patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - return_value=True) + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain1', 'domain2', 'domain3']), ): self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) def test_domain_cant_forward_now(self): with ( - patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', - return_value=[(DOMAIN, 'abc123')]), + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=self.all_ready_ids_by_domain()), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=False), # <-- - patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - return_value=True) + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain1', 'domain2', 'domain3']), ): self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) def test_process_repeaters_not_enabled(self): with ( - patch('corehq.motech.repeaters.tasks.iter_ready_repeater_ids_once', - return_value=[(DOMAIN, 'abc123')]), - patch('corehq.motech.repeaters.models.domain_can_forward', + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=self.all_ready_ids_by_domain()), + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), - patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - return_value=False) # <-- + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=[]), # <-- ): self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) @@ -312,8 +313,8 @@ def test_successive_loops(self): side_effect=self.all_ready_ids_by_domain()), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), - patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain1', 'domain2', 'domain3']), patch('corehq.motech.repeaters.tasks.rate_limit_repeater', return_value=False), patch('corehq.motech.repeaters.tasks.get_repeater_lock'), @@ -342,8 +343,8 @@ def test_rate_limit(self): side_effect=self.all_ready_ids_by_domain()), patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), - patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', - 
return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain1', 'domain2', 'domain3']), patch('corehq.motech.repeaters.tasks.rate_limit_repeater', side_effect=lambda dom, rep: dom == 'domain2' and rep == 'repeater_id4'), patch('corehq.motech.repeaters.tasks.get_repeater_lock'), @@ -361,15 +362,43 @@ def test_rate_limit(self): ('domain1', 'repeater_id2'), ]) + def test_disabled_domains_excluded(self): + with ( + patch('corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + side_effect=self.all_ready_ids_by_domain()), + patch('corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=True), + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain2', 'domain3']), # <-- + patch('corehq.motech.repeaters.tasks.rate_limit_repeater', + return_value=False), + patch('corehq.motech.repeaters.tasks.get_repeater_lock'), + ): + repeaters = list(iter_ready_repeater_ids_forever()) + self.assertEqual(len(repeaters), 4) + repeater_ids = [(r[0], r[1]) for r in repeaters] + self.assertEqual(repeater_ids, [ + ('domain2', 'repeater_id4'), + ('domain3', 'repeater_id6'), + ('domain2', 'repeater_id5'), + ('domain2', 'repeater_id4'), + ]) + def test_iter_ready_repeater_ids_once(): - with patch( - 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', - return_value={ - 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], - 'domain2': ['repeater_id4', 'repeater_id5'], - 'domain3': ['repeater_id6'], - } + with ( + patch( + 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + return_value={ + 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], + 'domain2': ['repeater_id4', 'repeater_id5'], + 'domain3': ['repeater_id6'], + } + ), + patch( + 'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain1', 'domain2', 'domain3'], + ), ): pairs = list(iter_ready_repeater_ids_once()) assert_equal(pairs, [ @@ -387,6 +416,28 @@ def test_iter_ready_repeater_ids_once(): ]) +def test_get_repeater_ids_by_domain(): + with ( + patch( + 'corehq.motech.repeaters.tasks.Repeater.objects.get_all_ready_ids_by_domain', + return_value={ + 'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'], + 'domain2': ['repeater_id4', 'repeater_id5'], + 'domain3': ['repeater_id6'], + } + ), + patch( + 'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', + return_value=['domain2', 'domain3', 'domain4'], + ), + ): + repeater_ids_by_domain = get_repeater_ids_by_domain() + assert_equal(repeater_ids_by_domain, { + 'domain2': ['repeater_id4', 'repeater_id5'], + 'domain3': ['repeater_id6'], + }) + + @flag_enabled('PROCESS_REPEATERS') class TestProcessRepeater(TestCase): @@ -394,6 +445,13 @@ class TestProcessRepeater(TestCase): def setUpClass(cls): super().setUpClass() + can_forward_now_patch = patch( + 'corehq.motech.repeaters.tasks.domain_can_forward_now', + return_value=True, + ) + can_forward_now_patch = can_forward_now_patch.start() + cls.addClassCleanup(can_forward_now_patch.stop) + cls.set_backoff_patch = patch.object(FormRepeater, 'set_backoff') cls.set_backoff_patch.start() cls.addClassCleanup(cls.set_backoff_patch.stop) From 418ed3afe7f853edeeb1160bd025608ce85a0c99 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Thu, 29 Aug 2024 13:23:59 +0100 Subject: [PATCH 11/34] Check randomly-enabled domains --- corehq/motech/repeaters/tasks.py | 8 ++++++-- 
corehq/motech/repeaters/tests/test_tasks.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 57fc27835cdc..dad7e09cfd14 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -321,11 +321,15 @@ def get_repeater_lock(repeater_id): def get_repeater_ids_by_domain(): repeater_ids_by_domain = Repeater.objects.get_all_ready_ids_by_domain() - enabled_domains = set(toggles.PROCESS_REPEATERS.get_enabled_domains()) + always_enabled_domains = set(toggles.PROCESS_REPEATERS.get_enabled_domains()) return { domain: repeater_ids for domain, repeater_ids in repeater_ids_by_domain.items() - if domain in enabled_domains + if ( + domain in always_enabled_domains + # FeatureRelease toggle: Check whether domain is randomly enabled + or toggles.PROCESS_REPEATERS.enabled(domain, toggles.NAMESPACE_DOMAIN) + ) } diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index ae3c2629fb09..ed633d7404da 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -304,6 +304,8 @@ def test_process_repeaters_not_enabled(self): return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', return_value=[]), # <-- + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + return_value=False), # <-- ): self.assertFalse(next(iter_ready_repeater_ids_forever(), False)) @@ -369,7 +371,9 @@ def test_disabled_domains_excluded(self): patch('corehq.motech.repeaters.tasks.domain_can_forward_now', return_value=True), patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', - return_value=['domain2', 'domain3']), # <-- + return_value=['domain2']), # <-- + patch('corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + side_effect=lambda dom, __: dom == 'domain3'), # <-- patch('corehq.motech.repeaters.tasks.rate_limit_repeater', return_value=False), patch('corehq.motech.repeaters.tasks.get_repeater_lock'), @@ -428,8 +432,11 @@ def test_get_repeater_ids_by_domain(): ), patch( 'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.get_enabled_domains', - return_value=['domain2', 'domain3', 'domain4'], + return_value=['domain2', 'domain4'], ), + patch( + 'corehq.motech.repeaters.tasks.toggles.PROCESS_REPEATERS.enabled', + side_effect=lambda dom, __: dom == 'domain3'), ): repeater_ids_by_domain = get_repeater_ids_by_domain() assert_equal(repeater_ids_by_domain, { From 85bbfa337942042f5539d143b6b6238d44eb9a1f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Thu, 29 Aug 2024 13:43:34 +0100 Subject: [PATCH 12/34] Forward new records for synchronous case repeaters --- corehq/motech/repeaters/models.py | 12 ++++++++++- corehq/motech/repeaters/tests/test_models.py | 21 +++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 3484dec0b4b7..a628382c2cc4 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1212,7 +1212,17 @@ def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): retry_process_datasource_repeat_record, ) - if toggles.PROCESS_REPEATERS.enabled(self.domain, toggles.NAMESPACE_DOMAIN): + def is_new_synchronous_case_repeater_record(): + """ + Repeat record is a new record for a synchronous case repeater + See 
corehq.motech.repeaters.signals.fire_synchronous_case_repeaters + """ + return fire_synchronously and self.state == State.Pending + + if ( + toggles.PROCESS_REPEATERS.enabled(self.domain, toggles.NAMESPACE_DOMAIN) + and not is_new_synchronous_case_repeater_record() + ): return if self.next_check is None or self.next_check > datetime.utcnow(): diff --git a/corehq/motech/repeaters/tests/test_models.py b/corehq/motech/repeaters/tests/test_models.py index 80ff8046bfd1..946495bc0e08 100644 --- a/corehq/motech/repeaters/tests/test_models.py +++ b/corehq/motech/repeaters/tests/test_models.py @@ -14,7 +14,7 @@ from nose.tools import assert_in, assert_raises from corehq.motech.models import ConnectionSettings -from corehq.util.test_utils import _create_case +from corehq.util.test_utils import _create_case, flag_enabled from ..const import ( MAX_ATTEMPTS, @@ -484,6 +484,25 @@ def test_fire_synchronously(self, process, retry_process): process.assert_called_once() self.assert_not_called(retry_process) + @flag_enabled('PROCESS_REPEATERS') + def test_process_repeaters_enabled(self, process, retry_process): + rec = self.new_record() + rec.attempt_forward_now() + + self.assert_not_called(process, retry_process) + + @flag_enabled('PROCESS_REPEATERS') + def test_fire_synchronously_process_repeaters_enabled( + self, + process, + retry_process, + ): + rec = self.new_record() + rec.attempt_forward_now(fire_synchronously=True) + + process.assert_called_once() + self.assert_not_called(retry_process) + def test_retry(self, process, retry_process): rec = self.new_record() rec.attempt_forward_now(is_retry=True) From d1119bbec475cf3cdbffb78b6ef8b07c36e7efd3 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 9 Sep 2024 15:38:14 +0100 Subject: [PATCH 13/34] Add explanatory docstrings and comments --- corehq/motech/repeaters/models.py | 12 ++++++++++++ corehq/motech/repeaters/tasks.py | 7 ++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index a628382c2cc4..1dd9de434f27 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1377,6 +1377,12 @@ def is_response(duck): def domain_can_forward(domain): + """ + Returns whether ``domain`` has data forwarding or Zapier integration + privileges. + + Used for determining whether to register a repeat record. + """ return domain and ( domain_has_privilege(domain, ZAPIER_INTEGRATION) or domain_has_privilege(domain, DATA_FORWARDING) @@ -1385,6 +1391,12 @@ def domain_can_forward(domain): @quickcache(['domain'], timeout=60) def domain_can_forward_now(domain): + """ + Returns ``True`` if ``domain`` has the requisite privileges and data + forwarding is not paused. + + Used for determining whether to send a repeat record now. + """ return ( domain_can_forward(domain) and not toggles.PAUSE_DATA_FORWARDING.enabled(domain) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index dad7e09cfd14..8741d8bab1d9 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -263,7 +263,12 @@ def iter_ready_repeater_ids_forever(): continue lock = get_repeater_lock(repeater_id) - lock_token = uuid.uuid1().hex # The same way Lock does it + # Generate a lock token using `uuid1()` the same way that + # `redis.lock.Lock` does. The `Lock` class uses the token to + # determine ownership, so that one process can acquire a + # lock and a different process can release it. 
This lock + # will be released by the `update_repeater()` task. + lock_token = uuid.uuid1().hex if lock.acquire(blocking=False, token=lock_token): yielded = True yield domain, repeater_id, lock_token From 03b26cfb41df9e2d57b91fa723754687b8bfac10 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 9 Sep 2024 18:50:40 +0100 Subject: [PATCH 14/34] get_redis_lock() ... acquire(): No TypeError ?! --- .../dimagi/utils/couch/tests/__init__.py | 0 .../utils/couch/tests/test_redis_lock.py | 31 +++++++++++++++++++ corehq/motech/repeaters/tasks.py | 7 +---- 3 files changed, 32 insertions(+), 6 deletions(-) create mode 100644 corehq/ex-submodules/dimagi/utils/couch/tests/__init__.py create mode 100644 corehq/ex-submodules/dimagi/utils/couch/tests/test_redis_lock.py diff --git a/corehq/ex-submodules/dimagi/utils/couch/tests/__init__.py b/corehq/ex-submodules/dimagi/utils/couch/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/corehq/ex-submodules/dimagi/utils/couch/tests/test_redis_lock.py b/corehq/ex-submodules/dimagi/utils/couch/tests/test_redis_lock.py new file mode 100644 index 000000000000..c5a8bd2d38f5 --- /dev/null +++ b/corehq/ex-submodules/dimagi/utils/couch/tests/test_redis_lock.py @@ -0,0 +1,31 @@ +import uuid + +from redis.lock import Lock as RedisLock + +from dimagi.utils.couch import get_redis_lock + +from corehq.tests.noseplugins.redislocks import TestLock +from corehq.util.metrics.lockmeter import MeteredLock + + +def test_get_redis_lock_with_token(): + lock_name = 'test-1' + metered_lock = get_redis_lock(key=lock_name, name=lock_name, timeout=1) + assert isinstance(metered_lock, MeteredLock) + # metered_lock.lock is a TestLock instance because of + # corehq.tests.noseplugins.redislocks.RedisLockTimeoutPlugin + test_lock = metered_lock.lock + assert isinstance(test_lock, TestLock) + redis_lock = test_lock.lock + assert isinstance(redis_lock, RedisLock) + + token = uuid.uuid1().hex + acquired = redis_lock.acquire(blocking=False, token=token) + assert acquired + + # What we want to be able to do in a separate process: + metered_lock_2 = get_redis_lock(key=lock_name, name=lock_name, timeout=1) + redis_lock_2 = metered_lock_2.lock.lock + redis_lock_2.local.token = token + # Does not raise LockNotOwnedError: + redis_lock_2.release() diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 8741d8bab1d9..388bde5a6f90 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -7,8 +7,6 @@ from celery import chord from celery.schedules import crontab from celery.utils.log import get_task_logger -from django_redis import get_redis_connection -from redis.lock import Lock from dimagi.utils.couch import get_redis_lock @@ -28,7 +26,6 @@ metrics_histogram_timer, ) from corehq.util.metrics.const import MPM_MAX -from corehq.util.metrics.lockmeter import MeteredLock from corehq.util.timer import TimingContext from .const import ( @@ -317,11 +314,9 @@ def iter_domain_repeaters(dom): def get_repeater_lock(repeater_id): - redis = get_redis_connection() name = f'process_repeater_{repeater_id}' three_hours = 3 * 60 * 60 - lock = Lock(redis, name, timeout=three_hours) - return MeteredLock(lock, name) + return get_redis_lock(key=name, name=name, timeout=three_hours) def get_repeater_ids_by_domain(): From de27ba0da31c6326068ea897bed20be8dd77da4f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 10 Sep 2024 15:22:02 +0100 Subject: [PATCH 15/34] Drop unnecessary `iter_domain_repeaters()` --- 
corehq/motech/repeaters/tasks.py | 16 +++-------- corehq/motech/repeaters/tests/test_tasks.py | 30 ++++++++++----------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 388bde5a6f90..2d83319ec9b6 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -289,28 +289,18 @@ def iter_ready_repeater_ids_once(): ... """ - - def iter_domain_repeaters(dom): - try: - rep_id = repeater_ids_by_domain[dom].pop(0) - except IndexError: - return - else: - yield rep_id - repeater_ids_by_domain = get_repeater_ids_by_domain() while True: if not repeater_ids_by_domain: return for domain in list(repeater_ids_by_domain.keys()): try: - repeater_id = next(iter_domain_repeaters(domain)) - except StopIteration: + repeater_id = repeater_ids_by_domain[domain].pop() + except IndexError: # We've exhausted the repeaters for this domain del repeater_ids_by_domain[domain] continue - else: - yield domain, repeater_id + yield domain, repeater_id def get_repeater_lock(repeater_id): diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index ed633d7404da..2a25d223f1cc 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -326,17 +326,17 @@ def test_successive_loops(self): repeater_ids = [(r[0], r[1]) for r in repeaters] self.assertEqual(repeater_ids, [ # First loop - ('domain1', 'repeater_id1'), - ('domain2', 'repeater_id4'), + ('domain1', 'repeater_id3'), + ('domain2', 'repeater_id5'), ('domain3', 'repeater_id6'), ('domain1', 'repeater_id2'), - ('domain2', 'repeater_id5'), - ('domain1', 'repeater_id3'), + ('domain2', 'repeater_id4'), + ('domain1', 'repeater_id1'), # Second loop - ('domain1', 'repeater_id1'), - ('domain2', 'repeater_id4'), ('domain1', 'repeater_id2'), + ('domain2', 'repeater_id4'), + ('domain1', 'repeater_id1'), ]) def test_rate_limit(self): @@ -355,13 +355,13 @@ def test_rate_limit(self): self.assertEqual(len(repeaters), 7) repeater_ids = [(r[0], r[1]) for r in repeaters] self.assertEqual(repeater_ids, [ - ('domain1', 'repeater_id1'), + ('domain1', 'repeater_id3'), + ('domain2', 'repeater_id5'), ('domain3', 'repeater_id6'), ('domain1', 'repeater_id2'), - ('domain2', 'repeater_id5'), - ('domain1', 'repeater_id3'), ('domain1', 'repeater_id1'), ('domain1', 'repeater_id2'), + ('domain1', 'repeater_id1'), ]) def test_disabled_domains_excluded(self): @@ -382,9 +382,9 @@ def test_disabled_domains_excluded(self): self.assertEqual(len(repeaters), 4) repeater_ids = [(r[0], r[1]) for r in repeaters] self.assertEqual(repeater_ids, [ - ('domain2', 'repeater_id4'), - ('domain3', 'repeater_id6'), ('domain2', 'repeater_id5'), + ('domain3', 'repeater_id6'), + ('domain2', 'repeater_id4'), ('domain2', 'repeater_id4'), ]) @@ -407,16 +407,16 @@ def test_iter_ready_repeater_ids_once(): pairs = list(iter_ready_repeater_ids_once()) assert_equal(pairs, [ # First round of domains - ('domain1', 'repeater_id1'), - ('domain2', 'repeater_id4'), + ('domain1', 'repeater_id3'), + ('domain2', 'repeater_id5'), ('domain3', 'repeater_id6'), # Second round ('domain1', 'repeater_id2'), - ('domain2', 'repeater_id5'), + ('domain2', 'repeater_id4'), # Third round - ('domain1', 'repeater_id3'), + ('domain1', 'repeater_id1'), ]) From 4955ef4a9628c5238956ea07dc6e592cc73b65a6 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 24 Sep 2024 15:47:54 +0200 Subject: [PATCH 16/34] Don't quickcache `domain_can_forward_now()` --- 
corehq/motech/repeaters/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 1dd9de434f27..c8c17ada3593 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1389,7 +1389,6 @@ def domain_can_forward(domain): ) -@quickcache(['domain'], timeout=60) def domain_can_forward_now(domain): """ Returns ``True`` if ``domain`` has the requisite privileges and data From 59aae7169773ecbef86a659e2709c34ebf67df87 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 24 Sep 2024 15:52:22 +0200 Subject: [PATCH 17/34] Migration to create indexes concurrently --- .../0015_alter_repeatrecord_state_and_more.py | 72 ++++++++++++------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py b/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py index 83036dc7302c..3d7f8ccd6ef5 100644 --- a/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py +++ b/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py @@ -1,35 +1,59 @@ -from django.db import migrations, models +""" +Adds an index for RepeatRecord.state and a partial index for +next_attempt_at + not_paused +""" +from django.db import migrations class Migration(migrations.Migration): + atomic = False dependencies = [ ("repeaters", "0014_repeater_max_workers"), ] operations = [ - migrations.AlterField( - model_name="repeatrecord", - name="state", - field=models.PositiveSmallIntegerField( - choices=[ - (1, "Pending"), - (2, "Failed"), - (4, "Succeeded"), - (8, "Cancelled"), - (16, "Empty"), - (32, "Invalid Payload"), - ], - db_index=True, - default=1, - ), - ), - migrations.AddIndex( - model_name="repeater", - index=models.Index( - condition=models.Q(("is_paused", False)), - fields=["next_attempt_at"], - name="next_attempt_at_not_paused_idx", - ), + # migrations.AlterField( + # model_name="repeatrecord", + # name="state", + # field=models.PositiveSmallIntegerField( + # choices=[ + # (1, "Pending"), + # (2, "Failed"), + # (4, "Succeeded"), + # (8, "Cancelled"), + # (16, "Empty"), + # (32, "Invalid Payload"), + # ], + # db_index=True, + # default=1, + # ), + # ), + # Equivalent to the migration above, but builds the index concurrently + migrations.RunSQL( + sql=""" + CREATE INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b" + ON "repeaters_repeatrecord" ("state"); + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b"; + """ ), + + # migrations.AddIndex( + # model_name="repeater", + # index=models.Index( + # condition=models.Q(("is_paused", False)), + # fields=["next_attempt_at"], + # name="next_attempt_at_not_paused_idx", + # ), + # ), + migrations.RunSQL( + sql=""" + CREATE INDEX "next_attempt_at_not_paused_idx" ON "repeaters_repeater" ("next_attempt_at") WHERE NOT "is_paused"; + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "next_attempt_at_not_paused_idx"; + """ + ) ] From b70fc523d63012d06e337bffac15887a5b9f3f4d Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 19 Oct 2024 13:34:23 +0100 Subject: [PATCH 18/34] Add comment --- corehq/motech/repeaters/models.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 2b83a1388eff..77627a43e826 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -415,6 +415,10 @@ def 
set_backoff(self): def reset_backoff(self): if self.last_attempt_at or self.next_attempt_at: + # `_get_retry_interval()` implements exponential backoff by + # multiplying the previous interval by 3. Set last_attempt_at + # to None so that the next time we need to back off, we + # know it is the first interval. self.last_attempt_at = None self.next_attempt_at = None # Avoid a possible race condition with self.pause(), etc. From 7e65b3b06614e6d75f044540d2b3bbf4d17199f6 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 19 Oct 2024 15:40:42 +0100 Subject: [PATCH 19/34] Don't squash BaseExceptions --- corehq/motech/repeaters/models.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 77627a43e826..da8af4f523fa 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1218,8 +1218,7 @@ def fire(self, force_send=False, timing_context=None): self.repeater.fire_for_record(self, timing_context=timing_context) except Exception as e: self.handle_payload_error(str(e), traceback_str=traceback.format_exc()) - finally: - return self.state + return self.state return None def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): From 4c4189661f8d7af95d6d45797eebb56ce59ed05f Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 19 Oct 2024 16:45:14 +0100 Subject: [PATCH 20/34] Drop timeout for `process_repeater_lock`. --- corehq/motech/repeaters/tasks.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 2d83319ec9b6..cac724254467 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -234,11 +234,20 @@ def process_repeaters(): """ process_repeater_lock = get_redis_lock( PROCESS_REPEATERS_KEY, - timeout=24 * 60 * 60, + timeout=None, # Iterating repeaters forever is fine name=PROCESS_REPEATERS_KEY, ) + # How to recover from a crash: If `process_repeaters()` needs to be + # restarted and `process_repeater_lock` was not released, expire the + # lock to allow `process_repeaters()` to start: + # + # >>> from dimagi.utils.couch.cache.cache_core import get_redis_client + # >>> from corehq.motech.repeaters.const import PROCESS_REPEATERS_KEY + # >>> client = get_redis_client() + # >>> client.expire(PROCESS_REPEATERS_KEY, timeout=0) if not process_repeater_lock.acquire(blocking=False): return + try: for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever(): process_repeater.delay(domain, repeater_id, lock_token) From 30d4a6fac0d0c1c5b4a06db941c6e8e57bcf1b28 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 19 Oct 2024 16:46:25 +0100 Subject: [PATCH 21/34] Add metric for monitoring health --- corehq/motech/repeaters/tasks.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index cac724254467..409398df05c8 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -435,3 +435,12 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): run_every=crontab(), # every minute multiprocess_mode=MPM_MAX ) + +# This metric monitors the number of Repeaters waiting to be sent. A +# steep increase indicates a problem with `process_repeaters()`. 
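+# (`Repeater.objects.all_ready()` returns repeaters that are not paused, are
+# due to be attempted, and have at least one repeat record in the Pending or
+# Fail state.)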
+metrics_gauge_task( + 'commcare.repeaters.all_ready', + lambda: Repeater.objects.all_ready().count(), + run_every=crontab(minute='*/5'), # every five minutes + multiprocess_mode=MPM_MAX +) From e32b46522c77544562f632691023f1038e6a94ec Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 19 Oct 2024 17:03:51 +0100 Subject: [PATCH 22/34] Resolve migration conflict, fix index --- .../0015_alter_repeatrecord_state_and_more.py | 59 ------------------ ...orkers.py => 0015_repeater_max_workers.py} | 2 +- .../repeaters/migrations/0016_add_indexes.py | 62 +++++++++++++++++++ corehq/motech/repeaters/models.py | 4 +- migrations.lock | 2 + 5 files changed, 67 insertions(+), 62 deletions(-) delete mode 100644 corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py rename corehq/motech/repeaters/migrations/{0014_repeater_max_workers.py => 0015_repeater_max_workers.py} (81%) create mode 100644 corehq/motech/repeaters/migrations/0016_add_indexes.py diff --git a/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py b/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py deleted file mode 100644 index 3d7f8ccd6ef5..000000000000 --- a/corehq/motech/repeaters/migrations/0015_alter_repeatrecord_state_and_more.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -Adds an index for RepeatRecord.state and a partial index for -next_attempt_at + not_paused -""" -from django.db import migrations - - -class Migration(migrations.Migration): - atomic = False - - dependencies = [ - ("repeaters", "0014_repeater_max_workers"), - ] - - operations = [ - # migrations.AlterField( - # model_name="repeatrecord", - # name="state", - # field=models.PositiveSmallIntegerField( - # choices=[ - # (1, "Pending"), - # (2, "Failed"), - # (4, "Succeeded"), - # (8, "Cancelled"), - # (16, "Empty"), - # (32, "Invalid Payload"), - # ], - # db_index=True, - # default=1, - # ), - # ), - # Equivalent to the migration above, but builds the index concurrently - migrations.RunSQL( - sql=""" - CREATE INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b" - ON "repeaters_repeatrecord" ("state"); - """, - reverse_sql=""" - DROP INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b"; - """ - ), - - # migrations.AddIndex( - # model_name="repeater", - # index=models.Index( - # condition=models.Q(("is_paused", False)), - # fields=["next_attempt_at"], - # name="next_attempt_at_not_paused_idx", - # ), - # ), - migrations.RunSQL( - sql=""" - CREATE INDEX "next_attempt_at_not_paused_idx" ON "repeaters_repeater" ("next_attempt_at") WHERE NOT "is_paused"; - """, - reverse_sql=""" - DROP INDEX CONCURRENTLY "next_attempt_at_not_paused_idx"; - """ - ) - ] diff --git a/corehq/motech/repeaters/migrations/0014_repeater_max_workers.py b/corehq/motech/repeaters/migrations/0015_repeater_max_workers.py similarity index 81% rename from corehq/motech/repeaters/migrations/0014_repeater_max_workers.py rename to corehq/motech/repeaters/migrations/0015_repeater_max_workers.py index 66240342f454..d55d9a2e4047 100644 --- a/corehq/motech/repeaters/migrations/0014_repeater_max_workers.py +++ b/corehq/motech/repeaters/migrations/0015_repeater_max_workers.py @@ -4,7 +4,7 @@ class Migration(migrations.Migration): dependencies = [ - ("repeaters", "0013_alter_repeatrecord_state_and_more"), + ("repeaters", "0014_alter_repeater_request_method"), ] operations = [ diff --git a/corehq/motech/repeaters/migrations/0016_add_indexes.py b/corehq/motech/repeaters/migrations/0016_add_indexes.py new file mode 100644 index 
000000000000..2409b1df064d --- /dev/null +++ b/corehq/motech/repeaters/migrations/0016_add_indexes.py @@ -0,0 +1,62 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + atomic = False + + dependencies = [ + ("repeaters", "0015_repeater_max_workers"), + ] + + operations = [ + migrations.SeparateDatabaseAndState( + state_operations=[ + migrations.AlterField( + model_name="repeatrecord", + name="state", + field=models.PositiveSmallIntegerField( + choices=[ + (1, "Pending"), + (2, "Failed"), + (4, "Succeeded"), + (8, "Cancelled"), + (16, "Empty"), + (32, "Invalid Payload"), + ], + db_index=True, + default=1, + ), + ), + migrations.AddIndex( + model_name="repeater", + index=models.Index( + condition=models.Q(("is_deleted", False), ("is_paused", False)), + fields=["next_attempt_at"], + name="next_attempt_at_partial_idx", + ), + ), + ], + + database_operations=[ + migrations.RunSQL( + sql=""" + CREATE INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b" + ON "repeaters_repeatrecord" ("state"); + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b"; + """ + ), + migrations.RunSQL( + sql=""" + CREATE INDEX CONCURRENTLY "next_attempt_at_partial_idx" + ON "repeaters_repeater" ("next_attempt_at") + WHERE (NOT "is_deleted" AND NOT "is_paused"); + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "next_attempt_at_partial_idx"; + """ + ), + ] + ) + ] diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 9699c70154e3..2bcdbfe3ae5d 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -300,8 +300,8 @@ class Meta: indexes = [ models.Index( fields=['next_attempt_at'], - condition=models.Q(is_paused=False), - name='next_attempt_at_not_paused_idx', + condition=models.Q(("is_deleted", False), ("is_paused", False)), + name='next_attempt_at_partial_idx', ), ] diff --git a/migrations.lock b/migrations.lock index 7dc0874ac264..170568b378bf 100644 --- a/migrations.lock +++ b/migrations.lock @@ -837,6 +837,8 @@ repeaters 0012_formexpressionrepeater_arcgisformexpressionrepeater 0013_alter_repeatrecord_state_and_more 0014_alter_repeater_request_method + 0015_repeater_max_workers + 0016_add_indexes reports 0001_initial 0002_auto_20171121_1803 From e3bcd748894852990155ee69258c111fc05eaaa0 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sun, 20 Oct 2024 00:04:42 +0100 Subject: [PATCH 23/34] Fix metric --- corehq/motech/repeaters/tasks.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 409398df05c8..6b67c27158d8 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -436,11 +436,16 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): multiprocess_mode=MPM_MAX ) -# This metric monitors the number of Repeaters waiting to be sent. A -# steep increase indicates a problem with `process_repeaters()`. + +def get_all_ready_count(): + return Repeater.objects.all_ready().count() + + +# This metric monitors the number of Repeaters with RepeatRecords ready to +# be sent. A steep increase indicates a problem with `process_repeaters()`. 
metrics_gauge_task( 'commcare.repeaters.all_ready', - lambda: Repeater.objects.all_ready().count(), + get_all_ready_count, run_every=crontab(minute='*/5'), # every five minutes multiprocess_mode=MPM_MAX ) From efc4dde3b96803301a2a5ec888b15c38c8941623 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 22 Oct 2024 20:22:26 +0100 Subject: [PATCH 24/34] Change indexes After some analyzing, it turns out that the query is between two and three times as fast using a partial index on RepeatRecord.repeater_id where state is pending or failed. Performance is also improved using a partial index with Repeater.is_deleted instead of a normal B-tree index. When these two indexes are used, the next_attempt_at_partial_idx is not used. --- .../repeaters/migrations/0016_add_indexes.py | 60 +++++++++++-------- corehq/motech/repeaters/models.py | 16 +++-- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/corehq/motech/repeaters/migrations/0016_add_indexes.py b/corehq/motech/repeaters/migrations/0016_add_indexes.py index 2409b1df064d..18866dcc5673 100644 --- a/corehq/motech/repeaters/migrations/0016_add_indexes.py +++ b/corehq/motech/repeaters/migrations/0016_add_indexes.py @@ -12,49 +12,61 @@ class Migration(migrations.Migration): migrations.SeparateDatabaseAndState( state_operations=[ migrations.AlterField( - model_name="repeatrecord", - name="state", - field=models.PositiveSmallIntegerField( - choices=[ - (1, "Pending"), - (2, "Failed"), - (4, "Succeeded"), - (8, "Cancelled"), - (16, "Empty"), - (32, "Invalid Payload"), - ], - db_index=True, - default=1, - ), + model_name="repeater", + name="is_deleted", + field=models.BooleanField(default=False), ), migrations.AddIndex( model_name="repeater", index=models.Index( - condition=models.Q(("is_deleted", False), ("is_paused", False)), - fields=["next_attempt_at"], - name="next_attempt_at_partial_idx", + condition=models.Q(("is_deleted", False)), + fields=["id"], + name="is_deleted_partial_idx", + ), + ), + migrations.AddIndex( + model_name="repeatrecord", + index=models.Index( + condition=models.Q(("state__in", (1, 2))), + fields=["repeater_id"], + name="state_partial_idx", ), ), ], database_operations=[ + # Drop `Repeater.id_deleted` index migrations.RunSQL( sql=""" - CREATE INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b" - ON "repeaters_repeatrecord" ("state"); + DROP INDEX CONCURRENTLY "repeaters_repeater_is_deleted_08441bf0"; """, reverse_sql=""" - DROP INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b"; + CREATE INDEX CONCURRENTLY "repeaters_repeater_is_deleted_08441bf0" + ON "repeaters_repeater" ("is_deleted"); """ ), + + # Replace with a partial index on `id_` column + migrations.RunSQL( + sql=""" + CREATE INDEX CONCURRENTLY "is_deleted_partial_idx" + ON "repeaters_repeater" ("id_") + WHERE NOT "is_deleted"; + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "is_deleted_partial_idx"; + """ + ), + + # Add partial index for RepeatRecord.state on `repeater_id` migrations.RunSQL( sql=""" - CREATE INDEX CONCURRENTLY "next_attempt_at_partial_idx" - ON "repeaters_repeater" ("next_attempt_at") - WHERE (NOT "is_deleted" AND NOT "is_paused"); + CREATE INDEX CONCURRENTLY "state_partial_idx" + ON "repeaters_repeatrecord" ("repeater_id_") + WHERE "state" IN (1, 2); """, reverse_sql=""" - DROP INDEX CONCURRENTLY "next_attempt_at_partial_idx"; + DROP INDEX CONCURRENTLY "state_partial_idx"; """ ), ] diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 2bcdbfe3ae5d..82713ff14a1d 100644 --- 
a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -288,7 +288,7 @@ class Repeater(RepeaterSuperProxy): max_workers = models.IntegerField(default=0) options = JSONField(default=dict) connection_settings_id = models.IntegerField(db_index=True) - is_deleted = models.BooleanField(default=False, db_index=True) + is_deleted = models.BooleanField(default=False) last_modified = models.DateTimeField(auto_now=True) date_created = models.DateTimeField(auto_now_add=True) @@ -299,9 +299,9 @@ class Meta: db_table = 'repeaters_repeater' indexes = [ models.Index( - fields=['next_attempt_at'], - condition=models.Q(("is_deleted", False), ("is_paused", False)), - name='next_attempt_at_partial_idx', + name='is_deleted_partial_idx', + fields=['id'], + condition=models.Q(is_deleted=False), ), ] @@ -1037,7 +1037,6 @@ class RepeatRecord(models.Model): state = models.PositiveSmallIntegerField( choices=State.choices, default=State.Pending, - db_index=True, ) registered_at = models.DateTimeField() next_check = models.DateTimeField(null=True, default=None) @@ -1053,7 +1052,12 @@ class Meta: name="next_check_not_null", fields=["next_check"], condition=models.Q(next_check__isnull=False), - ) + ), + models.Index( + name="state_partial_idx", + fields=["repeater_id"], + condition=models.Q(state__in=(State.Pending, State.Fail)), + ), ] constraints = [ models.CheckConstraint( From bd37a009ced50048aa57076ec52a73d4330b2066 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 23 Oct 2024 18:54:00 +0100 Subject: [PATCH 25/34] Add one more index. Use UNION ALL queries. --- .../repeaters/migrations/0016_add_indexes.py | 34 +++++++++++-- corehq/motech/repeaters/models.py | 51 +++++++++++++------ corehq/motech/repeaters/tasks.py | 6 +-- corehq/motech/repeaters/tests/test_models.py | 5 ++ 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/corehq/motech/repeaters/migrations/0016_add_indexes.py b/corehq/motech/repeaters/migrations/0016_add_indexes.py index 18866dcc5673..5f7f0b15af97 100644 --- a/corehq/motech/repeaters/migrations/0016_add_indexes.py +++ b/corehq/motech/repeaters/migrations/0016_add_indexes.py @@ -21,7 +21,18 @@ class Migration(migrations.Migration): index=models.Index( condition=models.Q(("is_deleted", False)), fields=["id"], - name="is_deleted_partial_idx", + name="deleted_partial_idx", + ), + ), + migrations.AddIndex( + model_name="repeater", + index=models.Index( + condition=models.Q( + ("is_deleted", False), + ("is_paused", False), + ), + fields=["id"], + name="deleted_paused_partial_idx", ), ), migrations.AddIndex( @@ -46,19 +57,34 @@ class Migration(migrations.Migration): """ ), - # Replace with a partial index on `id_` column + # Replace with a partial index on `id_` column. Used + # when next_attempt_at is null migrations.RunSQL( sql=""" - CREATE INDEX CONCURRENTLY "is_deleted_partial_idx" + CREATE INDEX CONCURRENTLY "deleted_partial_idx" ON "repeaters_repeater" ("id_") WHERE NOT "is_deleted"; """, reverse_sql=""" - DROP INDEX CONCURRENTLY "is_deleted_partial_idx"; + DROP INDEX CONCURRENTLY "deleted_partial_idx"; + """ + ), + + # Add partial index for is_deleted and is_paused on `id_` + # column. Used when next_attempt_at is not null + migrations.RunSQL( + sql=""" + CREATE INDEX CONCURRENTLY "deleted_paused_partial_idx" + ON "repeaters_repeater" ("id_") + WHERE (NOT "is_deleted" AND NOT "is_paused"); + """, + reverse_sql=""" + DROP INDEX CONCURRENTLY "deleted_paused_partial_idx"; """ ), # Add partial index for RepeatRecord.state on `repeater_id` + # column. 
Used when next_attempt_at is not null migrations.RunSQL( sql=""" CREATE INDEX CONCURRENTLY "state_partial_idx" diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 82713ff14a1d..2019a6b40f5e 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -238,28 +238,44 @@ def all_ready(self): """ Return all Repeaters ready to be forwarded. """ - not_paused = models.Q(is_paused=False) - next_attempt_not_in_the_future = ( - models.Q(next_attempt_at__isnull=True) - | models.Q(next_attempt_at__lte=timezone.now()) - ) - repeat_records_ready_to_send = models.Q( - repeat_records__state__in=(State.Pending, State.Fail) - ) - return ( - self.get_queryset() - .filter(not_paused) - .filter(next_attempt_not_in_the_future) - .filter(repeat_records_ready_to_send) + return self._all_ready_next_attempt_null().union( + self._all_ready_next_attempt_now(), + all=True, ) def get_all_ready_ids_by_domain(self): + next_attempt_null = self._all_ready_next_attempt_null().values_list('domain', 'id') + next_attempt_now = self._all_ready_next_attempt_now().values_list('domain', 'id') + query = next_attempt_null.union(next_attempt_now, all=True) results = defaultdict(list) - query = self.all_ready().values_list('domain', 'id') for (domain, id_uuid) in query.all(): results[domain].append(id_uuid.hex) return results + def all_ready_count(self): + return ( + self._all_ready_next_attempt_null().count() + + self._all_ready_next_attempt_now().count() + ) + + def _all_ready_next_attempt_null(self): + # Slower query. Uses deleted_partial_idx + return ( + self.get_queryset() + .filter(is_paused=False) + .filter(next_attempt_at__isnull=True) + .filter(repeat_records__state__in=(State.Pending, State.Fail)) + ) + + def _all_ready_next_attempt_now(self): + # Fast query. Uses deleted_paused_partial_idx and state_partial_idx + return ( + self.get_queryset() + .filter(is_paused=False) + .filter(next_attempt_at__lte=timezone.now()) + .filter(repeat_records__state__in=(State.Pending, State.Fail)) + ) + def get_queryset(self): repeater_obj = self.model() if type(repeater_obj).__name__ == "Repeater": @@ -299,10 +315,15 @@ class Meta: db_table = 'repeaters_repeater' indexes = [ models.Index( - name='is_deleted_partial_idx', + name='deleted_partial_idx', fields=['id'], condition=models.Q(is_deleted=False), ), + models.Index( + name="deleted_paused_partial_idx", + fields=["id"], + condition=models.Q(("is_deleted", False), ("is_paused", False)), + ), ] payload_generator_classes = () diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 6b67c27158d8..2bc5aeea5ec4 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -437,15 +437,11 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): ) -def get_all_ready_count(): - return Repeater.objects.all_ready().count() - - # This metric monitors the number of Repeaters with RepeatRecords ready to # be sent. A steep increase indicates a problem with `process_repeaters()`. 
metrics_gauge_task( 'commcare.repeaters.all_ready', - get_all_ready_count, + Repeater.objects.all_ready_count, run_every=crontab(minute='*/5'), # every five minutes multiprocess_mode=MPM_MAX ) diff --git a/corehq/motech/repeaters/tests/test_models.py b/corehq/motech/repeaters/tests/test_models.py index 6e24d144202f..5a5c1f4511a4 100644 --- a/corehq/motech/repeaters/tests/test_models.py +++ b/corehq/motech/repeaters/tests/test_models.py @@ -977,3 +977,8 @@ def test_status_404_response(self): def test_none_response(self): self.assertFalse(is_success_response(None)) + + +def test_repeater_all_ready_union_all_sql(): + sql_str = str(Repeater.objects.all_ready().query) + assert_in('UNION ALL', sql_str) From a448b9ee7bca28ffba8456f3c6178f0a7bf169f3 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 26 Oct 2024 12:19:15 +0100 Subject: [PATCH 26/34] Don't report attempt too soon --- corehq/motech/repeaters/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 2bc5aeea5ec4..46a3e7d9c31b 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -371,9 +371,10 @@ def process_ready_repeat_record(repeat_record_id): .prefetch_related('repeater', 'attempt_set') .get(id=repeat_record_id) ) - report_repeater_attempt(repeat_record.repeater.repeater_id) if not is_repeat_record_ready(repeat_record): return None + + report_repeater_attempt(repeat_record.repeater.repeater_id) with timer('fire_timing') as fire_timer: state_or_none = repeat_record.fire(timing_context=fire_timer) report_repeater_usage( From 4321fb7e4e0136baa02b454daefc96283c1dbe36 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 26 Oct 2024 13:13:08 +0100 Subject: [PATCH 27/34] Add metrics --- corehq/motech/repeaters/tasks.py | 37 +++++++++++- corehq/motech/repeaters/tests/test_tasks.py | 63 +++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 46a3e7d9c31b..527b8e75d2fd 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -1,6 +1,7 @@ import random import uuid from datetime import datetime, timedelta +from inspect import cleandoc from django.conf import settings @@ -248,6 +249,7 @@ def process_repeaters(): if not process_repeater_lock.acquire(blocking=False): return + metrics_counter('commcare.repeaters.process_repeaters.start') try: for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever(): process_repeater.delay(domain, repeater_id, lock_token) @@ -298,6 +300,7 @@ def iter_ready_repeater_ids_once(): ... """ + metrics_counter('commcare.repeaters.process_repeaters.iter_once') repeater_ids_by_domain = get_repeater_ids_by_domain() while True: if not repeater_ids_by_domain: @@ -374,6 +377,7 @@ def process_ready_repeat_record(repeat_record_id): if not is_repeat_record_ready(repeat_record): return None + _metrics_wait_duration(repeat_record) report_repeater_attempt(repeat_record.repeater.repeater_id) with timer('fire_timing') as fire_timer: state_or_none = repeat_record.fire(timing_context=fire_timer) @@ -404,6 +408,37 @@ def is_repeat_record_ready(repeat_record): ) +def _metrics_wait_duration(repeat_record): + """ + The duration since ``repeat_record`` was registered or last attempted. 
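+    (Measured from the most recent ``RepeatRecordAttempt`` if the repeat
+    record has any attempts, otherwise from ``registered_at``. See
+    ``_get_wait_duration_seconds()`` below.)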
+ + Buckets are exponential: [1m, 6m, 36m, 3.6h, 21.6h, 5.4d] + """ + buckets = [60 * (6 ** exp) for exp in range(6)] + metrics_histogram( + 'commcare.repeaters.process_repeaters.repeat_record_wait', + _get_wait_duration_seconds(repeat_record), + bucket_tag='duration', + buckets=buckets, + bucket_unit='s', + tags={ + 'domain': repeat_record.domain, + 'repeater': f'{repeat_record.domain}: {repeat_record.repeater.name}', + }, + documentation=cleandoc(_metrics_wait_duration.__doc__) + ) + + +def _get_wait_duration_seconds(repeat_record): + last_attempt = repeat_record.attempt_set.last() + if last_attempt: + duration_start = last_attempt.created_at + else: + duration_start = repeat_record.registered_at + wait_duration = datetime.utcnow() - duration_start + return int(wait_duration.total_seconds()) + + @task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) def update_repeater(repeat_record_states, repeater_id, lock_token): """ @@ -441,7 +476,7 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): # This metric monitors the number of Repeaters with RepeatRecords ready to # be sent. A steep increase indicates a problem with `process_repeaters()`. metrics_gauge_task( - 'commcare.repeaters.all_ready', + 'commcare.repeaters.process_repeaters.all_ready_count', Repeater.objects.all_ready_count, run_every=crontab(minute='*/5'), # every five minutes multiprocess_mode=MPM_MAX diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 2a25d223f1cc..ee0b15999665 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -17,6 +17,7 @@ from corehq.motech.models import ConnectionSettings, RequestLog from corehq.motech.repeaters.models import FormRepeater, Repeater, RepeatRecord from corehq.motech.repeaters.tasks import ( + _get_wait_duration_seconds, _process_repeat_record, delete_old_request_logs, get_repeater_ids_by_domain, @@ -558,3 +559,65 @@ def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __): mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_not_called() + + +class TestGetWaitDurationSeconds(TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.repeater = FormRepeater.objects.create( + domain=DOMAIN, + connection_settings=ConnectionSettings.objects.create( + domain=DOMAIN, + url='http://www.example.com/api/' + ), + ) + + def test_repeat_record_no_attempts(self): + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + repeat_record = RepeatRecord.objects.create( + repeater=self.repeater, + domain=DOMAIN, + payload_id='abc123', + registered_at=five_minutes_ago, + ) + wait_duration = _get_wait_duration_seconds(repeat_record) + self.assertEqual(wait_duration, 300) + + def test_repeat_record_one_attempt(self): + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + repeat_record = RepeatRecord.objects.create( + repeater=self.repeater, + domain=DOMAIN, + payload_id='abc123', + registered_at=five_minutes_ago, + ) + thirty_seconds_ago = datetime.utcnow() - timedelta(seconds=30) + repeat_record.attempt_set.create( + created_at=thirty_seconds_ago, + state=State.Fail, + ) + wait_duration = _get_wait_duration_seconds(repeat_record) + self.assertEqual(wait_duration, 30) + + def test_repeat_record_two_attempts(self): + an_hour_ago = datetime.utcnow() - timedelta(hours=1) + repeat_record = RepeatRecord.objects.create( + repeater=self.repeater, + domain=DOMAIN, + payload_id='abc123', + registered_at=an_hour_ago, + 
) + thirty_minutes = datetime.utcnow() - timedelta(minutes=30) + repeat_record.attempt_set.create( + created_at=thirty_minutes, + state=State.Fail, + ) + five_seconds_ago = datetime.utcnow() - timedelta(seconds=5) + repeat_record.attempt_set.create( + created_at=five_seconds_ago, + state=State.Fail, + ) + wait_duration = _get_wait_duration_seconds(repeat_record) + self.assertEqual(wait_duration, 5) From 74137f99d53d70639b48ea9c01e6eefabd80b5e1 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 26 Oct 2024 23:28:09 +0100 Subject: [PATCH 28/34] Improve backoff logic --- corehq/motech/repeaters/models.py | 13 ++++++++ corehq/motech/repeaters/tasks.py | 16 +++++----- .../motech/repeaters/tests/test_repeater.py | 2 +- corehq/motech/repeaters/tests/test_tasks.py | 30 ++++++++++++++----- 4 files changed, 44 insertions(+), 17 deletions(-) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 2019a6b40f5e..55543a917029 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1243,6 +1243,19 @@ def fire(self, force_send=False, timing_context=None): self.repeater.fire_for_record(self, timing_context=timing_context) except Exception as e: self.handle_payload_error(str(e), traceback_str=traceback.format_exc()) + # Repeat records with State.Fail are retried, and repeat + # records with State.InvalidPayload are not. + # + # But a repeat record can have State.InvalidPayload + # because it was sent and rejected, so we know that the + # remote endpoint is healthy and responding, or because + # this exception occurred and it was not sent, so we + # don't know anything about the remote endpoint. + # + # Return None so that `tasks.update_repeater()` treats + # the repeat record as unsent, and does not apply or + # reset a backoff. + return None return self.state return None diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 527b8e75d2fd..2317e05e0783 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -446,18 +446,16 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): results of ``_process_repeat_record()`` tasks. """ try: + if all(s in (State.Empty, None) for s in repeat_record_states): + # We can't tell anything about the remote endpoint. + return + success_or_invalid = (State.Success, State.InvalidPayload) repeater = Repeater.objects.get(id=repeater_id) - if any(s == State.Success for s in repeat_record_states): - # At least one repeat record was sent successfully. The - # remote endpoint is healthy. + if any(s in success_or_invalid for s in repeat_record_states): + # The remote endpoint appears to be healthy. repeater.reset_backoff() - elif all(s in (State.Empty, State.InvalidPayload, None) - for s in repeat_record_states): - # We can't tell anything about the remote endpoint. - # (_process_repeat_record() can return None on an exception.) - pass else: - # All sent payloads failed. Try again later. + # All the payloads that were sent failed. Try again later. 
repeater.set_backoff() finally: lock = get_repeater_lock(repeater_id) diff --git a/corehq/motech/repeaters/tests/test_repeater.py b/corehq/motech/repeaters/tests/test_repeater.py index 2f93dfa7b519..a5b0cd15dfa5 100644 --- a/corehq/motech/repeaters/tests/test_repeater.py +++ b/corehq/motech/repeaters/tests/test_repeater.py @@ -701,7 +701,7 @@ def test_payload_exception_on_fire(self): with patch.object(CaseRepeater, 'get_payload', side_effect=Exception('Boom!')): state_or_none = rr.fire() - self.assertEqual(state_or_none, State.InvalidPayload) + self.assertIsNone(state_or_none) repeat_record = RepeatRecord.objects.get(id=rr.id) self.assertEqual(repeat_record.state, State.InvalidPayload) self.assertEqual(repeat_record.failure_reason, 'Boom!') diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index ee0b15999665..17eb0561ebda 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -519,10 +519,23 @@ class TestUpdateRepeater(SimpleTestCase): @patch('corehq.motech.repeaters.tasks.get_repeater_lock') @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __): + repeat_record_states = [State.Success, State.Fail, State.Empty, None] + mock_repeater = MagicMock() mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') + + mock_repeater.set_backoff.assert_not_called() + mock_repeater.reset_backoff.assert_called_once() - update_repeater([State.Success, State.Fail, State.Empty, None], 1, 'token') + @patch('corehq.motech.repeaters.tasks.get_repeater_lock') + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_resets_backoff_on_invalid(self, mock_get_repeater, __): + repeat_record_states = [State.InvalidPayload, State.Fail, State.Empty, None] + + mock_repeater = MagicMock() + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_called_once() @@ -530,10 +543,11 @@ def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __): @patch('corehq.motech.repeaters.tasks.get_repeater_lock') @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __): + repeat_record_states = [State.Fail, State.Empty, None] + mock_repeater = MagicMock() mock_get_repeater.return_value = mock_repeater - - update_repeater([State.Fail, State.Empty, None], 1, 'token') + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_called_once() mock_repeater.reset_backoff.assert_not_called() @@ -541,10 +555,11 @@ def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __): @patch('corehq.motech.repeaters.tasks.get_repeater_lock') @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __): + repeat_record_states = [State.Empty] + mock_repeater = MagicMock() mock_get_repeater.return_value = mock_repeater - - update_repeater([State.Empty], 1, 'token') + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_not_called() @@ -552,10 +567,11 @@ def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __): 
@patch('corehq.motech.repeaters.tasks.get_repeater_lock') @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __): + repeat_record_states = [None] + mock_repeater = MagicMock() mock_get_repeater.return_value = mock_repeater - - update_repeater([None], 1, 'token') + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_not_called() From 968a922a59d621ea2017462f868ee416567af47a Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 28 Oct 2024 11:50:21 +0000 Subject: [PATCH 29/34] Update comments --- .../repeaters/migrations/0016_add_indexes.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/corehq/motech/repeaters/migrations/0016_add_indexes.py b/corehq/motech/repeaters/migrations/0016_add_indexes.py index 5f7f0b15af97..fd4d476fd32e 100644 --- a/corehq/motech/repeaters/migrations/0016_add_indexes.py +++ b/corehq/motech/repeaters/migrations/0016_add_indexes.py @@ -57,8 +57,13 @@ class Migration(migrations.Migration): """ ), - # Replace with a partial index on `id_` column. Used - # when next_attempt_at is null + # Replace `Repeater.id_deleted` index with a partial + # index on `id_` column. + # > One major reason for using a partial index is to + # > avoid indexing common values. ... This reduces the + # > size of the index, which will speed up those queries + # > that do use the index. + # > -- https://www.postgresql.org/docs/current/indexes-partial.html migrations.RunSQL( sql=""" CREATE INDEX CONCURRENTLY "deleted_partial_idx" @@ -71,7 +76,7 @@ class Migration(migrations.Migration): ), # Add partial index for is_deleted and is_paused on `id_` - # column. Used when next_attempt_at is not null + # column. migrations.RunSQL( sql=""" CREATE INDEX CONCURRENTLY "deleted_paused_partial_idx" @@ -84,7 +89,8 @@ class Migration(migrations.Migration): ), # Add partial index for RepeatRecord.state on `repeater_id` - # column. Used when next_attempt_at is not null + # column. This does the heavy lifting for the queries + # related to `RepeaterManager.all_ready()`. migrations.RunSQL( sql=""" CREATE INDEX CONCURRENTLY "state_partial_idx" From 4fd14a0f2bead91d4218dfc109e8dfc07b8b429a Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 28 Oct 2024 17:31:40 +0000 Subject: [PATCH 30/34] Show "Next attempt at" in Forwarders page --- .../repeaters/templates/repeaters/partials/repeater_row.html | 3 +++ 1 file changed, 3 insertions(+) diff --git a/corehq/motech/repeaters/templates/repeaters/partials/repeater_row.html b/corehq/motech/repeaters/templates/repeaters/partials/repeater_row.html index 069941a94f02..18e5d91026f3 100644 --- a/corehq/motech/repeaters/templates/repeaters/partials/repeater_row.html +++ b/corehq/motech/repeaters/templates/repeaters/partials/repeater_row.html @@ -5,6 +5,9 @@ {% if repeater.white_listed_case_types %}
Case Type: {{ repeater.white_listed_case_types|join:", " }} {% endif %} + {% if repeater.next_attempt_at and request|toggle_enabled:"PROCESS_REPEATERS" and not repeater.is_paused %} +
Next attempt at {{ repeater.next_attempt_at|date:"Y-m-d H:i" }} + {% endif %} From b1eb1715c86448f34ca64212619142f7e7f3f54e Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 28 Oct 2024 17:56:46 +0000 Subject: [PATCH 31/34] Fixes migration --- .../repeaters/migrations/0015_drop_receiverwrapper_couchdb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/corehq/motech/repeaters/migrations/0015_drop_receiverwrapper_couchdb.py b/corehq/motech/repeaters/migrations/0015_drop_receiverwrapper_couchdb.py index 54dbd606bbdb..a75498273af5 100644 --- a/corehq/motech/repeaters/migrations/0015_drop_receiverwrapper_couchdb.py +++ b/corehq/motech/repeaters/migrations/0015_drop_receiverwrapper_couchdb.py @@ -8,7 +8,7 @@ @skip_on_fresh_install -def _delete_receiverwrapper_couchdb(): +def _delete_receiverwrapper_couchdb(apps, schema_editor): db = Database(f"{settings.COUCH_DATABASE}__receiverwrapper") try: db.server.delete_db(db.dbname) From 8a7f3432d3901b080601fd232aa10e8df94c50c8 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 19 Nov 2024 15:05:49 +0000 Subject: [PATCH 32/34] Add comment on other `True` return value --- corehq/motech/repeaters/models.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py index 83dc62458d58..770ec109363f 100644 --- a/corehq/motech/repeaters/models.py +++ b/corehq/motech/repeaters/models.py @@ -1275,6 +1275,9 @@ def is_new_synchronous_case_repeater_record(): Repeat record is a new record for a synchronous case repeater See corehq.motech.repeaters.signals.fire_synchronous_case_repeaters """ + # This will also return True if a user clicked "Resend" on a + # Pending repeat record in the Repeat Records Report. This + # is not intended, but it's also not harmful. return fire_synchronously and self.state == State.Pending if ( From 0f72ba93399d404522c72c1edc146d66be2304d8 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 2 Dec 2024 22:24:11 +0000 Subject: [PATCH 33/34] Count repeater backoffs --- corehq/motech/repeaters/tasks.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 2317e05e0783..f99d107d4bd9 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -456,6 +456,13 @@ def update_repeater(repeat_record_states, repeater_id, lock_token): repeater.reset_backoff() else: # All the payloads that were sent failed. Try again later. + metrics_counter( + 'commcare.repeaters.process_repeaters.repeater_backoff', + tags={ + 'domain': repeater.domain, + 'repeater': f'{repeater.domain}: {repeater.name}', + }, + ) repeater.set_backoff() finally: lock = get_repeater_lock(repeater_id) From 7f18e5223396aeb4dbab90c20c794b6b32079914 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Mon, 2 Dec 2024 22:24:36 +0000 Subject: [PATCH 34/34] Add documentation --- corehq/motech/repeaters/tasks.py | 73 ++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index f99d107d4bd9..f4a239314cd4 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -1,3 +1,76 @@ +""" +check_repeaters() and process_repeaters() +========================================= + +check_repeaters() +----------------- + +The ``check_repeaters()`` task is how repeat records are sent, and its +workflow was shaped by the fact that repeaters and repeat records were +stored in CouchDB. 
+ +``check_repeaters()`` iterates all **repeat records** where the value of +``RepeatRecord.next_check`` is in the past. + +We iterate them in parallel by dividing them into partitions (four +partitions in production). Repeat records are partitioned using their +ID. (``partition = RepeatRecord.id % num_partitions``.) + +For each repeat record, ``check_repeaters_in_partition()`` calls +``RepeatRecord.attempt_forward_now(is_retry=True)``. (``is_retry`` is +set to ``True`` because when a repeat record is registered, +``RepeatRecord.attempt_forward_now()`` is called immediately, and the +repeat record is only enqueued if sending fails.) + +Execution ends up back in the ``tasks`` module when +``RepeatRecord.attempt_forward_now()`` calls +``_process_repeat_record()``. It runs a battery of checks, and if they +all succeed, ``RepeatRecord.fire()`` is called. + +This process has several disadvantages: + +* It iterates many repeat records that will not be sent. It has no way + to filter out the repeat records of paused or deleted repeaters. + +* It attempts to forward all the repeat records of a repeater, even if + every previous repeat record has failed. + + +process_repeaters() +------------------- + +The ``process_repeaters()`` task sends repeat records, but does so in a +way that takes advantage of foreign keys between repeaters and their +repeat records. + +This process is enabled using the ``PROCESS_REPEATERS`` feature flag. + +The ``iter_ready_repeater_ids_once()`` generator yields the IDs of +repeaters that have repeat records ready to be sent. It does so in a +round-robin fashion, cycling through the domains. It does this so that: + +* Domains and repeaters are not rate-limited unnecessarily. +* Remote APIs are not unintentionally DDoS-attacked by CommCare HQ. +* No domain has to wait while another domain consumes all the repeat + record queue workers. + +As long as there are repeat records ready to be sent, the +``iter_ready_repeater_ids_forever()`` generator will continue to yield +from ``iter_ready_repeater_ids_once()``. The ``process_repeaters()`` +task iterates these repeater IDs, and passes each one to the +``process_repeater()`` task. + +``process_repeater()`` fetches a batch of repeat records (the number is +set per repeater) and spawns tasks to send them in parallel. The results +of all the send attempts are passed to ``update_repeater()``. If all the +send attempts failed, the **repeater** (not the repeat record) is backed +off. If any send attempts succeeded, the backoff is reset. + +The intention of this process is not only to share repeat record queue +workers fairly across domains, but also to optimise workers by not +trying to send repeat records that are unlikely to succeed. + +""" import random import uuid from datetime import datetime, timedelta
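For readers following the round-robin description in the docstring above, here
is a minimal, self-contained sketch of that iteration order. It is plain Python
for illustration only: the function name and the dict of ready repeater IDs are
hypothetical, and the real generators in this module additionally consult the
``PROCESS_REPEATERS`` toggle, rate limiting and the per-repeater Redis locks.

def iter_round_robin(ready_ids_by_domain):
    """Yield (domain, repeater_id) pairs, cycling through the domains.

    One repeater per domain per pass, so no single domain can occupy
    all of the repeat record queue workers at once.
    """
    # Work on a copy so the caller's data is not mutated.
    remaining = {domain: list(ids) for domain, ids in ready_ids_by_domain.items()}
    while remaining:
        for domain in list(remaining):
            try:
                yield domain, remaining[domain].pop()
            except IndexError:
                # This domain has no more ready repeaters.
                del remaining[domain]


ready = {
    'domain1': ['repeater_id1', 'repeater_id2', 'repeater_id3'],
    'domain2': ['repeater_id4', 'repeater_id5'],
    'domain3': ['repeater_id6'],
}
print(list(iter_round_robin(ready)))
# [('domain1', 'repeater_id3'), ('domain2', 'repeater_id5'),
#  ('domain3', 'repeater_id6'), ('domain1', 'repeater_id2'),
#  ('domain2', 'repeater_id4'), ('domain1', 'repeater_id1')]

This is the same order asserted in ``test_iter_ready_repeater_ids_once()``
earlier in this series: each pass yields at most one ready repeater per domain
before the next pass begins.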