diff --git a/scitt_emulator/federation_activitypub_bovine.py b/scitt_emulator/federation_activitypub_bovine.py index e6a8e220..8073922d 100644 --- a/scitt_emulator/federation_activitypub_bovine.py +++ b/scitt_emulator/federation_activitypub_bovine.py @@ -28,7 +28,12 @@ from bovine_pubsub import BovinePubSub from bovine.activitystreams import factories_for_actor_object from bovine.clients import lookup_uri_with_webfinger -from mechanical_bull.handlers import HandlerEvent, HandlerAPIVersion +from mechanical_bull.handlers import ( + HandlerEvent, + HandlerAPIVersion, + load_handlers, + build_handler, +) from scitt_emulator.scitt import SCITTServiceEmulator from scitt_emulator.federation import SCITTFederation @@ -50,9 +55,9 @@ def __init__(self, app, signals, config_path): # tree_alg class's workspace self.workspace = Path(self.config["workspace"]).expanduser() - self.bovine_db_url = self.config.get("bovine_db_url", - os.environ.get("BOVINE_DB_URL", - None)) + self.bovine_db_url = self.config.get( + "bovine_db_url", os.environ.get("BOVINE_DB_URL", None) + ) if self.bovine_db_url and self.bovine_db_url.startswith("~"): self.bovine_db_url = str(Path(self.bovine_db_url).expanduser()) # TODO Pass this as variable @@ -123,60 +128,24 @@ async def initialize_service(self): "key0", did_key, ) - _account, actor_url = await self.app.config["bovine_store"].get_account_url_for_identity(did_key) + _account, actor_url = await self.app.config[ + "bovine_store" + ].get_account_url_for_identity(did_key) logger.info("Actor key added in database. actor_url is %s", actor_url) # Run client handlers async def mechanical_bull_loop(config): try: - # from mechanical_bull.event_loop import loop - from mechanical_bull.handlers import load_handlers, build_handler - - for client_name, value in config.items(): - if isinstance(value, dict): - handlers = load_handlers(value["handlers"]) - # taskgroup.create_task(loop(client_name, value, handlers)) - # await asyncio.sleep(10) - client_config = value - # TODO DEBUG TESTING XXX NOTE REMOVE - os.environ["BUTCHER_ALLOW_HTTP"] = "1" + for client_name, client_config in config.items(): + if isinstance(client_config, dict): + handlers = load_handlers(client_config["handlers"]) client_config["domain"] = client_config["host"] - self.app.add_background_task(loop, client_name, - client_config, - handlers) - # await loop(client_name, - # client_config, - # handlers) - continue - i = 1 - while True: - try: - pprint.pprint(client_config) - # client = await self.app.config["bovine_async_exit_stack"].enter_async_context(bovine.BovineClient(**client_config)) - client = bovine.BovineClient(**client_config) - print("client:", client) - client.session = await self.make_client_session() - client = await self.app.config["bovine_async_exit_stack"].enter_async_context(client) - print("session:", session) - print("session._request_class:", session._request_class) - print("Client init success!!!") - # await handle_connection_with_reconnect( - # client, handlers, client_name=client_name, - # ) - except aiohttp.client_exceptions.ClientConnectorError as e: - logger.info("Something went wrong connection: %s: attempt %i: %s", client_name, i, e) - except Exception as e: - logger.exception(e) - await asyncio.sleep(1) - # await asyncio.sleep(2 ** i) - i += 1 - continue - self.app.add_background_task(handle_connection_with_reconnect, client, handlers, client_name=client_name) - break + self.app.add_background_task( + loop, client_name, client_config, handlers + ) except Exception as e: logger.exception(e) - # async with aiohttp.ClientSession(trust_env=True) as client_session: async with contextlib.AsyncExitStack() as async_exit_stack: # await mechanical_bull_loop(config_toml_obj) @@ -207,8 +176,9 @@ async def federate_created_entries_pass_client( created_entry: SCITTSignalsFederationCreatedEntry = None, ): nonlocal client - asyncio.create_task(federate_created_entries(client, sender, - created_entry)) + asyncio.create_task( + federate_created_entries(client, sender, created_entry) + ) client.federate_created_entries = types.MethodType( signals.federation.created_entry.connect( @@ -351,6 +321,7 @@ async def federate_created_entries( except: logger.error(traceback.format_exc()) + import asyncio import bovine @@ -364,7 +335,7 @@ async def federate_created_entries( async def handle_connection(client: bovine.BovineClient, handlers: list): print("handle_connection") event_source = await client.event_source() - print(event_source ) + print(event_source) logger.info("Connected") for handler in handlers: await call_handler_compat( @@ -412,21 +383,15 @@ async def handle_connection_with_reconnect( async def loop(client_name, client_config, handlers): - # TODO DEBUG TESTING XXX NOTE REMOVE - os.environ["BUTCHER_ALLOW_HTTP"] = "1" - # client_config["domain"] = "http://" + client_config["host"] i = 1 while True: try: async with bovine.BovineClient(**client_config) as client: - print("client:", client) await handle_connection_with_reconnect( client, handlers, client_name=client_name ) except Exception as e: logger.exception("Something went wrong for %s", client_name) logger.exception(e) - await asyncio.sleep(1) - # await asyncio.sleep(10) - # await asyncio.sleep(2 ** i) + await asyncio.sleep(2**i) i += 1