Skip to content

Commit

Permalink
Merge pull request #408 from openedx/bmtcril/batching_fixes
Browse files Browse the repository at this point in the history
fix: Batching failures when individual event errors bubble up
  • Loading branch information
Ian2012 authored Apr 1, 2024
2 parents 393cd0b + 5a555f1 commit a8952cf
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.3.0'
__version__ = '8.3.1'
15 changes: 14 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@ def queue_event(self, redis, event):

if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis):
batch = redis.rpop(self.queue_name, queue_size)

orig_size = len(batch)
# Deduplicate list, in some misconfigured cases tracking events can be emitted to the
# bus twice, causing them to be processed twice, which LRSs will reject.
# See: https://github.com/openedx/event-routing-backends/issues/410
batch = [i for n, i in enumerate(batch) if i not in batch[n + 1:]]
final_size = len(batch)

if final_size != orig_size: # pragma: no cover
logger.warning(f"{orig_size - final_size} duplicate events in event-routing-backends batch queue! "
f"This is a likely due to misconfiguration of EVENT_TRACKING_BACKENDS.")
return batch

return None
Expand All @@ -237,7 +248,9 @@ def time_to_send(self, redis):
if not last_sent:
return True
time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8')))
return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)
ready = time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)

return ready

def process_event(self, event):
"""
Expand Down
10 changes: 3 additions & 7 deletions event_routing_backends/processors/caliper/tests/test_caliper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from django.test import SimpleTestCase
from django.test.utils import override_settings
from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled
from mock import MagicMock, call, patch, sentinel

from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor
Expand All @@ -29,13 +28,11 @@ def setUp(self):

@override_settings(CALIPER_EVENTS_ENABLED=False)
def test_skip_event_when_disabled(self):
with self.assertRaises(NoBackendEnabled):
self.processor(self.sample_event)
self.assertFalse(self.processor(self.sample_event))

@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])
self.assertFalse(self.processor([self.sample_event]))

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand Down Expand Up @@ -130,6 +127,5 @@ def test_send_method_with_successfull_flow_logging_disabled(
def test_with_no_registry(self, mocked_logger):
backend = CaliperProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend([self.sample_event]))
self.assertFalse(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()
4 changes: 1 addition & 3 deletions event_routing_backends/processors/mixins/base_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ def get_data(self, key, required=False):
result = None

if result is None:
if not required:
logger.warning('Could not get value for %s in event "%s"', key, self.event.get('name', None))
else:
if required:
raise ValueError(
'Could not get value for {} in event "{}"'.format(key, self.event.get('name', None))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from logging import getLogger

from eventtracking.processors.exceptions import EventEmissionExit, NoTransformerImplemented
from eventtracking.processors.exceptions import NoBackendEnabled, NoTransformerImplemented

logger = getLogger(__name__)

Expand Down Expand Up @@ -34,13 +34,18 @@ def __call__(self, events):
"""
returned_events = []
for event in events:
transformed_event = self.transform_event(event)
if not transformed_event:
raise EventEmissionExit
if isinstance(transformed_event, list):
returned_events += transformed_event
else:
returned_events.append(transformed_event)
try:
transformed_event = self.transform_event(event)
if not transformed_event:
pass
elif isinstance(transformed_event, list):
returned_events += transformed_event
else:
returned_events.append(transformed_event)

# If the backend isn't enabled at all, early out
except NoBackendEnabled:
break
return returned_events

def transform_event(self, event):
Expand All @@ -60,7 +65,6 @@ def transform_event(self, event):

try:
transformed_event = self.get_transformed_event(event)

except NoTransformerImplemented:
logger.error('Could not get transformer for %s event.', event_name)
return None
Expand Down
14 changes: 4 additions & 10 deletions event_routing_backends/processors/xapi/tests/test_xapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from django.test import SimpleTestCase
from django.test.utils import override_settings
from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled
from mock import MagicMock, call, patch, sentinel
from tincan import Activity, Statement

Expand All @@ -25,13 +24,11 @@ def setUp(self):

@override_settings(XAPI_EVENTS_ENABLED=False)
def test_skip_event_when_disabled(self):
with self.assertRaises(NoBackendEnabled):
self.processor(self.sample_event)
self.assertFalse(self.processor(self.sample_event))

@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])
self.assertFalse(self.processor([self.sample_event]))

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand Down Expand Up @@ -91,9 +88,7 @@ def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transfo
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])

self.assertFalse(self.processor([self.sample_event]))
self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls)

@override_settings(XAPI_EVENT_LOGGING_ENABLED=False)
Expand All @@ -116,6 +111,5 @@ def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked
def test_with_no_registry(self, mocked_logger):
backend = XApiProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend([self.sample_event]))
self.assertFalse(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()

0 comments on commit a8952cf

Please sign in to comment.