Skip to content

Commit

Permalink
Merge pull request #204 from s4w3d0ff/websocket-dev
Browse files Browse the repository at this point in the history
Cleanup the websocket class
  • Loading branch information
s4w3d0ff authored Jul 13, 2019
2 parents 2dd8a37 + 6938443 commit c8294bb
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 138 deletions.
74 changes: 26 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,60 +77,31 @@ print(polo.returnTradeHistory('BTC_ETH'))
You can also not use the 'helper' methods at all and use `poloniex.PoloniexBase` which only has `returnMarketHist` and `__call__` to make rest api calls.

#### Websocket Usage:
To connect to the websocket api just create a child class of `PoloniexSocketed` like so:
To connect to the websocket api use the `PoloniexSocketed` class like so:
```python
import poloniex
import logging
from time import sleep

logging.basicConfig()

class MySocket(poloniex.PoloniexSocketed):

def on_heartbeat(self, msg):
"""
Triggers whenever we get a heartbeat message
"""
print(msg)

def on_volume(self, msg):
"""
Triggers whenever we get a 24hvolume message
"""
print(msg)

def on_ticker(self, msg):
"""
Triggers whenever we get a ticker message
"""
print(msg)

def on_market(self, msg):
"""
Triggers whenever we get a market ('currencyPair') message
"""
print(msg)

def on_account(self, msg):
"""
Triggers whenever we get an account message
"""
print(msg)

sock = MySocket()
# helps show what is going on
sock.logger.setLevel(logging.DEBUG)
# start the websocket thread and subscribe to '24hvolume'
sock.startws(subscribe=['24hvolume'])
logging.basicConfig()
poloniex.logger.setLevel(logging.DEBUG)

def on_volume(data):
print(data)
# make instance
sock = poloniex.PoloniexSocketed()
# start the websocket thread and subscribe to '24hvolume' setting the callback to 'on_volume'
sock.startws(subscribe={'24hvolume': on_volume})
# give the socket some time to init
poloniex.sleep(5)
# this won't work:
#sock.subscribe('ticker')
# use channel id to un/sub
sock.subscribe('1002')
poloniex.sleep(1)
# unsub from ticker
sock.unsubscribe('1002')
poloniex.sleep(4)
sleep(5)
# use the channel name str or id int to subscribe/unsubscribe
sock.subscribe(chan='ticker', callback=print)
sleep(1)
# unsub from ticker using id (str name can be use as well)
sock.unsubscribe(1002)
sleep(4)
# stop websocket
sock.stopws()

```
Expand All @@ -152,5 +123,12 @@ DEBUG:poloniex:Unsubscribed to ticker
DEBUG:poloniex:Websocket Closed
INFO:poloniex:Websocket thread stopped/joined
```
You can also subscribe and start the websocket thread when creating an instance of `PoloniexSocketed` by using the `subscribe` and `start` args:
```python

sock = poloniex.PoloniexSocketed(subscribe={'24hvolume': print}, start=True)

```


**More examples of how to use websocket push API can be found [here](https://github.com/s4w3d0ff/python-poloniex/tree/master/examples).**
2 changes: 1 addition & 1 deletion examples/websocket/dictTicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def on_ticker(self, data):
polo = TickPolo()
poloniex.logging.basicConfig()
polo.logger.setLevel(poloniex.logging.DEBUG)
polo.startws(['ticker'])
polo.startws({'ticker': polo.on_ticker})
for i in range(3):
pprint(polo.ticker('BTC_LTC'))
poloniex.sleep(10)
Expand Down
4 changes: 2 additions & 2 deletions examples/websocket/stopLimit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, *args, **kwargs):
def on_ticker(self, msg):
data = [float(dat) for dat in msg]
# check stop orders
mkt = self.channels[str(int(data[0]))]['name']
mkt = self._getChannelName(str(int(data[0])))
la = data[2]
hb = data[3]
for id in self.stopOrders:
Expand Down Expand Up @@ -98,6 +98,6 @@ def callbk(id):
callback=callbk,
# remove or set 'test' to false to place real orders
test=True)
test.startws(['ticker'])
test.startws({'ticker': test.on_ticker})
poloniex.sleep(120)
test.stopws(3)
158 changes: 71 additions & 87 deletions poloniex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,14 +683,8 @@ def toggleAutoRenew(self, orderNumber):

class PoloniexSocketed(Poloniex):
""" Child class of Poloniex with support for the websocket api """
def __init__(self, *args, **kwargs):
subscribe = False
start = False
if 'subscribe' in kwargs:
subscribe = kwargs.pop('subscribe')
if 'startws' in kwargs:
start = kwargs.pop('startws')
super(PoloniexSocketed, self).__init__(*args, **kwargs)
def __init__(self, key=None, secret=None, subscribe={}, start=False, *args, **kwargs):
super(PoloniexSocketed, self).__init__(key, secret, *args, **kwargs)
self.socket = WebSocketApp(url="wss://api2.poloniex.com/",
on_open=self.on_open,
on_message=self.on_message,
Expand All @@ -699,61 +693,30 @@ def __init__(self, *args, **kwargs):
self._t = None
self._running = False
self.channels = {
'1000': {'name': 'account',
'sub': False,
'callback': self.on_account},
'1002': {'name': 'ticker',
'sub': False,
'callback': self.on_ticker},
'1003': {'name': '24hvolume',
'sub': False,
'callback': self.on_volume},
'1010': {'name': 'heartbeat',
'sub': False,
'callback': self.on_heartbeat},
'account': {'id': '1000'},
'ticker': {'id': '1002'},
'24hvolume': {'id': '1003'},
'heartbeat': {'id': '1010',
'callback': self.on_heartbeat},
}
# add each market to channels list by id
# (wish there was a cleaner way of doing this...)
tick = self.returnTicker()
for market in tick:
self.channels[str(tick[market]['id'])] = {
'name': market,
'sub': False,
'callback': self.on_market
}
self.channels[market] = {'id': str(tick[market]['id'])}
# handle init subscribes
if subscribe:
for chan in self.channels:
if self.channels[chan]['name'] in subscribe or chan in subscribe:
self.channels[chan]['sub'] = True
self.setSubscribes(**subscribe)
if start:
self.startws()


def _handle_sub(self, message):
""" Handles websocket un/subscribe messages """
chan = str(message[0])
# skip heartbeats
if not chan == '1010':
# Subscribed
if message[1] == 1:
# update self.channels[chan]['sub'] flag
self.channels[chan]['sub'] = True
self.logger.debug('Subscribed to %s', self.channels[chan]['name'])
# return False so no callback trigger
return False
# Unsubscribed
if message[1] == 0:
# update self.channels[chan]['sub'] flag
self.channels[chan]['sub'] = False
self.logger.debug('Unsubscribed to %s', self.channels[chan]['name'])
# return False so no callback trigger
return False
# return chan name
return chan
def _getChannelName(self, id):
return next(
(chan for chan in self.channels if self.channels[chan]['id'] == id),
False)

def on_open(self, *ws):
for chan in self.channels:
if self.channels[chan]['sub']:
if 'sub' in self.channels[chan] and self.channels[chan]['sub']:
self.subscribe(chan)

def on_message(self, data):
Expand All @@ -766,45 +729,63 @@ def on_message(self, data):
# catch errors
if 'error' in message:
return self.logger.error(message['error'])
chan = self._getChannelName(str(message[0]))
# handle sub/unsub
chan = self._handle_sub(message)
if chan:
# skip heartbeats
if not chan == 'heartbeat':
# Subscribed
if message[1] == 1:
self.logger.debug('Subscribed to %s', chan)
# return False so no callback trigger
return False
# Unsubscribed
if message[1] == 0:
self.logger.debug('Unsubscribed to %s', chan)
# return False so no callback trigger
return False
if 'callback' in self.channels[chan]:
# activate chan callback
# heartbeats are funky
if not chan == '1010':
if not chan in ['account', 'heartbeat']:
message = message[2]
self.socket._callback(self.channels[chan]['callback'], message)

def on_error(self, error):
self.logger.error(error)

def on_close(self, *args):
self.logger.debug('Websocket Closed')
self.logger.info('Websocket Closed')

def on_ticker(self, args):
self.logger.debug(args)

def on_account(self, args):
self.logger.debug(args)

def on_market(self, args):
def on_heartbeat(self, args):
self.logger.debug(args)

def on_volume(self, args):
self.logger.debug(args)
def setCallback(self, chan, callback):
""" Sets the callback function for <chan> """
if isinstance(chan, int):
chan = self._getChannelName(str(chan))
self.channels[chan]['callback'] = callback

def on_heartbeat(self, args):
self.logger.debug(args)
def setSubscribes(self, **subs):
for sub in subs:
if not sub in self.channels:
self.logger.warning('Invalid channel: %s', sub)
else:
self.channels[sub]['sub'] = True
self.channels[sub]['callback'] = subs[sub]

def subscribe(self, chan):
def subscribe(self, chan, callback=None):
""" Sends the 'subscribe' command for <chan> """
if isinstance(chan, int):
chan = self._getChannelName(str(chan))
# account chan?
if chan in ['1000', 1000]:
if chan == 'account':
# sending commands to 'account' requires a key, secret and nonce
if not self.key or not self.secret:
raise PoloniexError(
"self.key and self.secret needed for 'account' channel"
)
self.channels[chan]['sub'] = True
if callback:
self.channels[chan]['callback'] = callback
payload = {'nonce': self.nonce}
payload_encoded = _urlencode(payload)
sign = _new(
Expand All @@ -814,22 +795,29 @@ def subscribe(self, chan):

self.socket.send(_dumps({
'command': 'subscribe',
'channel': chan,
'channel': self.channels[chan]['id'],
'sign': sign.hexdigest(),
'key': self.key,
'payload': payload_encoded}))
else:
self.socket.send(_dumps({'command': 'subscribe', 'channel': chan}))
self.channels[chan]['sub'] = True
if callback:
self.channels[chan]['callback'] = callback
self.socket.send(_dumps({'command': 'subscribe',
'channel': self.channels[chan]['id']}))

def unsubscribe(self, chan):
""" Sends the 'unsubscribe' command for <chan> """
if isinstance(chan, int):
chan = self._getChannelName(str(chan))
# account chan?
if chan in ['1000', 1000]:
if chan == 'account':
# sending commands to 'account' requires a key, secret and nonce
if not self.key or not self.secret:
raise PoloniexError(
"self.key and self.secret needed for 'account' channel"
)
self.channels[chan]['sub'] = False
payload = {'nonce': self.nonce}
payload_encoded = _urlencode(payload)
sign = _new(
Expand All @@ -839,39 +827,35 @@ def unsubscribe(self, chan):

self.socket.send(_dumps({
'command': 'unsubscribe',
'channel': chan,
'channel': self.channels[chan]['id'],
'sign': sign.hexdigest(),
'key': self.key,
'payload': payload_encoded}))
else:
self.socket.send(_dumps({'command': 'unsubscribe', 'channel': chan}))
self.channels[chan]['sub'] = False
self.socket.send(_dumps({'command': 'unsubscribe',
'channel': self.channels[chan]['id']}))

def setCallback(self, chan, callback):
""" Sets the callback function for <chan> """
self.channels[chan]['callback'] = callback

def startws(self, subscribe=[]):
def startws(self, subscribe=False):
"""
Run the websocket in a thread, use 'subscribe' arg to subscribe
to a channel on start
to channels on start
"""
self._t = Thread(target=self.socket.run_forever)
self._t.daemon = True
self._running = True
# set subscribes
for chan in self.channels:
if self.channels[chan]['name'] in subscribe or chan in subscribe:
self.channels[chan]['sub'] = True
if subscribe:
self.setSubscribes(**subscribe)
self._t.start()
self.logger.info('Websocket thread started')


def stopws(self, wait=0):
def stopws(self, wait=1):
""" Stop/join the websocket thread """
self._running = False
# unsubscribe from subs
for chan in self.channels:
if self.channels[chan]['sub'] == True:
if 'sub' in self.channels[chan] and self.channels[chan]['sub']:
self.unsubscribe(chan)
sleep(wait)
try:
Expand Down

0 comments on commit c8294bb

Please sign in to comment.