diff --git a/pypredict/api/endpoints/homepage.py b/pypredict/api/endpoints/homepage.py index ebbbf72..f56b086 100644 --- a/pypredict/api/endpoints/homepage.py +++ b/pypredict/api/endpoints/homepage.py @@ -1,4 +1,3 @@ -import asyncio import json from fastapi import APIRouter @@ -34,12 +33,8 @@ async def subscribe(self): Subscribe to trading events from Ensign and run an online model pipeline and publish predictions to a new topic. """ - # Get the topic ID from the topic name. - topic_id = await self.ensign.topic_id(self.topic) - # Subscribe to the topic. - # self.generate_price_info is a callback function that gets executed when - # a new event arrives in the topic - await self.ensign.subscribe(topic_id, on_event=self.generate_price_info) + async for event in self.ensign.subscribe(self.topic): + await self.generate_price_info(event) templates = Jinja2Templates(directory=config.TEMPLATE_DIR) router = APIRouter() @@ -49,10 +44,7 @@ async def websocket_endpoint(websocket: WebSocket): await websocket.accept() subscriber = PredictionsSubscriber(websocket) await subscriber.subscribe() - # create a Future and await its result - this will ensure that the - # subscriber will run forever since nothing in the code is setting the - # result of the Future - await asyncio.Future() + @router.get("/") async def home(request: Request): diff --git a/pypredict/trades.py b/pypredict/trades.py index f0208ec..a89218e 100644 --- a/pypredict/trades.py +++ b/pypredict/trades.py @@ -1,6 +1,7 @@ import os import sys import json +import logging import asyncio from datetime import datetime @@ -12,11 +13,14 @@ from river import preprocessing +logging.basicConfig(level=logging.INFO) + + async def handle_ack(ack): _ = datetime.fromtimestamp(ack.committed.seconds + ack.committed.nanos / 1e9) async def handle_nack(nack): - print(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}") + logging.error(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}") class TradesPublisher: """ @@ -45,7 +49,6 @@ async def recv_and_publish(self, uri): """ Receive messages from the websocket and publish events to Ensign. """ - topic_id = await self.ensign.topic_id(self.topic) while True: try: async with websockets.connect(uri) as websocket: @@ -55,11 +58,10 @@ async def recv_and_publish(self, uri): while True: message = await websocket.recv() for event in self.message_to_events(json.loads(message)): - await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack) + await self.ensign.publish(self.topic, event, on_ack=handle_ack, on_nack=handle_nack) except websockets.exceptions.ConnectionClosedError as e: - # TODO: Make sure reconnect is happening for dropped connections. - print(f"Websocket connection closed: {e}") - await asyncio.sleep(1) + logging.error(f"Websocket connection closed: {e}") + continue def message_to_events(self, message): """ @@ -139,27 +141,16 @@ async def run_model_pipeline(self, event): # create an Ensign event and publish to the predictions topic event = Event(json.dumps(message).encode("utf-8"), mimetype="application/json") - # Get the topic ID from the topic name. - topic_id = await self.ensign.topic_id(self.pub_topic) - await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack) + await self.ensign.publish(self.pub_topic, event, on_ack=handle_ack, on_nack=handle_nack) async def subscribe(self): """ Subscribe to trading events from Ensign and run an online model pipeline and publish predictions to a new topic. """ - - # Get the topic ID from the topic name. - topic_id = await self.ensign.topic_id(self.sub_topic) - - # Subscribe to the topic. - # self.run_model_pipeline is a callback function that gets executed when - # a new event arrives in the topic - await self.ensign.subscribe(topic_id, on_event=self.run_model_pipeline) - # create a Future and await its result - this will ensure that the - # subscriber will run forever since nothing in the code is setting the - # result of the Future - await asyncio.Future() + # Subscribe to the topic and run the model pipeline after receiving event + async for event in self.ensign.subscribe(self.sub_topic): + await self.run_model_pipeline(event) if __name__ == "__main__": # Run the publisher or subscriber depending on the command line arguments. diff --git a/requirements.txt b/requirements.txt index 9508482..01611d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ fastapi jinja2 -pyensign>=0.7.1b1 +pyensign==0.10b0 river==0.17.0 uvicorn websockets \ No newline at end of file