Skip to content

Commit

Permalink
Merge pull request #9 from rotationalio/sc-21689
Browse files Browse the repository at this point in the history
Update to PyEnsign v0.10b0
  • Loading branch information
pdeziel authored Oct 6, 2023
2 parents 305979e + e98b99f commit 96e78ac
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 33 deletions.
14 changes: 3 additions & 11 deletions pypredict/api/endpoints/homepage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import json

from fastapi import APIRouter
Expand Down Expand Up @@ -34,12 +33,8 @@ async def subscribe(self):
Subscribe to trading events from Ensign and run an
online model pipeline and publish predictions to a new topic.
"""
# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.topic)
# Subscribe to the topic.
# self.generate_price_info is a callback function that gets executed when
# a new event arrives in the topic
await self.ensign.subscribe(topic_id, on_event=self.generate_price_info)
async for event in self.ensign.subscribe(self.topic):
await self.generate_price_info(event)

templates = Jinja2Templates(directory=config.TEMPLATE_DIR)
router = APIRouter()
Expand All @@ -49,10 +44,7 @@ async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
subscriber = PredictionsSubscriber(websocket)
await subscriber.subscribe()
# create a Future and await its result - this will ensure that the
# subscriber will run forever since nothing in the code is setting the
# result of the Future
await asyncio.Future()


@router.get("/")
async def home(request: Request):
Expand Down
33 changes: 12 additions & 21 deletions pypredict/trades.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys
import json
import logging
import asyncio
from datetime import datetime

Expand All @@ -12,11 +13,14 @@
from river import preprocessing


logging.basicConfig(level=logging.INFO)


async def handle_ack(ack):
_ = datetime.fromtimestamp(ack.committed.seconds + ack.committed.nanos / 1e9)

async def handle_nack(nack):
print(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}")
logging.error(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}")

class TradesPublisher:
"""
Expand Down Expand Up @@ -45,7 +49,6 @@ async def recv_and_publish(self, uri):
"""
Receive messages from the websocket and publish events to Ensign.
"""
topic_id = await self.ensign.topic_id(self.topic)
while True:
try:
async with websockets.connect(uri) as websocket:
Expand All @@ -55,11 +58,10 @@ async def recv_and_publish(self, uri):
while True:
message = await websocket.recv()
for event in self.message_to_events(json.loads(message)):
await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack)
await self.ensign.publish(self.topic, event, on_ack=handle_ack, on_nack=handle_nack)
except websockets.exceptions.ConnectionClosedError as e:
# TODO: Make sure reconnect is happening for dropped connections.
print(f"Websocket connection closed: {e}")
await asyncio.sleep(1)
logging.error(f"Websocket connection closed: {e}")
continue

def message_to_events(self, message):
"""
Expand Down Expand Up @@ -139,27 +141,16 @@ async def run_model_pipeline(self, event):

# create an Ensign event and publish to the predictions topic
event = Event(json.dumps(message).encode("utf-8"), mimetype="application/json")
# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.pub_topic)
await self.ensign.publish(topic_id, event, on_ack=handle_ack, on_nack=handle_nack)
await self.ensign.publish(self.pub_topic, event, on_ack=handle_ack, on_nack=handle_nack)

async def subscribe(self):
"""
Subscribe to trading events from Ensign and run an
online model pipeline and publish predictions to a new topic.
"""

# Get the topic ID from the topic name.
topic_id = await self.ensign.topic_id(self.sub_topic)

# Subscribe to the topic.
# self.run_model_pipeline is a callback function that gets executed when
# a new event arrives in the topic
await self.ensign.subscribe(topic_id, on_event=self.run_model_pipeline)
# create a Future and await its result - this will ensure that the
# subscriber will run forever since nothing in the code is setting the
# result of the Future
await asyncio.Future()
# Subscribe to the topic and run the model pipeline after receiving event
async for event in self.ensign.subscribe(self.sub_topic):
await self.run_model_pipeline(event)

if __name__ == "__main__":
# Run the publisher or subscriber depending on the command line arguments.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fastapi
jinja2
pyensign>=0.7.1b1
pyensign==0.10b0
river==0.17.0
uvicorn
websockets

0 comments on commit 96e78ac

Please sign in to comment.