Skip to content

Commit

Permalink
Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kaapstorm committed Oct 26, 2024
1 parent a448b9e commit 87e8c34
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
37 changes: 36 additions & 1 deletion corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def process_repeaters():
if not process_repeater_lock.acquire(blocking=False):
return

metrics_counter('commcare.repeaters.process_repeaters.start')
try:
for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever():
process_repeater.delay(domain, repeater_id, lock_token)
Expand Down Expand Up @@ -298,6 +299,7 @@ def iter_ready_repeater_ids_once():
...
"""
metrics_counter('commcare.repeaters.process_repeaters.iter_once')
repeater_ids_by_domain = get_repeater_ids_by_domain()
while True:
if not repeater_ids_by_domain:
Expand Down Expand Up @@ -374,6 +376,7 @@ def process_ready_repeat_record(repeat_record_id):
if not is_repeat_record_ready(repeat_record):
return None

_metrics_wait_duration(repeat_record)
report_repeater_attempt(repeat_record.repeater.repeater_id)
with timer('fire_timing') as fire_timer:
state_or_none = repeat_record.fire(timing_context=fire_timer)
Expand Down Expand Up @@ -404,6 +407,38 @@ def is_repeat_record_ready(repeat_record):
)


def _metrics_wait_duration(repeat_record):
    """
    Report how long ``repeat_record`` waited before this send attempt.

    The wait is measured by ``_get_wait_duration_seconds()`` and
    recorded in the
    ``commcare.repeaters.process_repeaters.repeat_record_wait``
    histogram, tagged with the record's domain and with
    "<domain>: <repeater name>".

    Bucket boundaries are 0 followed by ten values that triple at each
    step, starting at 60 seconds: 60 * 3**0 (1 minute) up to 60 * 3**9
    (1,180,980 seconds, roughly 13.7 days).

    NOTE(review): an earlier version of this docstring described the
    range as "1 minute to 9 days" and cited a 7-day maximum Repeater
    backoff (``MAX_RETRY_WAIT``). The top boundary is actually ~13.7
    days; confirm the intended range against ``MAX_RETRY_WAIT``.
    """
    one_minute = 60  # seconds
    bucket_bounds = [0, *(one_minute * 3 ** power for power in range(10))]
    wait_seconds = _get_wait_duration_seconds(repeat_record)
    metric_tags = {
        'domain': repeat_record.domain,
        'repeater': f'{repeat_record.domain}: {repeat_record.repeater.name}',
    }
    metrics_histogram(
        'commcare.repeaters.process_repeaters.repeat_record_wait',
        wait_seconds,
        bucket_tag='duration',
        buckets=bucket_bounds,
        bucket_unit='s',
        tags=metric_tags,
    )


def _get_wait_duration_seconds(repeat_record):
    """
    Return the whole seconds ``repeat_record`` has been waiting.

    The wait starts at ``created_at`` of the attempt returned by
    ``repeat_record.attempt_set.last()``, or at
    ``repeat_record.registered_at`` when there is no such attempt. It
    ends now, per ``datetime.utcnow()``.

    The result is truncated to an ``int``.
    """
    latest_attempt = repeat_record.attempt_set.last()
    started_at = (
        latest_attempt.created_at
        if latest_attempt
        else repeat_record.registered_at
    )
    # utcnow() is naive, so ``started_at`` must be a naive datetime as
    # well for the subtraction to succeed.
    elapsed = datetime.utcnow() - started_at
    return int(elapsed.total_seconds())


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def update_repeater(repeat_record_states, repeater_id, lock_token):
"""
Expand Down Expand Up @@ -441,7 +476,7 @@ def update_repeater(repeat_record_states, repeater_id, lock_token):
# This metric monitors the number of Repeaters with RepeatRecords ready to
# be sent. A steep increase indicates a problem with `process_repeaters()`.
metrics_gauge_task(
'commcare.repeaters.all_ready',
'commcare.repeaters.process_repeaters.all_ready_count',
Repeater.objects.all_ready_count,
run_every=crontab(minute='*/5'), # every five minutes
multiprocess_mode=MPM_MAX
Expand Down
63 changes: 63 additions & 0 deletions corehq/motech/repeaters/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from corehq.motech.models import ConnectionSettings, RequestLog
from corehq.motech.repeaters.models import FormRepeater, Repeater, RepeatRecord
from corehq.motech.repeaters.tasks import (
_get_wait_duration_seconds,
_process_repeat_record,
delete_old_request_logs,
get_repeater_ids_by_domain,
Expand Down Expand Up @@ -558,3 +559,65 @@ def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __):

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_not_called()


class TestGetWaitDurationSeconds(TestCase):
    """
    Tests for ``_get_wait_duration_seconds()``.

    The wait should be measured from the most recently created attempt
    when the repeat record has attempts, and from ``registered_at``
    when it has none.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        connection_settings = ConnectionSettings.objects.create(
            domain=DOMAIN,
            url='http://www.example.com/api/'
        )
        cls.repeater = FormRepeater.objects.create(
            domain=DOMAIN,
            connection_settings=connection_settings,
        )

    def _create_repeat_record(self, registered_ago):
        # Create a repeat record registered ``registered_ago`` (a
        # timedelta) before now.
        return RepeatRecord.objects.create(
            repeater=self.repeater,
            domain=DOMAIN,
            payload_id='abc123',
            registered_at=datetime.utcnow() - registered_ago,
        )

    @staticmethod
    def _add_failed_attempt(repeat_record, attempted_ago):
        # Add a failed attempt created ``attempted_ago`` (a timedelta)
        # before now.
        repeat_record.attempt_set.create(
            created_at=datetime.utcnow() - attempted_ago,
            state=State.Fail,
        )

    def test_repeat_record_no_attempts(self):
        repeat_record = self._create_repeat_record(timedelta(minutes=5))

        wait_duration = _get_wait_duration_seconds(repeat_record)

        self.assertEqual(wait_duration, 300)

    def test_repeat_record_one_attempt(self):
        repeat_record = self._create_repeat_record(timedelta(minutes=5))
        self._add_failed_attempt(repeat_record, timedelta(seconds=30))

        wait_duration = _get_wait_duration_seconds(repeat_record)

        self.assertEqual(wait_duration, 30)

    def test_repeat_record_two_attempts(self):
        repeat_record = self._create_repeat_record(timedelta(hours=1))
        # Attempts are added oldest first, so the five-second-old
        # attempt is the one created last.
        self._add_failed_attempt(repeat_record, timedelta(minutes=30))
        self._add_failed_attempt(repeat_record, timedelta(seconds=5))

        wait_duration = _get_wait_duration_seconds(repeat_record)

        self.assertEqual(wait_duration, 5)

0 comments on commit 87e8c34

Please sign in to comment.