Skip to content

Commit

Permalink
schedule worker
Browse files Browse the repository at this point in the history
  • Loading branch information
benjyz committed Jan 10, 2019
1 parent e3c107f commit 40c27fc
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 14 deletions.
9 changes: 9 additions & 0 deletions archon/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,15 @@ def sync_orderbook(self, market, exchange):
except Exception as e:
logger.info("sync book failed %s"%e)

def sync_trades(self, market, exchange):
if exchange == exc.BITMEX:
try:
trades = self.afacade.market_history(market, exchange)
#logger.debug("trades %s"%str(trades))
self.db.trades.insert(trades)
except Exception as e:
logger.info("sync trades failed %s"%str(e))

def sync_orderbook_all(self, market):
self.db.orderbooks.drop()
for e in self.active_exchanges:
Expand Down
5 changes: 3 additions & 2 deletions archon/exchange/bitmex/bitmex.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ def recent_trades(self, symbol):
#'start': 0,
#'filter':
}

return self._query_bitmex(path=path,query=query)
print ("query ",query)
result = self._query_bitmex(path=path,query=query)
return result

@property
def snapshot(self):
Expand Down
10 changes: 10 additions & 0 deletions archon/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,18 @@ def market_history(self, market, exchange=None):
f = lambda x: models.conv_tx(x, exchange, market)
tx = list(map(f,tx))
return tx

elif exchange==exc.BITMEX:
try:
trades = client.recent_trades(market)
return trades
except Exception as e:
logger.info("get trades failed %s"%str(e))


#TODO


def get_orderbook(self, market, exchange=None):
client = clients[exchange]
logger.debug("get orderbook %s %i" %(str(market),exchange))
Expand Down
43 changes: 31 additions & 12 deletions archon/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import threading
from _thread import start_new_thread

from apscheduler.schedulers.blocking import BlockingScheduler
sched_logger = logging.getLogger("apscheduler")
sched_logger.setLevel(logging.WARNING)

def get_book(abroker):
symbol = "XBTUSD"
client = abroker.afacade.get_client(exc.BITMEX)
Expand Down Expand Up @@ -53,31 +57,46 @@ def __init__(self, broker, interval=10):
self.broker = broker
self.interval = interval

thread = threading.Thread(target=self.run, args=())
thread.daemon = True # Daemonize thread
#thread = threading.Thread(target=self.run, args=())
#thread.daemon = True # Daemonize thread
#thread.start() # Start the execution

def run(self):
scheduler = BlockingScheduler()
scheduler.add_job(self.sync_job, 'interval', seconds=10)
scheduler.start()

def sync_job(self):
""" run worker """

db = self.broker.get_db()
col = db.orderbooks #['bitmex_orderbook']

i = 0
#i = 0
logger.debug('sync orderbook in the background')
while True:
market = m.market_from("XBT","USD")
smarket = models.conv_markets_to(market, exc.BITMEX)
self.broker.sync_orderbook(smarket, exc.BITMEX)
#book = self.broker.afacade.get_orderbook(market, exc.BITMEX)
#while True:
market = m.market_from("XBT","USD")
smarket = models.conv_markets_to(market, exc.BITMEX)
self.broker.sync_orderbook(smarket, exc.BITMEX)
self.broker.sync_trades(smarket, exc.BITMEX)


#col.insert_one(book)
#logger.debug("sync.. %s"%str(book))
#print (book)

time.sleep(5)
i+=1
#time.sleep(5)
#i+=1

"""
market = models.market_from("XBT","USD")
self.broker.sync_orderbook(market, exc.BITMEX)
"""
"""

def some_job():
t = datetime.datetime.now()
print ("job %s"%t)

def start_schedule():
scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', seconds=5)
scheduler.start()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ bitmex-ws
deribit-api
loguru
tasktiger
apscheduler

0 comments on commit 40c27fc

Please sign in to comment.