Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated code to use the latest version of PyEnsign and added error handling for closed websocket connections #9

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions pypredict/api/endpoints/homepage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import json

from fastapi import APIRouter
Expand Down Expand Up @@ -34,12 +33,8 @@ async def subscribe(self):
Subscribe to trading events from Ensign and run an
online model pipeline and publish predictions to a new topic.
"""
# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.topic)
# Subscribe to the topic.
# self.generate_price_info is a callback function that gets executed when
# a new event arrives in the topic
await self.ensign.subscribe(topic_id, on_event=self.generate_price_info)
async for event in self.ensign.subscribe(self.topic):
await self.generate_price_info(event)

templates = Jinja2Templates(directory=config.TEMPLATE_DIR)
router = APIRouter()
Expand All @@ -49,10 +44,7 @@ async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
subscriber = PredictionsSubscriber(websocket)
await subscriber.subscribe()
# create a Future and await its result - this will ensure that the
# subscriber will run forever since nothing in the code is setting the
# result of the Future
await asyncio.Future()


@router.get("/")
async def home(request: Request):
Expand Down
33 changes: 12 additions & 21 deletions pypredict/trades.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys
import json
import logging
import asyncio
from datetime import datetime

Expand All @@ -12,11 +13,14 @@
from river import preprocessing


logging.basicConfig(level=logging.INFO)


async def handle_ack(ack):
_ = datetime.fromtimestamp(ack.committed.seconds + ack.committed.nanos / 1e9)

async def handle_nack(nack):
print(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}")
logging.error(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the logging!


class TradesPublisher:
"""
Expand Down Expand Up @@ -45,7 +49,6 @@ async def recv_and_publish(self, uri):
"""
Receive messages from the websocket and publish events to Ensign.
"""
topic_id = await self.ensign.topic_id(self.topic)
while True:
try:
async with websockets.connect(uri) as websocket:
Expand All @@ -55,11 +58,10 @@ async def recv_and_publish(self, uri):
while True:
message = await websocket.recv()
for event in self.message_to_events(json.loads(message)):
await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack)
await self.ensign.publish(self.topic, event, on_ack=handle_ack, on_nack=handle_nack)
except websockets.exceptions.ConnectionClosedError as e:
# TODO: Make sure reconnect is happening for dropped connections.
print(f"Websocket connection closed: {e}")
await asyncio.sleep(1)
logging.error(f"Websocket connection closed: {e}")
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok if I'm reading this correctly continue in this instance is the same as falling through to the next iteration right? So maybe it's not the cause of the app not publishing, but I think this is more clear anyway. Maybe the logging will help us figure out what's going on.


def message_to_events(self, message):
"""
Expand Down Expand Up @@ -139,27 +141,16 @@ async def run_model_pipeline(self, event):

# create an Ensign event and publish to the predictions topic
event = Event(json.dumps(message).encode("utf-8"), mimetype="application/json")
# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.pub_topic)
await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack)
await self.ensign.publish(self.pub_topic, event, on_ack=handle_ack, on_nack=handle_nack)

async def subscribe(self):
"""
Subscribe to trading events from Ensign and run an
online model pipeline and publish predictions to a new topic.
"""

# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.sub_topic)

# Subscribe to the topic.
# self.run_model_pipeline is a callback function that gets executed when
# a new event arrives in the topic
await self.ensign.subscribe(topic_id, on_event=self.run_model_pipeline)
# create a Future and await its result - this will ensure that the
# subscriber will run forever since nothing in the code is setting the
# result of the Future
await asyncio.Future()
# Subscribe to the topic and run the model pipeline after receiving event
async for event in self.ensign.subscribe(self.sub_topic):
await self.run_model_pipeline(event)

if __name__ == "__main__":
# Run the publisher or subscriber depending on the command line arguments.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fastapi
jinja2
pyensign>=0.7.1b1
pyensign==0.10b0
river==0.17.0
uvicorn
websockets
Loading