Skip to content

Commit

Permalink
feat: refactor EventsRouter bettween sync and async router
Browse files Browse the repository at this point in the history
chore: quality fixes

docs: improve docstrings for handlers and new events routers

docs: improve docstrings for event bus workflow
  • Loading branch information
Ian2012 committed Feb 2, 2024
1 parent 533a7b5 commit f3781f2
Show file tree
Hide file tree
Showing 8 changed files with 566 additions and 138 deletions.
49 changes: 49 additions & 0 deletions event_routing_backends/backends/async_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Events router to send events to hosts via celery.
This events router will trigger a celery task to send the events to the
configured hosts.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent


class AsyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event.delay(event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_bulk_events.delay(events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event_persistent.delay(event_name, updated_event, router_type, host_configurations)
48 changes: 39 additions & 9 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand All @@ -17,18 +16,16 @@ class EventsRouter:
Router to send events to hosts using requests library.
"""

def __init__(self, processors=None, backend_name=None, sync=False):
def __init__(self, processors=None, backend_name=None):
"""
Initialize the router.
Arguments:
processors (list): list of processor instances
backend_name (str): name of the router backend
sync (bool): whether to send events synchronously or in celery tasks
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.sync = sync

def configure_host(self, host, router):
"""
Expand Down Expand Up @@ -140,7 +137,7 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
self.dispatch_bulk_events(
prepared_events,
host['router_type'],
host['host_configurations']
Expand All @@ -161,17 +158,15 @@ def send(self, event):

for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
func = dispatch_event_persistent if is_business_critical else dispatch_event
func = func if self.sync else func.delay
if is_business_critical:
func(
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations'],
)
else:
func(
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
Expand Down Expand Up @@ -219,3 +214,38 @@ def overwrite_event_data(self, event, host, event_name):
host['override_args']
))
return event

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event is not implemented')

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_bulk_events is not implemented')

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event_persistent is not implemented')
51 changes: 51 additions & 0 deletions event_routing_backends/backends/sync_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Events router to send events to hosts in sync mode.
This router is expected to be used with the event bus, which
can be configured to use this router to send events to hosts
in the same thread as it process the events.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import bulk_send_events, send_event


class SyncEventsRouter(EventsRouter):
"""
Router to send events to hosts using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
send_event(None, event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
bulk_send_events(None, events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
In this case, the event bus is expected to provide the persistent storage layer.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
self.dispatch_event(event_name, updated_event, router_type, host_configurations)
Loading

0 comments on commit f3781f2

Please sign in to comment.