diff --git a/VERSION b/VERSION index c1e81b8..a9b738e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.dev4 +1.1.dev5 diff --git a/aimm/server/main.py b/aimm/server/main.py index 6f9d3f8..fd6a034 100644 --- a/aimm/server/main.py +++ b/aimm/server/main.py @@ -43,8 +43,10 @@ def main(): async def async_main(conf): hat_conf = conf.get("hat") or {} subscriptions = list(_get_subscriptions(conf)) + if monitor_conf := hat_conf.get("monitor"): - runner = partial(run, conf) + events_queue = aio.Queue() + runner = partial(run, conf, events_queue=events_queue) if event_server_group := hat_conf.get("event_server_group"): component = await hat.event.component.connect( _parse_tcp(monitor_conf["monitor_address"]), @@ -53,6 +55,9 @@ async def async_main(conf): event_server_group, runner, eventer_kwargs={"subscriptions": subscriptions}, + events_cb=lambda _, __, events: events_queue.put_nowait( + events + ), ) else: mlog.info("running without hat event compatibility") @@ -66,21 +71,30 @@ async def async_main(conf): await component.wait_closed() elif "event_server_address" in hat_conf: async_group = aio.Group() + events_queue = aio.Queue() client = await hat.event.eventer.client.connect( _parse_tcp(hat_conf["event_server_address"]), "aimm_client", subscriptions=subscriptions, + events_cb=lambda _, events: events_queue.put_nowait(events), ) _bind_resource(async_group, client) await async_group.spawn( - run, conf=conf, component=None, server_data=None, client=client + run, + conf=conf, + component=None, + server_data=None, + client=client, + events_queue=events_queue, ) else: mlog.debug("running without hat compatibility") await run(conf) -async def run(conf, component=None, server_data=None, client=None): +async def run( + conf, component=None, server_data=None, client=None, events_queue=None +): group = aio.Group() try: @@ -107,9 +121,7 @@ async def run(conf, component=None, server_data=None, client=None): proxies.append(proxy) if proxies: - # TODO: filthy hack for sake of bw compatibility without too much - # refactoring, to be corrected ASAP - client._events_cb = partial(_events_cb, proxies) + group.spawn(_events_loop, events_queue, proxies) await group.wait_closing() finally: @@ -144,13 +156,14 @@ async def _create_control(conf, engine, client): return await module.create(conf, engine, proxy), proxy -async def _events_cb(proxies, _, events): - for proxy in proxies: - proxy_events = [ - e for e in events if proxy.subscription.matches(e.type) - ] - if proxy_events: - proxy.notify(proxy_events) +async def _events_loop(queue, proxies): + async for events in queue: + for proxy in proxies: + proxy_events = [ + e for e in events if proxy.subscription.matches(e.type) + ] + if proxy_events: + proxy.notify(proxy_events) def _create_parser(): diff --git a/test/test_sys/test_event.py b/test/test_sys/test_event.py index 1f6acac..eea9103 100644 --- a/test/test_sys/test_event.py +++ b/test/test_sys/test_event.py @@ -79,7 +79,26 @@ async def event_server( ): conf = { "type": "event", - "log": {"version": 1}, + "log": { + "disable_existing_loggers": False, + "formatters": {"default": {}}, + "handlers": { + "syslog": { + "class": "hat.syslog.handler.SysLogHandler", + "comm_type": "TCP", + "formatter": "default", + "host": "127.0.0.1", + "level": "INFO", + "port": 6514, + "queue_size": 10, + } + }, + "root": { + "handlers": ["syslog"], + "level": "INFO", + }, + "version": 1, + }, "name": "event-server", "server_id": 0, "backend": {"module": "hat.event.backends.dummy"},