Skip to content

Commit

Permalink
fix problematic hack in the main
Browse files Browse the repository at this point in the history
  • Loading branch information
zlatsic committed Aug 28, 2024
1 parent a8a119d commit 6b2087f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 15 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.dev4
1.1.dev5
39 changes: 26 additions & 13 deletions aimm/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def main():
async def async_main(conf):
hat_conf = conf.get("hat") or {}
subscriptions = list(_get_subscriptions(conf))

if monitor_conf := hat_conf.get("monitor"):
runner = partial(run, conf)
events_queue = aio.Queue()
runner = partial(run, conf, events_queue=events_queue)
if event_server_group := hat_conf.get("event_server_group"):
component = await hat.event.component.connect(
_parse_tcp(monitor_conf["monitor_address"]),
Expand All @@ -53,6 +55,9 @@ async def async_main(conf):
event_server_group,
runner,
eventer_kwargs={"subscriptions": subscriptions},
events_cb=lambda _, __, events: events_queue.put_nowait(
events
),
)
else:
mlog.info("running without hat event compatibility")
Expand All @@ -66,21 +71,30 @@ async def async_main(conf):
await component.wait_closed()
elif "event_server_address" in hat_conf:
async_group = aio.Group()
events_queue = aio.Queue()
client = await hat.event.eventer.client.connect(
_parse_tcp(hat_conf["event_server_address"]),
"aimm_client",
subscriptions=subscriptions,
events_cb=lambda _, events: events_queue.put_nowait(events),
)
_bind_resource(async_group, client)
await async_group.spawn(
run, conf=conf, component=None, server_data=None, client=client
run,
conf=conf,
component=None,
server_data=None,
client=client,
events_queue=events_queue,
)
else:
mlog.debug("running without hat compatibility")
await run(conf)


async def run(conf, component=None, server_data=None, client=None):
async def run(
conf, component=None, server_data=None, client=None, events_queue=None
):
group = aio.Group()

try:
Expand All @@ -107,9 +121,7 @@ async def run(conf, component=None, server_data=None, client=None):
proxies.append(proxy)

if proxies:
# TODO: filthy hack for sake of bw compatibility without too much
# refactoring, to be corrected ASAP
client._events_cb = partial(_events_cb, proxies)
group.spawn(_events_loop, events_queue, proxies)

await group.wait_closing()
finally:
Expand Down Expand Up @@ -144,13 +156,14 @@ async def _create_control(conf, engine, client):
return await module.create(conf, engine, proxy), proxy


async def _events_cb(proxies, _, events):
for proxy in proxies:
proxy_events = [
e for e in events if proxy.subscription.matches(e.type)
]
if proxy_events:
proxy.notify(proxy_events)
async def _events_loop(queue, proxies):
async for events in queue:
for proxy in proxies:
proxy_events = [
e for e in events if proxy.subscription.matches(e.type)
]
if proxy_events:
proxy.notify(proxy_events)


def _create_parser():
Expand Down
21 changes: 20 additions & 1 deletion test/test_sys/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,26 @@ async def event_server(
):
conf = {
"type": "event",
"log": {"version": 1},
"log": {
"disable_existing_loggers": False,
"formatters": {"default": {}},
"handlers": {
"syslog": {
"class": "hat.syslog.handler.SysLogHandler",
"comm_type": "TCP",
"formatter": "default",
"host": "127.0.0.1",
"level": "INFO",
"port": 6514,
"queue_size": 10,
}
},
"root": {
"handlers": ["syslog"],
"level": "INFO",
},
"version": 1,
},
"name": "event-server",
"server_id": 0,
"backend": {"module": "hat.event.backends.dummy"},
Expand Down

0 comments on commit 6b2087f

Please sign in to comment.