diff --git a/event_routing_backends/backends/async_events_router.py b/event_routing_backends/backends/async_events_router.py new file mode 100644 index 00000000..c25e938d --- /dev/null +++ b/event_routing_backends/backends/async_events_router.py @@ -0,0 +1,49 @@ +""" +Events router to send events to hosts via celery. + +This events router will trigger a celery task to send the events to the +configured hosts. +""" +from event_routing_backends.backends.events_router import EventsRouter +from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent + + +class AsyncEventsRouter(EventsRouter): + """ + Router to send events to hosts via celery using requests library. + """ + + def dispatch_event(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + dispatch_event.delay(event_name, updated_event, router_type, host_configurations) + + def dispatch_bulk_events(self, events, router_type, host_configurations): + """ + Dispatch the a list of events to the configured router in bulk. + + Arguments: + events (list[dict]): list of processed event dictionaries + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + dispatch_bulk_events.delay(events, router_type, host_configurations) + + def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router providing persistent storage. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + dispatch_event_persistent.delay(event_name, updated_event, router_type, host_configurations) diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 6df1dcdb..85832f4d 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -7,7 +7,6 @@ from event_routing_backends.helpers import get_business_critical_events from event_routing_backends.models import RouterConfiguration -from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent logger = logging.getLogger(__name__) @@ -17,18 +16,16 @@ class EventsRouter: Router to send events to hosts using requests library. """ - def __init__(self, processors=None, backend_name=None, sync=False): + def __init__(self, processors=None, backend_name=None): """ Initialize the router. Arguments: processors (list): list of processor instances backend_name (str): name of the router backend - sync (bool): whether to send events synchronously or in celery tasks """ self.processors = processors if processors else [] self.backend_name = backend_name - self.sync = sync def configure_host(self, host, router): """ @@ -140,7 +137,7 @@ def bulk_send(self, events): prepared_events.append(updated_event) if prepared_events: # pragma: no cover - dispatch_bulk_events.delay( + self.dispatch_bulk_events( prepared_events, host['router_type'], host['host_configurations'] @@ -161,17 +158,15 @@ def send(self, event): for events_for_route in event_routes.values(): for event_name, updated_event, host, is_business_critical in events_for_route: - func = dispatch_event_persistent if is_business_critical else dispatch_event - func = func if self.sync else func.delay if is_business_critical: - func( + self.dispatch_event_persistent( event_name, updated_event, host['router_type'], host['host_configurations'], ) else: - func( + self.dispatch_event( event_name, updated_event, host['router_type'], @@ -219,3 +214,38 @@ def overwrite_event_data(self, event, host, event_name): host['override_args'] )) return event + + def dispatch_event(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + raise NotImplementedError('dispatch_event is not implemented') + + def dispatch_bulk_events(self, events, router_type, host_configurations): + """ + Dispatch the a list of events to the configured router in bulk. + + Arguments: + events (list[dict]): list of processed event dictionaries + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + raise NotImplementedError('dispatch_bulk_events is not implemented') + + def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router providing persistent storage. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + raise NotImplementedError('dispatch_event_persistent is not implemented') diff --git a/event_routing_backends/backends/sync_events_router.py b/event_routing_backends/backends/sync_events_router.py new file mode 100644 index 00000000..8fe7f68d --- /dev/null +++ b/event_routing_backends/backends/sync_events_router.py @@ -0,0 +1,51 @@ +""" +Events router to send events to hosts in sync mode. + +This router is expected to be used with the event bus, which +can be configured to use this router to send events to hosts +in the same thread as it process the events. +""" +from event_routing_backends.backends.events_router import EventsRouter +from event_routing_backends.tasks import bulk_send_events, send_event + + +class SyncEventsRouter(EventsRouter): + """ + Router to send events to hosts using requests library. + """ + + def dispatch_event(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + send_event(None, event_name, updated_event, router_type, host_configurations) + + def dispatch_bulk_events(self, events, router_type, host_configurations): + """ + Dispatch the a list of events to the configured router in bulk. + + Arguments: + events (list[dict]): list of processed event dictionaries + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + bulk_send_events(None, events, router_type, host_configurations) + + def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations): + """ + Dispatch the event to the configured router providing persistent storage. + In this case, the event bus is expected to provide the persistent storage layer. + + Arguments: + event_name (str): name of the original event. + updated_event (dict): processed event dictionary + router_type (str): type of the router + host_configurations (dict): host configurations dict + """ + self.dispatch_event(event_name, updated_event, router_type, host_configurations) diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index a8bc1ccc..8f46f208 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -10,7 +10,9 @@ from eventtracking.processors.exceptions import EventEmissionExit from tincan.statement import Statement +from event_routing_backends.backends.async_events_router import AsyncEventsRouter from event_routing_backends.backends.events_router import EventsRouter +from event_routing_backends.backends.sync_events_router import SyncEventsRouter from event_routing_backends.helpers import get_business_critical_events from event_routing_backends.models import RouterConfiguration from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched @@ -146,8 +148,6 @@ def setUp(self): } ] - self.router = EventsRouter(processors=[], backend_name='test') - @patch('event_routing_backends.utils.http_client.requests.post') @patch('event_routing_backends.backends.events_router.logger') @patch('event_routing_backends.models.RouterConfiguration.get_enabled_routers') @@ -177,31 +177,6 @@ def test_with_processor_exception(self, mocked_get_enabled_routers, mocked_logge exc_info=True ), mocked_logger.error.mock_calls) - @patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', { - 'AUTH_HEADERS': MagicMock(side_effect=EventNotDispatched) - }) - @patch('event_routing_backends.utils.http_client.requests.post') - @patch('event_routing_backends.tasks.logger') - def test_generic_exception_business_critical_event(self, mocked_logger, mocked_post): - RouterConfigurationFactory.create( - backend_name=RouterConfiguration.XAPI_BACKEND, - enabled=True, - route_url='http://test3.com', - auth_scheme=RouterConfiguration.AUTH_BEARER, - auth_key='test_key', - configurations=ROUTER_CONFIG_FIXTURE[0] - ) - - router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) - event_data = self.transformed_event.copy() - business_critical_events = get_business_critical_events() - event_data['name'] = business_critical_events[0] - router.send(event_data) - - self.assertEqual(mocked_logger.exception.call_count, - getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) - mocked_post.assert_not_called() - @patch('event_routing_backends.utils.http_client.requests.post') @patch('event_routing_backends.backends.events_router.logger') def test_with_no_router_configurations_available(self, mocked_logger, mocked_post): @@ -216,27 +191,81 @@ def test_with_no_router_configurations_available(self, mocked_logger, mocked_pos ) @patch('event_routing_backends.utils.http_client.requests.post') - @patch('event_routing_backends.tasks.logger') - def test_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): - RouterConfigurationFactory.create( + @patch('event_routing_backends.backends.events_router.logger') + def test_with_no_available_hosts(self, mocked_logger, mocked_post): + router_config = RouterConfigurationFactory.create( backend_name='test_backend', enabled=True, route_url='http://test3.com', - auth_scheme=RouterConfiguration.AUTH_BEARER, - auth_key='test_key', - configurations=ROUTER_CONFIG_FIXTURE[0] + configurations=ROUTER_CONFIG_FIXTURE[1] ) router = EventsRouter(processors=[], backend_name='test_backend') TieredCache.dangerous_clear_all_tiers() router.send(self.transformed_event) - mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') mocked_post.assert_not_called() + self.assertIn( + call( + 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', + self.transformed_event['name'], router_config.pk, 'test_backend' + ), + mocked_logger.info.mock_calls + ) + + def test_with_non_dict_event(self): + RouterConfigurationFactory.create( + backend_name=RouterConfiguration.XAPI_BACKEND, + enabled=True, + route_url='http://test3.com', + configurations=ROUTER_CONFIG_FIXTURE[3] + ) + router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + transformed_event = Statement() + with self.assertRaises(ValueError): + router.send(transformed_event) + + def test_unsuccessful_routing_of_event(self): + host_configurations = { + 'url': 'http://test3.com', + 'version': '1.0.1', + 'auth_scheme': 'bearer', + 'auth_key': 'key', + } + client = LrsClient(**host_configurations) + with self.assertRaises(EventNotDispatched): + client.send(event_name='test', statement_data={}) + + @patch('event_routing_backends.utils.xapi_lrs_client.logger') + def test_duplicate_xapi_event_id(self, mocked_logger): + """ + Test that when we receive a 409 response when inserting an XAPI statement + we do not raise an exception, but do log it. + """ + mock_duplicate_return = MagicMock() + mock_duplicate_return.success = False + mock_duplicate_return.response.status = 409 + + client = LrsClient({}) + client.lrs_client = MagicMock() + client.lrs_client.save_statement.return_value = mock_duplicate_return + + client.send(event_name='test', statement_data={}) + self.assertIn( + call('Event test received a 409 error indicating the event id already exists.'), + mocked_logger.info.mock_calls + ) + + +@ddt.ddt +class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests + """ + Test the AsyncEventsRouter + """ @patch('event_routing_backends.utils.http_client.requests.post') @patch('event_routing_backends.tasks.logger') - def test_bulk_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): + def test_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): RouterConfigurationFactory.create( backend_name='test_backend', enabled=True, @@ -246,37 +275,32 @@ def test_bulk_with_unsupported_routing_strategy(self, mocked_logger, mocked_post configurations=ROUTER_CONFIG_FIXTURE[0] ) - router = EventsRouter(processors=[], backend_name='test_backend') + router = AsyncEventsRouter(processors=[], backend_name='test_backend') TieredCache.dangerous_clear_all_tiers() - router.bulk_send([self.transformed_event]) + router.send(self.transformed_event) mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') mocked_post.assert_not_called() @patch('event_routing_backends.utils.http_client.requests.post') - @patch('event_routing_backends.backends.events_router.logger') - def test_with_no_available_hosts(self, mocked_logger, mocked_post): - router_config = RouterConfigurationFactory.create( + @patch('event_routing_backends.tasks.logger') + def test_bulk_with_unsupported_routing_strategy(self, mocked_logger, mocked_post): + RouterConfigurationFactory.create( backend_name='test_backend', enabled=True, route_url='http://test3.com', - configurations=ROUTER_CONFIG_FIXTURE[1] + auth_scheme=RouterConfiguration.AUTH_BEARER, + auth_key='test_key', + configurations=ROUTER_CONFIG_FIXTURE[0] ) - router = EventsRouter(processors=[], backend_name='test_backend') + router = AsyncEventsRouter(processors=[], backend_name='test_backend') TieredCache.dangerous_clear_all_tiers() - router.send(self.transformed_event) + router.bulk_send([self.transformed_event]) + mocked_logger.error.assert_called_once_with('Unsupported routing strategy detected: INVALID_TYPE') mocked_post.assert_not_called() - self.assertIn( - call( - 'Event %s is not allowed to be sent to any host for router ID %s with backend "%s"', - self.transformed_event['name'], router_config.pk, 'test_backend' - ), - mocked_logger.info.mock_calls - ) - @ddt.data( ( RouterConfiguration.XAPI_BACKEND, @@ -299,7 +323,7 @@ def test_generic_exception(self, backend_name, mocked_logger, mocked_post): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=backend_name) + router = AsyncEventsRouter(processors=[], backend_name=backend_name) router.send(self.transformed_event) if backend_name == RouterConfiguration.CALIPER_BACKEND: self.assertEqual(mocked_logger.exception.call_count, @@ -325,7 +349,7 @@ def test_failed_bulk_post(self, mocked_logger, mocked_post): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) router.bulk_send([self.transformed_event]) self.assertEqual(mocked_logger.exception.call_count, @@ -350,7 +374,7 @@ def test_failed_post(self, mocked_logger, mocked_post): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) router.send(self.transformed_event) self.assertEqual(mocked_logger.exception.call_count, @@ -377,7 +401,7 @@ def test_failed_bulk_routing(self, mocked_logger, mocked_remote_lrs): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) router.bulk_send([self.transformed_event]) self.assertEqual(mocked_logger.exception.call_count, @@ -404,7 +428,7 @@ def test_failed_routing(self, mocked_logger, mocked_remote_lrs): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) router.send(self.transformed_event) self.assertEqual(mocked_logger.exception.call_count, @@ -431,7 +455,7 @@ def test_duplicate_ids_in_bulk(self, mocked_logger, mocked_remote_lrs): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) router.bulk_send([self.transformed_event]) self.assertEqual(mocked_logger.exception.call_count, 0) @@ -459,7 +483,7 @@ def test_bulk_generic_exception(self, backend_name, mocked_logger, mocked_post): configurations=ROUTER_CONFIG_FIXTURE[2] ) - router = EventsRouter(processors=[], backend_name=backend_name) + router = AsyncEventsRouter(processors=[], backend_name=backend_name) router.bulk_send([self.transformed_event]) if backend_name == RouterConfiguration.CALIPER_BACKEND: self.assertEqual(mocked_logger.exception.call_count, @@ -468,17 +492,30 @@ def test_bulk_generic_exception(self, backend_name, mocked_logger, mocked_post): else: mocked_logger.exception.assert_not_called() - def test_with_non_dict_event(self): + @patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', { + 'AUTH_HEADERS': MagicMock(side_effect=EventNotDispatched) + }) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.tasks.logger') + def test_generic_exception_business_critical_event(self, mocked_logger, mocked_post): RouterConfigurationFactory.create( backend_name=RouterConfiguration.XAPI_BACKEND, enabled=True, route_url='http://test3.com', - configurations=ROUTER_CONFIG_FIXTURE[3] + auth_scheme=RouterConfiguration.AUTH_BEARER, + auth_key='test_key', + configurations=ROUTER_CONFIG_FIXTURE[0] ) - router = EventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) - transformed_event = Statement() - with self.assertRaises(ValueError): - router.send(transformed_event) + + router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.CALIPER_BACKEND) + event_data = self.transformed_event.copy() + business_critical_events = get_business_critical_events() + event_data['name'] = business_critical_events[0] + router.send(event_data) + + self.assertEqual(mocked_logger.exception.call_count, + getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 3) + 1) + mocked_post.assert_not_called() @ddt.data( (RouterConfiguration.AUTH_BASIC, @@ -563,7 +600,7 @@ def test_successful_routing_of_event( configurations=ROUTER_CONFIG_FIXTURE[0] ) - router = EventsRouter(processors=[], backend_name=backend_name) + router = AsyncEventsRouter(processors=[], backend_name=backend_name) with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): router.send(self.transformed_event) @@ -611,37 +648,6 @@ def test_successful_routing_of_event( # test mocked oauth client mocked_oauth_client.assert_not_called() - def test_unsuccessful_routing_of_event(self): - host_configurations = { - 'url': 'http://test3.com', - 'version': '1.0.1', - 'auth_scheme': 'bearer', - 'auth_key': 'key', - } - client = LrsClient(**host_configurations) - with self.assertRaises(EventNotDispatched): - client.send(event_name='test', statement_data={}) - - @patch('event_routing_backends.utils.xapi_lrs_client.logger') - def test_duplicate_xapi_event_id(self, mocked_logger): - """ - Test that when we receive a 409 response when inserting an XAPI statement - we do not raise an exception, but do log it. - """ - mock_duplicate_return = MagicMock() - mock_duplicate_return.success = False - mock_duplicate_return.response.status = 409 - - client = LrsClient({}) - client.lrs_client = MagicMock() - client.lrs_client.save_statement.return_value = mock_duplicate_return - - client.send(event_name='test', statement_data={}) - self.assertIn( - call('Event test received a 409 error indicating the event id already exists.'), - mocked_logger.info.mock_calls - ) - @patch('event_routing_backends.utils.http_client.requests.post') def test_unsuccessful_routing_of_event_http(self, mocked_post): mock_response = MagicMock() @@ -742,7 +748,288 @@ def test_successful_routing_of_bulk_events( configurations=ROUTER_CONFIG_FIXTURE[0] ) - router = EventsRouter(processors=[], backend_name=backend_name) + router = AsyncEventsRouter(processors=[], backend_name=backend_name) + + with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): + router.bulk_send(self.bulk_transformed_events) + + overridden_events = self.bulk_transformed_events.copy() + + for event in overridden_events: + event['new_key'] = 'new_value' + + if backend_name == RouterConfiguration.XAPI_BACKEND: + # test LRS Client + mocked_lrs().save_statements.assert_has_calls([ + call(overridden_events), + ]) + else: + # test the HTTP client + if auth_scheme == RouterConfiguration.AUTH_BASIC: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + }, + auth=(username, password) + ), + ]) + elif auth_scheme == RouterConfiguration.AUTH_BEARER: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + 'Authorization': RouterConfiguration.AUTH_BEARER + ' ' + auth_key + } + ), + ]) + else: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_events, + headers={ + }, + ), + ]) + + # test mocked oauth client + mocked_oauth_client.assert_not_called() + + +@ddt.ddt +class TestSyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests + """ + Test the SyncEventsRouter + """ + + @ddt.data( + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.CALIPER_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test3.com' + ), + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.XAPI_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test3.com' + ), + ) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @ddt.unpack + def test_successful_routing_of_event( + self, + auth_scheme, + auth_key, + username, + password, + backend_name, + route_url, + mocked_lrs, + mocked_post, + ): + TieredCache.dangerous_clear_all_tiers() + mocked_oauth_client = MagicMock() + mocked_api_key_client = MagicMock() + + MOCKED_MAP = { + 'AUTH_HEADERS': HttpClient, + 'OAUTH2': mocked_oauth_client, + 'API_KEY': mocked_api_key_client, + 'XAPI_LRS': LrsClient, + } + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url=route_url, + auth_scheme=auth_scheme, + auth_key=auth_key, + username=username, + password=password, + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.request.method = "POST" + mocked_post.return_value = mock_response + + router = SyncEventsRouter(processors=[], backend_name=backend_name) + + with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): + router.send(self.transformed_event) + + overridden_event = self.transformed_event.copy() + overridden_event['new_key'] = 'new_value' + + if backend_name == RouterConfiguration.XAPI_BACKEND: + # test LRS Client + mocked_lrs().save_statement.assert_has_calls([ + call(overridden_event), + ]) + else: + # test the HTTP client + if auth_scheme == RouterConfiguration.AUTH_BASIC: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_event, + headers={ + }, + auth=(username, password) + ), + ]) + elif auth_scheme == RouterConfiguration.AUTH_BEARER: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_event, + headers={ + 'Authorization': RouterConfiguration.AUTH_BEARER + ' ' + auth_key + } + ), + ]) + else: + mocked_post.assert_has_calls([ + call( + url=route_url, + json=overridden_event, + headers={ + }, + ), + ]) + + # test mocked oauth client + mocked_oauth_client.assert_not_called() + + @ddt.data( + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.CALIPER_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.CALIPER_BACKEND, + 'http://test3.com' + ), + (RouterConfiguration.AUTH_BASIC, + None, + 'abc', + 'xyz', + RouterConfiguration.XAPI_BACKEND, + 'http://test1.com' + ), + ( + RouterConfiguration.AUTH_BEARER, + 'test_key', + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test2.com' + ), + ( + None, + None, + None, + None, + RouterConfiguration.XAPI_BACKEND, + 'http://test3.com' + ), + ) + @patch('event_routing_backends.utils.http_client.requests.post') + @patch('event_routing_backends.utils.xapi_lrs_client.RemoteLRS') + @ddt.unpack + def test_successful_routing_of_bulk_events( + self, + auth_scheme, + auth_key, + username, + password, + backend_name, + route_url, + mocked_lrs, + mocked_post, + ): + TieredCache.dangerous_clear_all_tiers() + mocked_oauth_client = MagicMock() + mocked_api_key_client = MagicMock() + + MOCKED_MAP = { + 'AUTH_HEADERS': HttpClient, + 'OAUTH2': mocked_oauth_client, + 'API_KEY': mocked_api_key_client, + 'XAPI_LRS': LrsClient, + } + RouterConfigurationFactory.create( + backend_name=backend_name, + enabled=True, + route_url=route_url, + auth_scheme=auth_scheme, + auth_key=auth_key, + username=username, + password=password, + configurations=ROUTER_CONFIG_FIXTURE[0] + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.request.method = "POST" + mocked_post.return_value = mock_response + + router = SyncEventsRouter(processors=[], backend_name=backend_name) with patch.dict('event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING', MOCKED_MAP): router.bulk_send(self.bulk_transformed_events) diff --git a/event_routing_backends/handlers.py b/event_routing_backends/handlers.py index f3007504..dacf63f3 100644 --- a/event_routing_backends/handlers.py +++ b/event_routing_backends/handlers.py @@ -7,19 +7,30 @@ from django.dispatch import receiver from eventtracking.backends.event_bus import EventBusRoutingBackend from eventtracking.processors.exceptions import EventEmissionExit -from eventtracking.tasks import send_event from eventtracking.tracker import get_tracker -from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED +from openedx_events.analytics.signals import TRACKING_LOG_EVENT_EMITTED logger = logging.getLogger(__name__) -@receiver(TRACKING_EVENT_EMITTED) +@receiver(TRACKING_LOG_EVENT_EMITTED) def send_tracking_log_to_backends( sender, signal, **kwargs ): # pylint: disable=unused-argument """ - Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends. + Listen for the TRACKING_LOG_EVENT_EMITTED signal and send the event to the enabled backends. + + The process is the following: + + 1. Unserialize the tracking log from the signal. + 2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc). + 3. Get the event bus backends that are the interested in the signals (multiple can be configured). + 4. Transform the event to xAPI or Caliper format. + 5. Send the transformed event to the different event bus backends. Any event bus backend can be configured + with multiples backends (xAPI or Caliper). + + This allows us to only send the tracking log to the event bus once and the event bus will send + the transformed event to the different configured backends. """ tracking_log = kwargs.get("tracking_log") @@ -41,6 +52,6 @@ def send_tracking_log_to_backends( try: processed_event = engine.process_event(event) logger.info('Successfully processed event "{}"'.format(event["name"])) - send_event(name, processed_event, True) + engine.send_to_backends(processed_event.copy()) except EventEmissionExit: logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index 96d7b4eb..9a597dc9 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -60,8 +60,6 @@ def plugin_settings(settings): 'edx.course.grade.passed.first_time' ] - settings.EVENT_TRACKING_BACKENDS_EVENT_BUS_ENABLED = True - settings.WHITELIST_EVENTS = { 'xapi': [ 'edx.course.enrollment.activated', @@ -174,7 +172,7 @@ def plugin_settings(settings): ], "backends": { 'xapi': { - 'ENGINE': 'event_routing_backends.backends.events_router.EventsRouter', + 'ENGINE': 'event_routing_backends.backends.async_events_router.AsyncEventsRouter', 'OPTIONS': { 'processors': [ { @@ -184,11 +182,10 @@ def plugin_settings(settings): } ], 'backend_name': 'xapi', - 'sync': True, } }, "caliper": { - "ENGINE": "event_routing_backends.backends.events_router.EventsRouter", + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", "OPTIONS": { "processors": [ { @@ -207,7 +204,6 @@ def plugin_settings(settings): } ], "backend_name": "caliper", - 'sync': True, } } } diff --git a/event_routing_backends/tasks.py b/event_routing_backends/tasks.py index 19e17e3c..58c70298 100644 --- a/event_routing_backends/tasks.py +++ b/event_routing_backends/tasks.py @@ -53,11 +53,11 @@ def send_event(task, event_name, event, router_type, host_config): Send event to configured client. Arguments: - task (object) : celery task object to perform celery actions - event_name (str) : name of the original event - event (dict) : event dictionary to be delivered. - router_type (str) : decides the client to use for sending the event - host_config (dict) : contains configurations for the host. + task (object, optional) : celery task object to perform celery actions + event_name (str) : name of the original event + event (dict) : event dictionary to be delivered. + router_type (str) : decides the client to use for sending the event + host_config (dict) : contains configurations for the host. """ try: client_class = ROUTER_STRATEGY_MAPPING[router_type] @@ -82,6 +82,11 @@ def send_event(task, event_name, event, router_type, host_config): ), exc_info=True ) + # If this function is called synchronously, we want to raise the exception + # to inform about errors. If it's called asynchronously, we want to retry + # the celery task till it succeeds or reaches max retries. + if not task: + raise exc raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30), max_retries=getattr(settings, '' 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3)) @@ -106,10 +111,10 @@ def bulk_send_events(task, events, router_type, host_config): Send event to configured client. Arguments: - task (object) : celery task object to perform celery actions - events (list[dict]) : list of event dictionaries to be delivered. - router_type (str) : decides the client to use for sending the event - host_config (dict) : contains configurations for the host. + task (object, optional) : celery task object to perform celery actions + events (list[dict]) : list of event dictionaries to be delivered. + router_type (str) : decides the client to use for sending the event + host_config (dict) : contains configurations for the host. """ try: client_class = ROUTER_STRATEGY_MAPPING[router_type] @@ -134,5 +139,10 @@ def bulk_send_events(task, events, router_type, host_config): ), exc_info=True ) + # If this function is called synchronously, we want to raise the exception + # to inform about errors. If it's called asynchronously, we want to retry + # the celery task till it succeeds or reaches max retries. + if not task: + raise exc raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30), max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3)) diff --git a/event_routing_backends/tests/test_handlers.py b/event_routing_backends/tests/test_handlers.py index bbf56e38..0a27a78d 100644 --- a/event_routing_backends/tests/test_handlers.py +++ b/event_routing_backends/tests/test_handlers.py @@ -2,7 +2,7 @@ Test handlers for signals emitted by the analytics app """ -from unittest.mock import patch +from unittest.mock import Mock, patch from django.test import TestCase from django.test.utils import override_settings @@ -26,20 +26,16 @@ class TestHandlers(TestCase): } ) @patch("event_routing_backends.handlers.get_tracker") - @patch("event_routing_backends.handlers.isinstance") - @patch("event_routing_backends.handlers.send_event") def test_send_tracking_log_to_backends( - self, mock_send_event, mock_is_instance, mock_get_tracker + self, mock_get_tracker ): """ Test for send_tracking_log_to_backends """ tracker = DjangoTracker() mock_get_tracker.return_value = tracker - - mock_is_instance.return_value = True - - mock_send_event.return_value = None + mock_backend = Mock() + tracker.backends["event_bus"].send_to_backends = mock_backend send_tracking_log_to_backends( sender=None, @@ -52,15 +48,13 @@ def test_send_tracking_log_to_backends( ), ) - mock_send_event.assert_called_once_with( - "event_bus", + mock_backend.assert_called_once_with( { "name": "test_name", "timestamp": "test_timestamp", "data": {}, "context": {}, - }, - True, + } ) @override_settings(