diff --git a/event_routing_backends/__init__.py b/event_routing_backends/__init__.py index 75ba1222..fe9c8618 100644 --- a/event_routing_backends/__init__.py +++ b/event_routing_backends/__init__.py @@ -2,4 +2,4 @@ Various backends for receiving edX LMS events.. """ -__version__ = '8.3.0' +__version__ = '8.3.1' diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 1917adc5..c02d2db1 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -225,6 +225,17 @@ def queue_event(self, redis, event): if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis): batch = redis.rpop(self.queue_name, queue_size) + + orig_size = len(batch) + # Deduplicate list, in some misconfigured cases tracking events can be emitted to the + # bus twice, causing them to be processed twice, which LRSs will reject. + # See: https://github.com/openedx/event-routing-backends/issues/410 + batch = [i for n, i in enumerate(batch) if i not in batch[n + 1:]] + final_size = len(batch) + + if final_size != orig_size: # pragma: no cover + logger.warning(f"{orig_size - final_size} duplicate events in event-routing-backends batch queue! " + f"This is a likely due to misconfiguration of EVENT_TRACKING_BACKENDS.") return batch return None @@ -237,7 +248,9 @@ def time_to_send(self, redis): if not last_sent: return True time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8'))) - return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL) + ready = time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL) + + return ready def process_event(self, event): """ diff --git a/event_routing_backends/processors/caliper/tests/test_caliper.py b/event_routing_backends/processors/caliper/tests/test_caliper.py index a18bcddf..f2c48a16 100644 --- a/event_routing_backends/processors/caliper/tests/test_caliper.py +++ b/event_routing_backends/processors/caliper/tests/test_caliper.py @@ -5,7 +5,6 @@ from django.test import SimpleTestCase from django.test.utils import override_settings -from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled from mock import MagicMock, call, patch, sentinel from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor @@ -29,13 +28,11 @@ def setUp(self): @override_settings(CALIPER_EVENTS_ENABLED=False) def test_skip_event_when_disabled(self): - with self.assertRaises(NoBackendEnabled): - self.processor(self.sample_event) + self.assertFalse(self.processor(self.sample_event)) @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_no_transformer_implemented(self, mocked_logger): - with self.assertRaises(EventEmissionExit): - self.processor([self.sample_event]) + self.assertFalse(self.processor([self.sample_event])) mocked_logger.error.assert_called_once_with( 'Could not get transformer for %s event.', @@ -130,6 +127,5 @@ def test_send_method_with_successfull_flow_logging_disabled( def test_with_no_registry(self, mocked_logger): backend = CaliperProcessor() backend.registry = None - with self.assertRaises(EventEmissionExit): - self.assertIsNone(backend([self.sample_event])) + self.assertFalse(backend([self.sample_event])) mocked_logger.exception.assert_called_once() diff --git a/event_routing_backends/processors/mixins/base_transformer.py b/event_routing_backends/processors/mixins/base_transformer.py index a4c78743..d754b34f 100644 --- a/event_routing_backends/processors/mixins/base_transformer.py +++ b/event_routing_backends/processors/mixins/base_transformer.py @@ -175,9 +175,7 @@ def get_data(self, key, required=False): result = None if result is None: - if not required: - logger.warning('Could not get value for %s in event "%s"', key, self.event.get('name', None)) - else: + if required: raise ValueError( 'Could not get value for {} in event "{}"'.format(key, self.event.get('name', None)) ) diff --git a/event_routing_backends/processors/mixins/base_transformer_processor.py b/event_routing_backends/processors/mixins/base_transformer_processor.py index fa558771..17241793 100644 --- a/event_routing_backends/processors/mixins/base_transformer_processor.py +++ b/event_routing_backends/processors/mixins/base_transformer_processor.py @@ -3,7 +3,7 @@ """ from logging import getLogger -from eventtracking.processors.exceptions import EventEmissionExit, NoTransformerImplemented +from eventtracking.processors.exceptions import NoBackendEnabled, NoTransformerImplemented logger = getLogger(__name__) @@ -34,13 +34,18 @@ def __call__(self, events): """ returned_events = [] for event in events: - transformed_event = self.transform_event(event) - if not transformed_event: - raise EventEmissionExit - if isinstance(transformed_event, list): - returned_events += transformed_event - else: - returned_events.append(transformed_event) + try: + transformed_event = self.transform_event(event) + if not transformed_event: + pass + elif isinstance(transformed_event, list): + returned_events += transformed_event + else: + returned_events.append(transformed_event) + + # If the backend isn't enabled at all, early out + except NoBackendEnabled: + break return returned_events def transform_event(self, event): @@ -60,7 +65,6 @@ def transform_event(self, event): try: transformed_event = self.get_transformed_event(event) - except NoTransformerImplemented: logger.error('Could not get transformer for %s event.', event_name) return None diff --git a/event_routing_backends/processors/xapi/tests/test_xapi.py b/event_routing_backends/processors/xapi/tests/test_xapi.py index 7510f2e0..f612ef6c 100644 --- a/event_routing_backends/processors/xapi/tests/test_xapi.py +++ b/event_routing_backends/processors/xapi/tests/test_xapi.py @@ -5,7 +5,6 @@ from django.test import SimpleTestCase from django.test.utils import override_settings -from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled from mock import MagicMock, call, patch, sentinel from tincan import Activity, Statement @@ -25,13 +24,11 @@ def setUp(self): @override_settings(XAPI_EVENTS_ENABLED=False) def test_skip_event_when_disabled(self): - with self.assertRaises(NoBackendEnabled): - self.processor(self.sample_event) + self.assertFalse(self.processor(self.sample_event)) @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_no_transformer_implemented(self, mocked_logger): - with self.assertRaises(EventEmissionExit): - self.processor([self.sample_event]) + self.assertFalse(self.processor([self.sample_event])) mocked_logger.error.assert_called_once_with( 'Could not get transformer for %s event.', @@ -91,9 +88,7 @@ def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transfo mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer - with self.assertRaises(EventEmissionExit): - self.processor([self.sample_event]) - + self.assertFalse(self.processor([self.sample_event])) self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls) @override_settings(XAPI_EVENT_LOGGING_ENABLED=False) @@ -116,6 +111,5 @@ def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked def test_with_no_registry(self, mocked_logger): backend = XApiProcessor() backend.registry = None - with self.assertRaises(EventEmissionExit): - self.assertIsNone(backend([self.sample_event])) + self.assertFalse(backend([self.sample_event])) mocked_logger.exception.assert_called_once()