Skip to content

Commit

Permalink
[BB-2296] Add locking mechanism to batch operations (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
Agrendalath authored Apr 16, 2020
1 parent 8281471 commit 6820c1a
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 7 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Change Log
in this file. It adheres to the structure of http://keepachangelog.com/ ,
but in reStructuredText instead of Markdown (for ease of incorporation into
Sphinx documentation and the PyPI description).
This project adheres to Semantic Versioning (http://semver.org/).

.. There should always be an "Unreleased" section for changes pending release.
Expand All @@ -17,12 +17,18 @@ Unreleased
[2.1.0] - 2020-04-17
~~~~~~~~~~~~~~~~~~~~

* Add locking mechanism to batch operations.
* Replace `course_key` with `course` in `reaggregate_course` management command.

[2.0.1] - 2020-04-17
~~~~~~~~~~~~~~~~~~~~

* Convert `course_key` to string before sending it to Celery task.

[1.0.0] - 2018-01-04
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* First release on PyPI.
* On-demand asynchronous aggregation of xblock completion.
* Provides an API to retrieve aggregations for one or many users, for one or
* Provides an API to retrieve aggregations for one or many users, for one or
many courses.
2 changes: 1 addition & 1 deletion completion_aggregator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

from __future__ import absolute_import, unicode_literals

__version__ = '2.0.1'
__version__ = '2.1.0'

default_app_config = 'completion_aggregator.apps.CompletionAggregatorAppConfig' # pylint: disable=invalid-name
31 changes: 29 additions & 2 deletions completion_aggregator/batch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""
Aggregator service.
This service periodically determines which stale_blocks need updating, and
Expand All @@ -14,6 +13,9 @@

import six

from django.conf import settings
from django.core.cache import cache

from . import models, utils
from .tasks import aggregation_tasks

Expand All @@ -32,6 +34,9 @@ def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=Non
collects all stale blocks for each enrollment, and enqueues a single
recalculation of all aggregators containing those stale blocks.
There is a locking mechanism that ensures that only one `perform_aggregation` is running at the moment.
The lock is released manually upon exiting this function.
batch_size (int|None) [default: 10000]:
Maximum number of stale completions to fetch in a single query to the
database.
Expand All @@ -48,6 +53,14 @@ def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=Non
A routing key to pass to celery for the update_aggregators tasks. None
means use the default routing key.
"""
if not cache.add(
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK,
True,
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS
):
log.warning("Aggregation is already running. Exiting.")
return

stale_queryset = models.StaleCompletion.objects.filter(resolved=False)
task_options = {}
if limit is None:
Expand All @@ -58,6 +71,7 @@ def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=Non
max_id = stale_queryset.order_by('-id')[0].id
except IndexError:
log.warning("No StaleCompletions to process. Exiting.")
cache.delete(settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK) # Release the lock.
return
if routing_key:
task_options['routing_key'] = routing_key
Expand Down Expand Up @@ -108,9 +122,22 @@ def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=Non
if delay:
time.sleep(delay)

cache.delete(settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK) # Release the lock.
log.info("Finished aggregation update for %s user enrollments", len(stale_blocks))


def perform_cleanup():
"""
Remove resolved StaleCompletion objects.
"""
return models.StaleCompletion.objects.filter(resolved=True).delete()
if not cache.add(
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK,
True,
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS
):
log.warning("Cleanup is already running. Exiting.")
return

deleted = models.StaleCompletion.objects.filter(resolved=True).delete()
cache.delete(settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK) # Release the lock.
return deleted
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def handle(self, *args, **options):
options['course_keys'] = BlockCompletion.objects.values_list('course_key').distinct()
CourseEnrollment = compat.course_enrollment_model() # pylint: disable=invalid-name
for course in options['course_keys']:
all_enrollments = CourseEnrollment.objects.filter(course_key=course).select_related('user')
all_enrollments = CourseEnrollment.objects.filter(course=course).select_related('user')
StaleCompletion.objects.bulk_create(
(
StaleCompletion(
Expand Down
20 changes: 20 additions & 0 deletions completion_aggregator/settings/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,23 @@ def plugin_settings(settings):
'COMPLETION_AGGREGATOR_ASYNC_AGGREGATION',
settings.COMPLETION_AGGREGATOR_ASYNC_AGGREGATION,
)

settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK = settings.ENV_TOKENS.get(
'COMPLETION_AGGREGATOR_AGGREGATION_LOCK',
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK,
)

settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS = settings.ENV_TOKENS.get(
'COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS',
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS,
)

settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK = settings.ENV_TOKENS.get(
'COMPLETION_AGGREGATOR_CLEANUP_LOCK',
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK,
)

settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS = settings.ENV_TOKENS.get(
'COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS',
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS,
)
11 changes: 11 additions & 0 deletions completion_aggregator/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ def plugin_settings(settings):
'vertical',
}
settings.COMPLETION_AGGREGATOR_ASYNC_AGGREGATION = False

# Names of the batch operations locks
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK = 'COMPLETION_AGGREGATOR_AGGREGATION_LOCK'
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK = 'COMPLETION_AGGREGATOR_CLEANUP_LOCK'

# Define how long should the locks be kept. They are released after completing the operation, so there are two
# possible scenarios for releasing the lock on timeout:
# 1. The management command takes more than 1800s - in this case you should set a higher limit for the lock.
# 2. The management command didn't exit successfully. You should check the logs to find out why.
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS = 1800
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS = 900
4 changes: 4 additions & 0 deletions test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def root(*args):
CELERY_ALWAYS_EAGER = True
COMPLETION_AGGREGATOR_BLOCK_TYPES = {'course', 'chapter', 'sequential'}
COMPLETION_AGGREGATOR_ASYNC_AGGREGATION = True
COMPLETION_AGGREGATOR_AGGREGATION_LOCK = 'COMPLETION_AGGREGATOR_AGGREGATION_LOCK'
COMPLETION_AGGREGATOR_CLEANUP_LOCK = 'COMPLETION_AGGREGATOR_CLEANUP_LOCK'
COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS = 1800
COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS = 900

DATABASES = {
'default': {
Expand Down
39 changes: 39 additions & 0 deletions tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from opaque_keys.edx.keys import CourseKey
from xblock.core import XBlock

from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.test import TestCase, override_settings

from completion.models import BlockCompletion
Expand Down Expand Up @@ -123,6 +125,12 @@ def test_with_full_course_stale_completion(mock_task, users):
assert mock_task.call_count == 2 # Called once for each user


@patch('completion_aggregator.tasks.aggregation_tasks.update_aggregators.apply_async')
def test_with_no_completions(mock_task, users): # pylint: disable=unused-argument
perform_aggregation()
assert mock_task.call_count == 0


@patch('completion_aggregator.tasks.aggregation_tasks.update_aggregators.apply_async')
def test_with_no_blocks(mock_task, users):
course_key = CourseKey.from_string('course-v1:OpenCraft+Onboarding+2018')
Expand All @@ -131,6 +139,21 @@ def test_with_no_blocks(mock_task, users):
assert mock_task.call_count == 1


@patch('completion_aggregator.tasks.aggregation_tasks.update_aggregators.apply_async')
def test_lock(mock_task, users):
"""Ensure that only one batch aggregation is running at the moment."""
cache.add(
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK,
True,
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS
)
course_key = CourseKey.from_string('course-v1:OpenCraft+Onboarding+2018')
StaleCompletion.objects.create(username=users[0].username, course_key=course_key, block_key=None, force=True)
perform_aggregation()
cache.delete(settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK)
assert mock_task.call_count == 0


def test_plethora_of_stale_completions(users):
course_key = CourseKey.from_string('course-v1:OpenCraft+Onboarding+2018')

Expand All @@ -154,6 +177,22 @@ def test_plethora_of_stale_completions(users):
assert mock_task.call_count == 1


def test_cleanup_and_lock(users):
course_key = CourseKey.from_string('course-v1:OpenCraft+Onboarding+2018')
StaleCompletion.objects.create(username=users[0].username, course_key=course_key, block_key=None, resolved=True)
cache.add(
settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK,
True,
settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS
)
perform_cleanup()
assert StaleCompletion.objects.count() == 1

cache.delete(settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK)
perform_cleanup()
assert StaleCompletion.objects.count() == 0


class StaleCompletionResolutionTestCase(TestCase):
"""
XBlock.register_temp_plugin decorator breaks pytest fixtures, so we
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,5 @@ commands =
rm tests/__init__.py
pycodestyle completion_aggregator tests
pydocstyle completion_aggregator tests
isort --check-only --recursive tests test_utils completion_aggregator manage.py setup.py test_settings.py
isort --check-only --diff --recursive tests test_utils completion_aggregator manage.py setup.py test_settings.py
make selfcheck

0 comments on commit 6820c1a

Please sign in to comment.