Skip to content

Commit

Permalink
Add one more index. Use UNION ALL queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaapstorm committed Oct 23, 2024
1 parent efc4dde commit bd37a00
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
34 changes: 30 additions & 4 deletions corehq/motech/repeaters/migrations/0016_add_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,18 @@ class Migration(migrations.Migration):
index=models.Index(
condition=models.Q(("is_deleted", False)),
fields=["id"],
name="is_deleted_partial_idx",
name="deleted_partial_idx",
),
),
migrations.AddIndex(
model_name="repeater",
index=models.Index(
condition=models.Q(
("is_deleted", False),
("is_paused", False),
),
fields=["id"],
name="deleted_paused_partial_idx",
),
),
migrations.AddIndex(
Expand All @@ -46,19 +57,34 @@ class Migration(migrations.Migration):
"""
),

# Replace with a partial index on `id_` column
# Replace with a partial index on `id_` column. Used
# when next_attempt_at is null
migrations.RunSQL(
sql="""
CREATE INDEX CONCURRENTLY "is_deleted_partial_idx"
CREATE INDEX CONCURRENTLY "deleted_partial_idx"
ON "repeaters_repeater" ("id_")
WHERE NOT "is_deleted";
""",
reverse_sql="""
DROP INDEX CONCURRENTLY "is_deleted_partial_idx";
DROP INDEX CONCURRENTLY "deleted_partial_idx";
"""
),

# Add partial index for is_deleted and is_paused on `id_`
# column. Used when next_attempt_at is not null
migrations.RunSQL(
sql="""
CREATE INDEX CONCURRENTLY "deleted_paused_partial_idx"
ON "repeaters_repeater" ("id_")
WHERE (NOT "is_deleted" AND NOT "is_paused");
""",
reverse_sql="""
DROP INDEX CONCURRENTLY "deleted_paused_partial_idx";
"""
),

# Add partial index for RepeatRecord.state on `repeater_id`
# column. Used when next_attempt_at is not null
migrations.RunSQL(
sql="""
CREATE INDEX CONCURRENTLY "state_partial_idx"
Expand Down
51 changes: 36 additions & 15 deletions corehq/motech/repeaters/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,28 +238,44 @@ def all_ready(self):
"""
Return all Repeaters ready to be forwarded.
"""
not_paused = models.Q(is_paused=False)
next_attempt_not_in_the_future = (
models.Q(next_attempt_at__isnull=True)
| models.Q(next_attempt_at__lte=timezone.now())
)
repeat_records_ready_to_send = models.Q(
repeat_records__state__in=(State.Pending, State.Fail)
)
return (
self.get_queryset()
.filter(not_paused)
.filter(next_attempt_not_in_the_future)
.filter(repeat_records_ready_to_send)
return self._all_ready_next_attempt_null().union(
self._all_ready_next_attempt_now(),
all=True,
)

def get_all_ready_ids_by_domain(self):
next_attempt_null = self._all_ready_next_attempt_null().values_list('domain', 'id')
next_attempt_now = self._all_ready_next_attempt_now().values_list('domain', 'id')
query = next_attempt_null.union(next_attempt_now, all=True)
results = defaultdict(list)
query = self.all_ready().values_list('domain', 'id')
for (domain, id_uuid) in query.all():
results[domain].append(id_uuid.hex)
return results

def all_ready_count(self):
return (
self._all_ready_next_attempt_null().count()
+ self._all_ready_next_attempt_now().count()
)

def _all_ready_next_attempt_null(self):
# Slower query. Uses deleted_partial_idx
return (
self.get_queryset()
.filter(is_paused=False)
.filter(next_attempt_at__isnull=True)
.filter(repeat_records__state__in=(State.Pending, State.Fail))
)

def _all_ready_next_attempt_now(self):
# Fast query. Uses deleted_paused_partial_idx and state_partial_idx
return (
self.get_queryset()
.filter(is_paused=False)
.filter(next_attempt_at__lte=timezone.now())
.filter(repeat_records__state__in=(State.Pending, State.Fail))
)

def get_queryset(self):
repeater_obj = self.model()
if type(repeater_obj).__name__ == "Repeater":
Expand Down Expand Up @@ -299,10 +315,15 @@ class Meta:
db_table = 'repeaters_repeater'
indexes = [
models.Index(
name='is_deleted_partial_idx',
name='deleted_partial_idx',
fields=['id'],
condition=models.Q(is_deleted=False),
),
models.Index(
name="deleted_paused_partial_idx",
fields=["id"],
condition=models.Q(("is_deleted", False), ("is_paused", False)),
),
]

payload_generator_classes = ()
Expand Down
6 changes: 1 addition & 5 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,11 @@ def update_repeater(repeat_record_states, repeater_id, lock_token):
)


def get_all_ready_count():
return Repeater.objects.all_ready().count()


# This metric monitors the number of Repeaters with RepeatRecords ready to
# be sent. A steep increase indicates a problem with `process_repeaters()`.
metrics_gauge_task(
'commcare.repeaters.all_ready',
get_all_ready_count,
Repeater.objects.all_ready_count,
run_every=crontab(minute='*/5'), # every five minutes
multiprocess_mode=MPM_MAX
)
5 changes: 5 additions & 0 deletions corehq/motech/repeaters/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,3 +977,8 @@ def test_status_404_response(self):

def test_none_response(self):
self.assertFalse(is_success_response(None))


def test_repeater_all_ready_union_all_sql():
sql_str = str(Repeater.objects.all_ready().query)
assert_in('UNION ALL', sql_str)

0 comments on commit bd37a00

Please sign in to comment.