From f103064d1ab54679dfd784c90e09209e6d66e88f Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 5 Jun 2023 15:04:05 -0500 Subject: [PATCH] feat: add listener for tracking event emitted signal --- event_routing_backends/apps.py | 1 + event_routing_backends/config.py | 14 +++++++ event_routing_backends/settings/common.py | 1 + event_routing_backends/signals.py | 50 +++++++++++++++++++++++ 4 files changed, 66 insertions(+) create mode 100644 event_routing_backends/config.py create mode 100644 event_routing_backends/signals.py diff --git a/event_routing_backends/apps.py b/event_routing_backends/apps.py index f8cd99d4..9289460f 100644 --- a/event_routing_backends/apps.py +++ b/event_routing_backends/apps.py @@ -35,5 +35,6 @@ def ready(self): """ super().ready() # pylint: disable=import-outside-toplevel, unused-import + from event_routing_backends import signals # noqa: F401 from event_routing_backends.processors.caliper import event_transformers as caliper_event_transformers from event_routing_backends.processors.xapi import event_transformers as xapi_event_transformers diff --git a/event_routing_backends/config.py b/event_routing_backends/config.py new file mode 100644 index 00000000..fde1d207 --- /dev/null +++ b/event_routing_backends/config.py @@ -0,0 +1,14 @@ +""" +This module contains various configuration settings via +waffle switches for the Certificates app. +""" + +from edx_toggles.toggles import SettingToggle, WaffleSwitch + +# .. toggle_name: SEND_TRACKING_EVENT_EMITTED_SIGNAL +# .. toggle_implementation: SettingToggle +# .. toggle_default: False +# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The +# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted. +# .. toggle_use_cases: publish +SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=False, module_name=__name__) diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index 3be8f012..8caa3e96 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -11,6 +11,7 @@ def plugin_settings(settings): settings.XAPI_EVENTS_ENABLED = True settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3 settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30 + settings.EVENT_ROUTING_BATCH_SIZE = 5 # .. setting_name: XAPI_AGENT_IFI_TYPE # .. setting_default: 'external_id' diff --git a/event_routing_backends/signals.py b/event_routing_backends/signals.py new file mode 100644 index 00000000..842acf84 --- /dev/null +++ b/event_routing_backends/signals.py @@ -0,0 +1,50 @@ +import json +import logging + +from django.conf import settings +from django.dispatch import receiver +from django_redis import get_redis_connection +from edx_django_utils.cache.utils import get_cache_key +from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED +from openedx_events.event_bus import get_producer + +from event_routing_backends.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL + +logger = logging.getLogger(__name__) + +TRANSFORMED_EVENT_KEY_NAME = "transformed_events" + +@receiver(TRACKING_EVENT_EMITTED) +def listen_for_tracking_event_emitted_event(sender, signal, **kwargs): + """ + Publish `TRACKING_EVENT_EMITTED` events to the event bus. + """ + event = kwargs['tracking_log'] + + ## TODO: Should we ignore events that we don't care about here or in the event routing backend config? + + redis = get_redis_connection("default") + + queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME) + if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE: + queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, settings.EVENT_ROUTING_BATCH_SIZE) + queued_events.append(event) + ## TODO: Send events to event bus in batch + logger.info("Sending events to event bus in batch") + + #if SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled(): + # get_producer().send( + # signal=TRACKING_EVENT_EMITTED, + # topic='analytics', + # event_key_field='tracking_log.name', + # event_data={'tracking_log': kwargs['tracking_log']}, + # event_metadata=kwargs['metadata'] + # ) + else: + redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({ + "name": event.name, + "timestamp": event.timestamp, + "data": event.data, + "context": event.context, + })) + logger.info("Event pushed to the queue, current size: %s", queue_size + 1)