Skip to content

Commit

Permalink
[YONK-1622] Backport migration code to v1 (#73)
Browse files Browse the repository at this point in the history
Backport migration code to v1
  • Loading branch information
xitij2000 authored Jun 3, 2020
2 parents 596520c + 6ac7c42 commit 4277565
Show file tree
Hide file tree
Showing 20 changed files with 523 additions and 351 deletions.
2 changes: 1 addition & 1 deletion completion_aggregator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

from __future__ import absolute_import, unicode_literals

__version__ = '1.5.25'
__version__ = '1.6.0'

default_app_config = 'completion_aggregator.apps.CompletionAggregatorAppConfig' # pylint: disable=invalid-name
4 changes: 2 additions & 2 deletions completion_aggregator/api/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ def authentication_classes(self): # pragma: no cover
"""
from openedx.core.lib.api import authentication # pylint: disable=import-error
try:
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication # pylint: disable=import-error
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
except ImportError:
from edx_rest_framework_extensions.authentication import JwtAuthentication # pylint: disable=import-error
from edx_rest_framework_extensions.authentication import JwtAuthentication

return [
JwtAuthentication,
Expand Down
56 changes: 8 additions & 48 deletions completion_aggregator/management/commands/migrate_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,11 @@
"""

import logging
import time

from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import BaseCommand

from ...tasks import aggregation_tasks

try:
from progress.models import CourseModuleCompletion
PROGRESS_IMPORTED = True
except ImportError:
PROGRESS_IMPORTED = False


log = logging.getLogger(__name__)


Expand All @@ -38,60 +30,28 @@ def add_arguments(self, parser):
default=10000,
type=int,
)
parser.add_argument(
'--start-index',
help='Offset from which to start processing CourseModuleCompletions. (default: 0)',
default=0,
type=int,
)
parser.add_argument(
'--stop-index',
help='Offset at which to stop processing CourseModuleCompletions. (default: process all)',
default=0,
type=int,
)
parser.add_argument(
'--delay-between-tasks',
help='Amount of time to wait between submitting tasks in seconds. (default: 0.0)',
default=0.0,
type=float,
)
parser.add_argument(
'--ids',
help='Migrate specific CourseModuleCompletion IDs',
)

def handle(self, *args, **options):
if not PROGRESS_IMPORTED:
raise CommandError("Unable to import progress models. Aborting")
self._configure_logging(options)
task_options = self.get_task_options(options)

if options['ids']:
migrate_ids = [int(id_) for id_ in options['ids'].split(',')]
migrate_ids.sort()
for index in migrate_ids:
aggregation_tasks.migrate_batch.apply_async(
kwargs={'start': index, 'stop': index + 1},
**task_options
)
time.sleep(options['delay_between_tasks'])
else:
cmc_max_id = CourseModuleCompletion.objects.all().order_by('-id')[:1][0].id
cmc_min_id = CourseModuleCompletion.objects.all().order_by('id')[:1][0].id
start = max(options['start_index'], cmc_min_id)
stop = min(cmc_max_id + 1, options['stop_index'] or float('inf'))
for index in range(start, stop, options['batch_size']):
aggregation_tasks.migrate_batch.apply_async(
kwargs={'start': index, 'stop': min(stop, index + options['batch_size'])},
**task_options
)
time.sleep(options['delay_between_tasks'])
aggregation_tasks.migrate_batch.apply_async(
kwargs={
'batch_size': options['batch_size'],
'delay_between_tasks': options['delay_between_tasks'],
},
**task_options
)

def get_task_options(self, options):
"""
Return task options for generated celery tasks.
Currently, this adds a routing key, if provided.
"""
opts = {}
Expand Down
124 changes: 43 additions & 81 deletions completion_aggregator/tasks/aggregation_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import time

from celery import shared_task
from celery_utils.logged_task import LoggedTask
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey

from django.contrib.auth.models import User
Expand All @@ -17,38 +17,16 @@
from .. import core
from ..models import StaleCompletion

try:
from progress.models import CourseModuleCompletion
PROGRESS_IMPORTED = True
except ImportError:
PROGRESS_IMPORTED = False


# SQLite doesn't support the ON DUPLICATE KEY syntax. INSERT OR REPLACE will
# have a similar effect, but uses new primary keys. The drawbacks of this are:
# * It will consume the available keyspace more quickly.
# * It will not preserve foreign keys pointing to our table.
# SQLite is only used in testing environments, so neither of these drawbacks
# poses an actual problem.

INSERT_OR_UPDATE_MYSQL = """
INSERT INTO completion_blockcompletion
(user_id, course_key, block_key, block_type, completion)
VALUES
(%s, %s, %s, %s, 1.0)
ON DUPLICATE KEY UPDATE
completion=VALUES(completion);
UPDATE_SQL = """
UPDATE completion_blockcompletion completion, progress_coursemodulecompletion progress
SET completion.created = progress.created,
completion.modified = progress.modified
WHERE completion.user_id = progress.user_id
AND completion.block_key = progress.content_id
AND completion.course_key = progress.course_id
AND completion.id IN %(ids)s;
"""

INSERT_OR_UPDATE_SQLITE = """
INSERT OR REPLACE
INTO completion_blockcompletion
(user_id, course_key, block_key, block_type, completion)
VALUES
(%s, %s, %s, %s, 1.0);
"""


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -80,65 +58,49 @@ def update_aggregators(username, course_key, block_keys=(), force=False):

course_key = CourseKey.from_string(course_key)
block_keys = set(UsageKey.from_string(key).map_into_course(course_key) for key in block_keys)
log.info("Updating aggregators in %s for %s. Changed blocks: %s", course_key, user.username, block_keys)
log.info(
"Updating aggregators in %s for %s. Changed blocks: %s", course_key, user.username, block_keys,
)
return core.update_aggregators(user, course_key, block_keys, force)


@shared_task
def migrate_batch(start, stop): # Cannot pass a queryset to a task.
def migrate_batch(batch_size, delay_between_tasks):
"""
Convert a batch of CourseModuleCompletions to BlockCompletions.
Wraps _migrate_batch to simplify testing.
"""
_migrate_batch(batch_size, delay_between_tasks)

Given a starting ID and a stopping ID, this task will:

def _migrate_batch(batch_size, delay_between_tasks):
"""
Convert a batch of CourseModuleCompletions to BlockCompletions.
Given a starting ID and a stopping ID, this task will:
* Fetch all CourseModuleCompletions with an ID in range(start_id, stop_id).
* Update the BlockCompletion table with those CourseModuleCompletion
records.
"""
if not PROGRESS_IMPORTED:
log.error("Cannot perform migration: CourseModuleCompletion not importable.")

queryset = CourseModuleCompletion.objects.all().select_related('user')
course_module_completions = queryset.filter(id__gte=start, id__lt=stop)

processed = {} # Dict has format: {course: {user: [blocks]}
insert_params = []
for cmc in course_module_completions:
try:
course_key = CourseKey.from_string(cmc.course_id)
block_key = UsageKey.from_string(cmc.content_id).map_into_course(course_key)
block_type = block_key.block_type
except InvalidKeyError:
log.exception(
"Could not migrate CourseModuleCompletion with values: %s",
cmc.__dict__,
)
continue
if course_key not in processed:
processed[course_key] = set()
if cmc.user not in processed[course_key]:
processed[course_key].add(cmc.user)
# Param order: (user_id, course_key, block_key, block_type)
insert_params.append((cmc.user_id, cmc.course_id, cmc.content_id, block_type))
if connection.vendor == 'mysql':
sql = INSERT_OR_UPDATE_MYSQL
else:
sql = INSERT_OR_UPDATE_SQLITE
with connection.cursor() as cur:
cur.executemany(sql, insert_params)
# Create aggregators later.
stale_completions = []
for course_key in processed:
for user in processed[course_key]:
stale_completions.append(
StaleCompletion(
username=user.username,
course_key=course_key,
block_key=None,
force=True

def get_next_id_batch():
while True:
with connection.cursor() as cur:
count = cur.execute(
"""
SELECT id
FROM completion_blockcompletion
WHERE NOT completion_blockcompletion.modified
LIMIT %(batch_size)s;
""",
{"batch_size": batch_size},
)
)
StaleCompletion.objects.bulk_create(
stale_completions,
)
log.info("Completed progress migration batch from %s to %s", start, stop)
ids = [row[0] for row in cur.fetchall()]
if count == 0:
break
yield ids

with connection.cursor() as cur:
count = 0
for ids in get_next_id_batch():
count = cur.execute(UPDATE_SQL, {"ids": ids},)
time.sleep(delay_between_tasks)
log.info("Completed progress updation batch of %s objects", count)
25 changes: 16 additions & 9 deletions requirements/base.in
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
# Core requirements for using this application

celery==3.1.18 # Asynchronous tasks
edx-celeryutils<0.2 # Custom task extensions
Django>=1.8 # Web application framework
-c constraints.txt


celery==3.1.18 # Asynchronous tasks
edx-celeryutils<0.2 # Custom task extensions
Django>=1.8 # Web application framework
django-oauth-toolkit<1.0
djangorestframework>=3.0,<3.7 # API tools
django-model-utils # Provides TimeStampedModel abstract base class
edx-opaque-keys>=0.4.2 # Provides CourseKey and UsageKey
djangorestframework>=3.0,<3.7 # API tools
django-model-utils<3.2 # Provides TimeStampedModel abstract base class
edx-opaque-keys<2,>=0.4.2 # Provides CourseKey and UsageKey
edx-completion>=1.0.3,<2
mysqlclient # For connecting to MySQL
six
XBlock
mysqlclient # For connecting to MySQL
six # Python 2/3 compatibility stubs
# Limit for compatible django version:
XBlock<1.3
jsonfield<2.1
django-braces<1.14
django-waffle<0.16
1 change: 1 addition & 0 deletions requirements/constraints.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
futures; python_version < "3"
2 changes: 2 additions & 0 deletions requirements/dev.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Additional requirements for development of this application

-c constraints.txt

diff-cover # Changeset diff test coverage
edx-lint # For updating pylintrc
edx-i18n-tools # For i18n_tool dummy
Expand Down
Loading

0 comments on commit 4277565

Please sign in to comment.