Skip to content

Commit

Permalink
Merge pull request #362 from openedx/cag/backfill-command-test
Browse files Browse the repository at this point in the history
feat: allow to use any configured engine to replay tracking logs
  • Loading branch information
Ian2012 authored Mar 22, 2024
2 parents 40b3fdd + 19d6132 commit ac59cec
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Change Log
Unreleased
~~~~~~~~~~

[8.3.0]

* Allow to use any configured engine to replay tracking logs

[8.2.0]

* Add support for batching for EventsRouter.
Expand Down
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.2.0'
__version__ = '8.3.0'
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
from io import BytesIO
from time import sleep

from event_routing_backends.backends.events_router import EventsRouter
from eventtracking.tracker import get_tracker

from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor
from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor


class QueuedSender:
Expand Down Expand Up @@ -43,23 +41,15 @@ def __init__(
self.unparsable_lines = 0
self.batches_sent = 0

if self.transformer_type == "xapi":
self.router = EventsRouter(
backend_name=RouterConfiguration.XAPI_BACKEND,
processors=[XApiProcessor()]
)
else:
self.router = EventsRouter(
backend_name=RouterConfiguration.CALIPER_BACKEND,
processors=[CaliperProcessor()]
)
self.tracker = get_tracker()
self.engine = self.tracker.backends[self.transformer_type]

def is_known_event(self, event):
"""
Check whether any processor cares about this event.
"""
if "name" in event:
for processor in self.router.processors:
for processor in self.engine.processors:
if event["name"] in processor.registry.mapping:
return True
return False
Expand Down Expand Up @@ -108,7 +98,8 @@ def send(self):
"""
if self.destination == "LRS":
print(f"Sending {len(self.event_queue)} events to LRS...")
self.router.bulk_send(self.event_queue)
for backend in self.engine.backends.values():
backend.bulk_send(self.event_queue)
else:
print("Skipping send, we're storing with libcloud instead of an LRS.")

Expand All @@ -133,7 +124,7 @@ def store(self):

out = BytesIO()
for event in self.event_queue:
transformed_event = self.router.processors[0](event)
transformed_event = self.engine.processors[0](event)
out.write(str.encode(json.dumps(transformed_event)))
out.write(str.encode("\n"))
out.seek(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@

import pytest
from django.core.management import call_command
from django.test.utils import override_settings
from eventtracking.backends.async_routing import AsyncRoutingBackend
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.django.django_tracker import override_default_tracker
from eventtracking.tracker import get_tracker
from libcloud.storage.types import ContainerDoesNotExistError

import event_routing_backends.management.commands.transform_tracking_logs as transform_tracking_logs
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
from event_routing_backends.management.commands.transform_tracking_logs import (
_get_chunks,
Expand All @@ -19,6 +25,11 @@
validate_source_and_files,
)

override_default_tracker()

tracker = get_tracker()


LOCAL_CONFIG = json.dumps({"key": "/openedx/", "container": "data", "prefix": ""})
REMOTE_CONFIG = json.dumps({
"key": "api key",
Expand All @@ -37,11 +48,9 @@ def mock_common_calls():
Mock out calls that we test elsewhere and aren't relevant to the command tests.
"""
command_path = "event_routing_backends.management.commands.transform_tracking_logs"
helper_path = "event_routing_backends.management.commands.helpers"
with patch(command_path+".Provider") as mock_libcloud_provider:
with patch(command_path+".get_driver") as mock_libcloud_get_driver:
with patch(helper_path + ".queued_sender.EventsRouter") as mock_eventsrouter:
yield mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter
yield mock_libcloud_provider, mock_libcloud_get_driver


def command_options():
Expand Down Expand Up @@ -180,7 +189,7 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
"""
Test the command and QueuedSender with a variety of options.
"""
mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls
mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls

expected_results = command_opts.pop("expected_results")
transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2)
Expand All @@ -202,16 +211,15 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
mm2.registry.mapping = {"problem_check": 1}
# Fake a process response that can be serialized to json
mm2.return_value = {"foo": "bar"}
mock_eventsrouter.return_value.processors = [mm2]
tracker.backends["xapi"].processors = [mm2]
for backend in tracker.backends["xapi"].backends.values():
backend.bulk_send = MagicMock()

call_command(
'transform_tracking_logs',
**command_opts
)

# Router should only be set up once
assert mock_eventsrouter.call_count == 1

captured = capsys.readouterr()
print(captured.out)

Expand Down Expand Up @@ -290,7 +298,7 @@ def test_invalid_libcloud_source_driver(capsys, mock_common_calls):
"""
Check error cases when non-existent libcloud drivers are passed in.
"""
mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls
mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls

mock_libcloud_get_driver.side_effect = [AttributeError(), MagicMock()]

Expand All @@ -303,7 +311,7 @@ def test_invalid_libcloud_source_driver(capsys, mock_common_calls):


def test_invalid_libcloud_dest_driver(capsys, mock_common_calls):
mock_libcloud_provider, mock_libcloud_get_driver, mock_eventsrouter = mock_common_calls
mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls

mock_libcloud_get_driver.side_effect = [MagicMock(), AttributeError()]
with pytest.raises(AttributeError):
Expand Down
43 changes: 42 additions & 1 deletion test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,46 @@ def root(*args):
XAPI_AGENT_IFI_TYPE = 'external_id'
EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False
EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 100

EVENT_TRACKING_ENABLED = True
EVENT_TRACKING_BACKENDS = {
"xapi": {
"ENGINE": "eventtracking.backends.async_routing.AsyncRoutingBackend",
"OPTIONS": {
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter",
"OPTIONS": {
"processors": [
{
"ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor",
"OPTIONS": {},
}
],
"backend_name": "xapi",
},
},
},
},
},
"caliper": {
"ENGINE": "eventtracking.backends.async_routing.AsyncRoutingBackend",
"OPTIONS": {
"backends": {
"caliper": {
"ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter",
"OPTIONS": {
"processors": [
{
"ENGINE": "event_routing_backends.processors."
"caliper.transformer_processor.CaliperProcessor",
"OPTIONS": {},
},
],
"backend_name": "caliper",
},
},
}
},
},
}
_mock_third_party_modules()

0 comments on commit ac59cec

Please sign in to comment.