Skip to content

Commit

Permalink
[resotocore][feat] Maintain subscribers in memory (#1819)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Nov 9, 2023
1 parent 5099152 commit 28a13a6
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 148 deletions.
4 changes: 1 addition & 3 deletions resotocore/resotocore/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ async def direct_tenant(deps: TenantDependencies) -> None:
cli = deps.add(ServiceNames.cli, CLIService(deps, all_commands(deps), default_env, alias_names()))
deps.add(ServiceNames.template_expander, TemplateExpanderService(db.template_entity_db, cli))
inspector = deps.add(ServiceNames.inspector, InspectorConfigService(cli))
subscriptions = deps.add(
ServiceNames.subscription_handler, SubscriptionHandlerService(db.subscribers_db, message_bus)
)
subscriptions = deps.add(ServiceNames.subscription_handler, SubscriptionHandlerService(message_bus))
core_config_handler = deps.add(
ServiceNames.core_config_handler,
CoreConfigHandler(config, message_bus, worker_task_queue, config_handler, event_sender, inspector),
Expand Down
4 changes: 0 additions & 4 deletions resotocore/resotocore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from resotocore.db.modeldb import ModelDb, model_db
from resotocore.db.packagedb import app_package_entity_db
from resotocore.db.runningtaskdb import running_task_db
from resotocore.db.subscriberdb import subscriber_db
from resotocore.db.system_data_db import SystemDataDb
from resotocore.db.templatedb import template_entity_db
from resotocore.error import NoSuchGraph, RequiredDependencyMissingError
Expand All @@ -46,7 +45,6 @@ def __init__(
event_sender: AnalyticsEventSender,
adjust_node: AdjustNode,
config: CoreConfig,
subscriber_name: str = "subscribers",
running_task_name: str = "running_tasks",
job_name: str = "jobs",
deferred_outer_edge_name: str = "deferred_outer_edges",
Expand All @@ -62,7 +60,6 @@ def __init__(
self.db = AsyncArangoDB(arango_database)
self.adjust_node = adjust_node
self.graph_model_dbs: Dict[GraphName, ModelDb] = {}
self.subscribers_db = EventEntityDb(subscriber_db(self.db, subscriber_name), event_sender, subscriber_name)
self.system_data_db = SystemDataDb(self.db)
self.running_task_db = running_task_db(self.db, running_task_name)
self.deferred_outer_edge_db = deferred_outer_edge_db(self.db, deferred_outer_edge_name)
Expand All @@ -78,7 +75,6 @@ def __init__(

async def start(self) -> None:
if not self.config.multi_tenant_setup:
await self.subscribers_db.create_update_schema()
await self.running_task_db.create_update_schema()
await self.job_db.create_update_schema()
await self.config_entity_db.create_update_schema()
Expand Down
11 changes: 0 additions & 11 deletions resotocore/resotocore/db/subscriberdb.py

This file was deleted.

47 changes: 16 additions & 31 deletions resotocore/resotocore/task/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from datetime import timedelta, datetime
from typing import Optional, Iterable, Dict, List

from resotocore.db.subscriberdb import SubscriberDb
from resotocore.ids import SubscriberId
from resotocore.message_bus import MessageBus
from resotocore.service import Service
Expand Down Expand Up @@ -52,10 +51,6 @@ async def remove_subscriber(self, subscriber_id: SubscriberId) -> Optional[Subsc
def subscribers_by_event(self) -> Dict[str, List[Subscriber]]:
pass

@abstractmethod
def update_subscriber_by_event(self, subscribers: Iterable[Subscriber]) -> Dict[str, List[Subscriber]]:
pass


class SubscriptionHandlerService(SubscriptionHandler):
"""
Expand All @@ -64,21 +59,17 @@ class SubscriptionHandlerService(SubscriptionHandler):
This handler belongs to the event system, which assumes there is only one instance running in each cluster!
"""

def __init__(self, db: SubscriberDb, message_bus: MessageBus) -> None:
def __init__(self, message_bus: MessageBus) -> None:
super().__init__()
self.db = db
self.message_bus = message_bus
self._subscribers_by_id: Dict[SubscriberId, Subscriber] = {}
self._subscribers_by_event: Dict[str, List[Subscriber]] = {}
self.started_at = utc()
self.cleaner = Periodic("subscription_cleaner", self.check_outdated_handler, timedelta(seconds=10))
self.not_connected_since: Dict[str, datetime] = {}
self.lock: Optional[Lock] = None
self.lock: Lock = Lock()

async def start(self) -> None:
self.lock = Lock()
await self.__load_from_db()
log.info(f"Loaded {len(self._subscribers_by_id)} subscribers for {len(self._subscribers_by_event)} events")
await self.cleaner.start()

async def stop(self) -> None:
Expand All @@ -100,54 +91,48 @@ async def add_subscription(
updated = existing.add_subscription(event_type, wait_for_completion, timeout)
if existing != updated:
log.info(f"Subscriber {subscriber_id}: add subscription={event_type} ({wait_for_completion}, {timeout})")
await self.db.update(updated)
await self.__load_from_db()
await self.__update_subscriber(updated)
return updated

async def remove_subscription(self, subscriber_id: SubscriberId, event_type: str) -> Subscriber:
existing = self._subscribers_by_id.get(subscriber_id, Subscriber(subscriber_id, {}))
updated = existing.remove_subscription(event_type)
if existing != updated:
log.info(f"Subscriber {subscriber_id}: remove subscription={event_type}")
if updated.subscriptions:
await self.db.update(updated)
else:
await self.db.delete(subscriber_id)
await self.__load_from_db()
await self.__update_subscriber(updated)
return updated

async def update_subscriptions(self, subscriber_id: SubscriberId, subscriptions: List[Subscription]) -> Subscriber:
existing = self._subscribers_by_id.get(subscriber_id, None)
updated = Subscriber.from_list(subscriber_id, subscriptions)
if existing != updated:
log.info(f"Subscriber {subscriber_id}: update all subscriptions={subscriptions}")
await self.db.update(updated)
await self.__load_from_db()
await self.__update_subscriber(updated)
return updated

async def remove_subscriber(self, subscriber_id: SubscriberId) -> Optional[Subscriber]:
existing = self._subscribers_by_id.get(subscriber_id, None)
if existing:
log.info(f"Subscriber {subscriber_id}: remove subscriber")
await self.db.delete(subscriber_id)
await self.__load_from_db()
async with self.lock:
self._subscribers_by_id.pop(subscriber_id, None)
self.__update_subscriber_by_event()
return existing

async def __load_from_db(self) -> None:
assert self.lock is not None
async with self.lock:
self._subscribers_by_id = {s.id: s async for s in self.db.all()}
self._subscribers_by_event = self.update_subscriber_by_event(self._subscribers_by_id.values())

def subscribers_by_event(self) -> Dict[str, List[Subscriber]]:
return self._subscribers_by_event

def update_subscriber_by_event(self, subscribers: Iterable[Subscriber]) -> Dict[str, List[Subscriber]]:
def __update_subscriber_by_event(self) -> None:
result: Dict[str, List[Subscriber]] = defaultdict(list)
for subscriber in subscribers:
for subscriber in self._subscribers_by_id.values():
for subscription in subscriber.subscriptions.values():
result[subscription.message_type].append(subscriber)
return result
self._subscribers_by_event = result

async def __update_subscriber(self, subscriber: Subscriber) -> None:
async with self.lock:
self._subscribers_by_id[subscriber.id] = subscriber
self.__update_subscriber_by_event()

async def check_outdated_handler(self) -> None:
"""
Expand Down
3 changes: 1 addition & 2 deletions resotocore/tests/resotocore/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,7 @@ def workflow_instance(

@fixture
async def subscription_handler(message_bus: MessageBus) -> AsyncIterator[SubscriptionHandlerService]:
in_mem = InMemoryDb(Subscriber, lambda x: x.id)
async with SubscriptionHandlerService(in_mem, message_bus) as handler:
async with SubscriptionHandlerService(message_bus) as handler:
yield handler


Expand Down
78 changes: 0 additions & 78 deletions resotocore/tests/resotocore/db/subscriberdb_test.py

This file was deleted.

23 changes: 4 additions & 19 deletions resotocore/tests/resotocore/task/subscribers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,16 @@
from deepdiff import DeepDiff
from pytest import fixture, mark

from resotocore.db.subscriberdb import SubscriberDb
from resotocore.ids import SubscriberId
from resotocore.message_bus import MessageBus
from resotocore.model.typed_model import to_js, from_js
from resotocore.task.model import Subscription, Subscriber
from resotocore.task.subscribers import SubscriptionHandler, SubscriptionHandlerService
from tests.resotocore.db.entitydb import InMemoryDb


@fixture
def in_mem_db() -> SubscriberDb:
return InMemoryDb[SubscriberId, Subscriber](Subscriber, lambda x: x.id)


@fixture
async def handler(in_mem_db: SubscriberDb) -> AsyncIterator[SubscriptionHandlerService]:
async with SubscriptionHandlerService(in_mem_db, MessageBus()) as handler:
async def handler() -> AsyncIterator[SubscriptionHandlerService]:
async with SubscriptionHandlerService(MessageBus()) as handler:
await handler.add_subscription(SubscriberId("sub_1"), "test", True, timedelta(seconds=3))
yield handler

Expand All @@ -46,40 +39,32 @@ def test_json_marshalling_subscribers() -> None:


@mark.asyncio
async def test_subscribe(handler: SubscriptionHandler, in_mem_db: SubscriberDb) -> None:
async def test_subscribe(handler: SubscriptionHandler) -> None:
# register first time
result = await handler.add_subscription(SubscriberId("foo"), "event_bla", True, timedelta(seconds=3))
assert len(result.subscriptions) == 1
assert result.subscriptions["event_bla"].message_type == "event_bla"
# should be persisted in database as well
assert len((await in_mem_db.get("foo")).subscriptions) == 1 # type: ignore
# register again is ignored
result = await handler.add_subscription(SubscriberId("foo"), "event_bla", True, timedelta(seconds=3))
assert len(result.subscriptions) == 1
assert result.subscriptions["event_bla"].message_type == "event_bla"
# should be persisted in database as well
assert len((await in_mem_db.get("foo")).subscriptions) == 1 # type: ignore


@mark.asyncio
async def test_unsubscribe(handler: SubscriptionHandler, in_mem_db: SubscriberDb) -> None:
async def test_unsubscribe(handler: SubscriptionHandler) -> None:
# register first time
subscriber_id = SubscriberId("foo")
subs = [Subscription("event_bla"), Subscription("event_bar")]
result = await handler.update_subscriptions(subscriber_id, subs)
assert len(result.subscriptions) == 2
updated = await handler.remove_subscription(subscriber_id, "event_bla")
assert len(updated.subscriptions) == 1
# should be persisted in database as well
assert len((await in_mem_db.get(subscriber_id)).subscriptions) == 1 # type: ignore
# second time should be ignored
updated = await handler.remove_subscription(subscriber_id, "event_bla")
assert len(updated.subscriptions) == 1
# last subscription is removed
updated = await handler.remove_subscription(subscriber_id, "event_bar")
assert len(updated.subscriptions) == 0
# should be persisted in database as well
assert await in_mem_db.get(subscriber_id) is None


@mark.asyncio
Expand Down

0 comments on commit 28a13a6

Please sign in to comment.