Skip to content

Commit

Permalink
feat: send xAPI statements via event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Oct 25, 2023
1 parent 0651832 commit d6b3fa6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
14 changes: 9 additions & 5 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class EventsRouter:
Router to send events to hosts using requests library.
"""

def __init__(self, processors=None, backend_name=None):
def __init__(self, processors=None, backend_name=None, sync=False):
"""
Initialize the router.
Expand All @@ -27,6 +27,7 @@ def __init__(self, processors=None, backend_name=None):
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.sync = sync

def configure_host(self, host, router):
"""
Expand Down Expand Up @@ -159,19 +160,22 @@ def send(self, event):

for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
func = dispatch_event_persistent if is_business_critical else dispatch_event
if not self.sync:
func = func.delay
if is_business_critical:
dispatch_event_persistent.delay(
func(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)
else:
dispatch_event.delay(
func(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)

def process_event(self, event):
Expand Down
3 changes: 2 additions & 1 deletion event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def plugin_settings(settings):

settings.EVENT_TRACKING_BACKENDS.update({
'xapi': {
'ENGINE': 'eventtracking.backends.async_routing.AsyncRoutingBackend',
'ENGINE': 'eventtracking.backends.event_bus.EventBusRoutingBackend',
'OPTIONS': {
'backend_name': 'xapi',
'processors': [
Expand Down Expand Up @@ -149,6 +149,7 @@ def plugin_settings(settings):
}
],
'backend_name': 'xapi',
'sync': True,
}
}
},
Expand Down
30 changes: 23 additions & 7 deletions event_routing_backends/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

from django.dispatch import receiver
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED_TO_BUS
from eventtracking.django.django_tracker import DjangoTracker
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.tasks import send_event

logger = logging.getLogger(__name__)

Expand All @@ -12,10 +15,23 @@ def listen_for_tracking_event_emitted_event(sender, signal, **kwargs):
Publish `TRACKING_EVENT_EMITTED` events to the event bus.
"""
tracking_log = kwargs.get("tracking_log")
print(f"""
tracking_log:
name: {tracking_log.name}
timestamp: {tracking_log.timestamp}
data: {json.loads(tracking_log.data)}
context: {json.loads(tracking_log.context)}
""")

event = {
"name": tracking_log.name,
"timestamp": tracking_log.timestamp,
"data": json.loads(tracking_log.data),
"context": json.loads(tracking_log.context),
}

django_tracker = DjangoTracker()

backend = django_tracker.backends["xapi"]

try:
processed_event = backend.process_event(event)
logger.info('Successfully processed event "{}"'.format(event['name']))

except EventEmissionExit:
logger.info('[EventEmissionExit] skipping event {}'.format(event['name']))
return
send_event(backend.backend_name, processed_event)

0 comments on commit d6b3fa6

Please sign in to comment.