Process Repeaters, Part 1 #35033

Open

Wants to merge 38 commits into master from nh/iter_repeaters_1
Commits: 38 (the diff below shows changes from 1 commit)
50c848a
Add `PROCESS_REPEATERS` toggle
kaapstorm Aug 23, 2024
0db6a6c
`process_repeaters()` task
kaapstorm Aug 23, 2024
e36296c
`get_repeater_lock()`
kaapstorm Aug 23, 2024
aeb10ba
`iter_ready_repeater_ids_once()`
kaapstorm Aug 23, 2024
01e4bc7
Skip rate-limited repeaters
kaapstorm Aug 23, 2024
db2fec2
`process_repeater()` task
kaapstorm Aug 23, 2024
85b952e
Add tests
kaapstorm Aug 4, 2024
c28c11b
`Repeater.max_workers` field
kaapstorm Aug 24, 2024
d8d9642
Index fields used by `RepeaterManager.all_ready()`
kaapstorm Aug 28, 2024
48c3d7c
Use quickcache. Prefilter enabled domains.
kaapstorm Aug 28, 2024
418ed3a
Check randomly-enabled domains
kaapstorm Aug 29, 2024
85bbfa3
Forward new records for synchronous case repeaters
kaapstorm Aug 29, 2024
d1119bb
Add explanatory docstrings and comments
kaapstorm Sep 9, 2024
03b26cf
get_redis_lock() ... acquire(): No TypeError ?!
kaapstorm Sep 9, 2024
de27ba0
Drop unnecessary `iter_domain_repeaters()`
kaapstorm Sep 10, 2024
4955ef4
Don't quickcache `domain_can_forward_now()`
kaapstorm Sep 24, 2024
59aae71
Migration to create indexes concurrently
kaapstorm Sep 24, 2024
f40e6f4
Merge branch 'master' into nh/iter_repeaters_1
orangejenny Oct 4, 2024
b70fc52
Add comment
kaapstorm Oct 19, 2024
7e65b3b
Don't squash BaseExceptions
kaapstorm Oct 19, 2024
4c41896
Drop timeout for `process_repeater_lock`.
kaapstorm Oct 19, 2024
30d4a6f
Add metric for monitoring health
kaapstorm Oct 19, 2024
fc0f174
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Oct 19, 2024
e32b465
Resolve migration conflict, fix index
kaapstorm Oct 19, 2024
e3bcd74
Fix metric
kaapstorm Oct 19, 2024
efc4dde
Change indexes
kaapstorm Oct 22, 2024
bd37a00
Add one more index. Use UNION ALL queries.
kaapstorm Oct 23, 2024
a448b9e
Don't report attempt too soon
kaapstorm Oct 26, 2024
4321fb7
Add metrics
kaapstorm Oct 26, 2024
74137f9
Improve backoff logic
kaapstorm Oct 26, 2024
968a922
Update comments
kaapstorm Oct 28, 2024
4fd14a0
Show "Next attempt at" in Forwarders page
kaapstorm Oct 28, 2024
07320b9
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Oct 28, 2024
b1eb171
Fixes migration
kaapstorm Oct 28, 2024
2463348
Merge remote-tracking branch 'origin/master' into nh/iter_repeaters_1
kaapstorm Oct 29, 2024
8a7f343
Add comment on other `True` return value
kaapstorm Nov 19, 2024
0f72ba9
Count repeater backoffs
kaapstorm Dec 2, 2024
7f18e52
Add documentation
kaapstorm Dec 2, 2024
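
Taken together, these commits add a dispatching task that fans repeater processing out to per-repeater Celery tasks. A minimal sketch of the control flow, using the names that appear in the tasks.py diff below; the try/finally release and the iterator's exact behaviour are assumptions here, and rate limiting, backoff and metrics are omitted:

def process_repeaters():
    # Only one dispatcher runs at a time (process_repeater_lock is assumed to
    # be a Redis lock created elsewhere in tasks.py).
    if not process_repeater_lock.acquire(blocking=False):
        return
    try:
        # Assumed to keep yielding (domain, repeater_id, lock_token) tuples
        # while any repeater still has repeat records ready to send.
        for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever():
            # Fan out: each ready repeater gets its own Celery task.
            process_repeater.delay(domain, repeater_id, lock_token)
    finally:
        process_repeater_lock.release()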
37 changes: 36 additions & 1 deletion corehq/motech/repeaters/tasks.py
@@ -1,6 +1,7 @@
import random
import uuid
from datetime import datetime, timedelta
from inspect import cleandoc

from django.conf import settings

@@ -248,6 +249,7 @@ def process_repeaters():
    if not process_repeater_lock.acquire(blocking=False):
        return

    metrics_counter('commcare.repeaters.process_repeaters.start')
    try:
        for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever():
            process_repeater.delay(domain, repeater_id, lock_token)
@@ -298,6 +300,7 @@ def iter_ready_repeater_ids_once():
    ...

    """
    metrics_counter('commcare.repeaters.process_repeaters.iter_once')
    repeater_ids_by_domain = get_repeater_ids_by_domain()
    while True:
        if not repeater_ids_by_domain:
@@ -374,6 +377,7 @@ def process_ready_repeat_record(repeat_record_id):
    if not is_repeat_record_ready(repeat_record):
        return None

    _metrics_wait_duration(repeat_record)
    report_repeater_attempt(repeat_record.repeater.repeater_id)
    with timer('fire_timing') as fire_timer:
        state_or_none = repeat_record.fire(timing_context=fire_timer)
@@ -404,6 +408,37 @@ def is_repeat_record_ready(repeat_record):
    )


def _metrics_wait_duration(repeat_record):
    """
    The duration since ``repeat_record`` was registered or last attempted.

    Buckets are exponential: [1m, 6m, 36m, 3.6h, 21.6h, 5.4d]
    """
    buckets = [60 * (6 ** exp) for exp in range(6)]
    metrics_histogram(
        'commcare.repeaters.process_repeaters.repeat_record_wait',
        _get_wait_duration_seconds(repeat_record),
        bucket_tag='duration',
        buckets=buckets,
        bucket_unit='s',
        tags={
            'domain': repeat_record.domain,
            'repeater': f'{repeat_record.domain}: {repeat_record.repeater.name}',
        },
        documentation=cleandoc(_metrics_wait_duration.__doc__)
    )


def _get_wait_duration_seconds(repeat_record):
    last_attempt = repeat_record.attempt_set.last()
    if last_attempt:
        duration_start = last_attempt.created_at
    else:
        duration_start = repeat_record.registered_at
    wait_duration = datetime.utcnow() - duration_start
    return int(wait_duration.total_seconds())


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def update_repeater(repeat_record_states, repeater_id, lock_token):
    """
@@ -441,7 +476,7 @@ def update_repeater(repeat_record_states, repeater_id, lock_token):
# This metric monitors the number of Repeaters with RepeatRecords ready to
# be sent. A steep increase indicates a problem with `process_repeaters()`.
metrics_gauge_task(
-    'commcare.repeaters.all_ready',
+    'commcare.repeaters.process_repeaters.all_ready_count',
    Repeater.objects.all_ready_count,
    run_every=crontab(minute='*/5'),  # every five minutes
    multiprocess_mode=MPM_MAX
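As a quick sanity check (not part of the diff), the bucket expression in _metrics_wait_duration() above reproduces the docstring's [1m, 6m, 36m, 3.6h, 21.6h, 5.4d]:

buckets = [60 * (6 ** exp) for exp in range(6)]
assert buckets == [60, 360, 2160, 12960, 77760, 466560]
for seconds in buckets:
    print(f'{seconds}s = {seconds / 60:g}m = {seconds / 3600:g}h = {seconds / 86400:g}d')
# 60s = 1m; 360s = 6m; 2160s = 36m; 12960s = 3.6h; 77760s = 21.6h; 466560s = 5.4d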
63 changes: 63 additions & 0 deletions corehq/motech/repeaters/tests/test_tasks.py
@@ -17,6 +17,7 @@
from corehq.motech.models import ConnectionSettings, RequestLog
from corehq.motech.repeaters.models import FormRepeater, Repeater, RepeatRecord
from corehq.motech.repeaters.tasks import (
    _get_wait_duration_seconds,
    _process_repeat_record,
    delete_old_request_logs,
    get_repeater_ids_by_domain,
@@ -558,3 +559,65 @@ def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __):

        mock_repeater.set_backoff.assert_not_called()
        mock_repeater.reset_backoff.assert_not_called()


class TestGetWaitDurationSeconds(TestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.repeater = FormRepeater.objects.create(
            domain=DOMAIN,
            connection_settings=ConnectionSettings.objects.create(
                domain=DOMAIN,
                url='http://www.example.com/api/'
            ),
        )

    def test_repeat_record_no_attempts(self):
        five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
        repeat_record = RepeatRecord.objects.create(
            repeater=self.repeater,
            domain=DOMAIN,
            payload_id='abc123',
            registered_at=five_minutes_ago,
        )
        wait_duration = _get_wait_duration_seconds(repeat_record)
        self.assertEqual(wait_duration, 300)
millerdev (Contributor) commented on Dec 5, 2024:

Could there be a race condition (resulting in a flaky test) if something took an extra second or two (like garbage collection) between when repeat_record is created and when _get_wait_duration_seconds is run? Seems unlikely, but not impossible. Maybe consider using something like freezegun.freeze_time?

The same applies to other tests in this class as well.


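A minimal sketch of that suggestion for the first test, assuming freezegun is available in the test environment (the frozen timestamp is arbitrary; this snippet is illustrative, not part of the diff):

from freezegun import freeze_time

class TestGetWaitDurationSeconds(TestCase):
    ...

    @freeze_time('2024-12-05 12:00:00')  # datetime.utcnow() returns this for the whole test
    def test_repeat_record_no_attempts(self):
        five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
        repeat_record = RepeatRecord.objects.create(
            repeater=self.repeater,
            domain=DOMAIN,
            payload_id='abc123',
            registered_at=five_minutes_ago,
        )
        # With time frozen, the wait is exactly 300 seconds, so a slow moment
        # (e.g. garbage collection) between setup and the call can't skew it.
        self.assertEqual(_get_wait_duration_seconds(repeat_record), 300)
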
    def test_repeat_record_one_attempt(self):
        five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
        repeat_record = RepeatRecord.objects.create(
            repeater=self.repeater,
            domain=DOMAIN,
            payload_id='abc123',
            registered_at=five_minutes_ago,
        )
        thirty_seconds_ago = datetime.utcnow() - timedelta(seconds=30)
        repeat_record.attempt_set.create(
            created_at=thirty_seconds_ago,
            state=State.Fail,
        )
        wait_duration = _get_wait_duration_seconds(repeat_record)
        self.assertEqual(wait_duration, 30)

    def test_repeat_record_two_attempts(self):
        an_hour_ago = datetime.utcnow() - timedelta(hours=1)
        repeat_record = RepeatRecord.objects.create(
            repeater=self.repeater,
            domain=DOMAIN,
            payload_id='abc123',
            registered_at=an_hour_ago,
        )
        thirty_minutes = datetime.utcnow() - timedelta(minutes=30)
        repeat_record.attempt_set.create(
            created_at=thirty_minutes,
            state=State.Fail,
        )
        five_seconds_ago = datetime.utcnow() - timedelta(seconds=5)
        repeat_record.attempt_set.create(
            created_at=five_seconds_ago,
            state=State.Fail,
        )
        wait_duration = _get_wait_duration_seconds(repeat_record)
        self.assertEqual(wait_duration, 5)