Skip to content

Commit

Permalink
feat: add consumer for tracking event emitted signal
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Jun 5, 2023
1 parent 37a0814 commit f420d7d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 0 deletions.
1 change: 1 addition & 0 deletions event_routing_backends/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ def ready(self):
"""
super().ready()
# pylint: disable=import-outside-toplevel, unused-import
from event_routing_backends import signals # noqa: F401
from event_routing_backends.processors.caliper import event_transformers as caliper_event_transformers
from event_routing_backends.processors.xapi import event_transformers as xapi_event_transformers
14 changes: 14 additions & 0 deletions event_routing_backends/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
This module contains various configuration settings via
waffle switches for the Certificates app.
"""

from edx_toggles.toggles import SettingToggle, WaffleSwitch

# .. toggle_name: SEND_TRACKING_EVENT_EMITTED_SIGNAL
# .. toggle_implementation: SettingToggle
# .. toggle_default: False
# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The
# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted.
# .. toggle_use_cases: publish
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=False, module_name=__name__)
1 change: 1 addition & 0 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def plugin_settings(settings):
settings.XAPI_EVENTS_ENABLED = True
settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
settings.EVENT_ROUTING_BATCH_SIZE = 5

# .. setting_name: XAPI_AGENT_IFI_TYPE
# .. setting_default: 'external_id'
Expand Down
50 changes: 50 additions & 0 deletions event_routing_backends/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json
import logging

from django.conf import settings
from django.dispatch import receiver
from django_redis import get_redis_connection
from edx_django_utils.cache.utils import get_cache_key
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED
from openedx_events.event_bus import get_producer

from event_routing_backends.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL

logger = logging.getLogger(__name__)

TRANSFORMED_EVENT_KEY_NAME = "transformed_events"

@receiver(TRACKING_EVENT_EMITTED)
def listen_for_tracking_event_emitted_event(sender, signal, **kwargs):
"""
Publish `TRACKING_EVENT_EMITTED` events to the event bus.
"""
event = kwargs['tracking_log']

## TODO: Should we ignore events that we don't care about here or in the event routing backend config?

redis = get_redis_connection("default")

queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE:
queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, settings.EVENT_ROUTING_BATCH_SIZE)
queued_events.append(event)
## TODO: Send events to event bus in batch
logger.info("Sending events to event bus in batch")

#if SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled():
# get_producer().send(
# signal=TRACKING_EVENT_EMITTED,
# topic='analytics',
# event_key_field='tracking_log.name',
# event_data={'tracking_log': kwargs['tracking_log']},
# event_metadata=kwargs['metadata']
# )
else:
redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({
"name": event.name,
"timestamp": event.timestamp,
"data": event.data,
"context": event.context,
}))
logger.info("Event pushed to the queue, current size: %s", queue_size + 1)

0 comments on commit f420d7d

Please sign in to comment.