Skip to content

Commit

Permalink
Revert go_kraken/websocket package (#21)
Browse files Browse the repository at this point in the history
* remove socket address

* rework kraken_websocket.go

* remove go_kraken

* delete .vscode

* clean and update feed_service

* update README

* rename intertal/ports/socket.go
  • Loading branch information
louisinger authored May 5, 2021
1 parent 09069ff commit 71ec31c
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 74 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ connects to kraken socket and to a local instance of tdex-deamon.

```
daemon_endpoint: String with the address and port of gRPC host. Required.
kraken_ws_endpoint: String with the address and port of kraken socket. Required.
markets: Json List with necessary markets informations. Required.
market: Json List with necessary markets informations. Required.
base_asset: String of the Hash of the base asset for gRPC request. Required.
quote_asset: String of the Hash of the quote asset for gRPC request. Required.
kraken_ticker: String with the ticker we want kraken to provide informations on. Required.
Expand Down
2 changes: 1 addition & 1 deletion cmd/feederd/config.test.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"daemon_endpoint":"127.0.0.1:9000","kraken_ws_endpoint":"ws.kraken.com","markets":[{"base_asset":"5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225","quote_asset":"91b988204bfb8568d81d7c2962c5f37d6e413e5efaa36c417d245ba2237e8320","kraken_ticker":"LTC/USDT","interval":500}]}
{"daemon_endpoint":"127.0.0.1:9000","markets":[{"base_asset":"5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225","quote_asset":"25e4fac326a58791c64e31b29d446ac6a6ea5f52ab3a62b12596c7abc34f35a6","kraken_ticker":"LTC/USDT","interval":500}]}
3 changes: 1 addition & 2 deletions cmd/feederd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func runDaemonAndInitConfigFile(t *testing.T) {
usdt := runDaemonAndCreateMarket(t)

configJSON := adapters.ConfigJSON{
DaemonEndpoint: daemonEndpoint,
KrakenWsEndpoint: krakenWsEndpoint,
DaemonEndpoint: daemonEndpoint,
Markets: []adapters.MarketJSON{
adapters.MarketJSON{
KrakenTicker: "LTC/USDT",
Expand Down
19 changes: 10 additions & 9 deletions config.example.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"daemon_endpoint":"127.0.0.1:9000",
"kraken_ws_endpoint":"ws.kraken.com",
"markets":[{
"base_asset":"5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225",
"quote_asset":"e3ec88b0ed9228c3337f746f5d9ea20c1e68bbe1f73d94621c1e8452bebb9e22",
"kraken_ticker":"LTC/USDT",
"interval":500
}]
}
"daemon_endpoint": "127.0.0.1:9000",
"markets": [
{
"base_asset": "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225",
"quote_asset": "e3ec88b0ed9228c3337f746f5d9ea20c1e68bbe1f73d94621c1e8452bebb9e22",
"kraken_ticker": "LTC/USDT",
"interval": 500
}
]
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/tdex-network/tdex-feeder
go 1.15

require (
github.com/aopoltorzhicky/go_kraken/websocket v0.0.10
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
Expand Down
12 changes: 2 additions & 10 deletions internal/adapters/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@ type MarketJSON struct {

// ConfigJSON is the struct describing the shape of config JSON file
type ConfigJSON struct {
DaemonEndpoint string `json:"daemon_endpoint"`
KrakenWsEndpoint string `json:"kraken_ws_endpoint"`
Markets []MarketJSON `json:"markets"`
DaemonEndpoint string `json:"daemon_endpoint"`
Markets []MarketJSON `json:"markets"`
}

// Config is the config of the application retreived from config JSON file
type Config struct {
daemonEndpoint string
krakenWSaddress string
markets map[string]domain.Market
marketIntervals map[domain.Market]time.Duration
}

// ToFeederService transforms a Config into FeederService
func (config *Config) ToFeederService() application.FeederService {
feederSvc := application.NewFeederService(application.NewFeederServiceArgs{
KrakenWSaddress: config.krakenWSaddress,
OperatorEndpoint: config.daemonEndpoint,
TickerToMarket: config.markets,
MarketToInterval: config.marketIntervals,
Expand All @@ -58,7 +55,6 @@ func (config *Config) UnmarshalJSON(data []byte) error {
}

config.daemonEndpoint = jsonConfig.DaemonEndpoint
config.krakenWSaddress = jsonConfig.KrakenWsEndpoint

configTickerToMarketMap := make(map[string]domain.Market)
marketIntervalsMap := make(map[domain.Market]time.Duration)
Expand All @@ -84,10 +80,6 @@ func (configJson ConfigJSON) validate() error {
return ErrDaemonEndpointIsEmpty
}

if configJson.KrakenWsEndpoint == "" {
return ErrKrakenEndpointIsEmpty
}

if len(configJson.Markets) == 0 {
return ErrNeedAtLeastOneMarketToFeed
}
Expand Down
8 changes: 4 additions & 4 deletions internal/application/feed_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewKrakenFeedService(
tickersToSubscribe = append(tickersToSubscribe, k)
}

krakenSocket := ports.NewKrakenWebSocket()
err := krakenSocket.Connect(address, tickersToSubscribe)
krakenSocket := ports.NewKrakenWebSocket(tickersToSubscribe)
err := krakenSocket.Connect()
if err != nil {
return nil, err
}
Expand All @@ -52,15 +52,15 @@ func NewKrakenFeedService(
func (f *krakenFeedService) Start() {
listening := true
log.Info("Kraken web socket feed is listening")
tickerWithPriceChan, err := f.krakenWebSocket.StartListen()
tickerWithPriceChan, err := f.krakenWebSocket.Start()
if err != nil {
log.Fatal(err)
}
for listening {
select {
case <-f.stopChan:
listening = false
err := f.krakenWebSocket.Close()
err := f.krakenWebSocket.Stop()
if err != nil {
log.Fatal(err)
}
Expand Down
91 changes: 52 additions & 39 deletions internal/ports/kraken_websocket.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,95 @@
package ports

import (
"errors"
"net/url"

ws "github.com/aopoltorzhicky/go_kraken/websocket"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// KrakenWebSocket is the interface to manage kraken web socket streams
type KrakenWebSocket interface {
Connect(address string, tickersToSubscribe []string) error
StartListen() (chan TickerWithPrice, error)
Close() error
Connect() error
Start() (chan TickerWithPrice, error)
Stop() error
}

type krakenWebSocket struct {
krakenWS *ws.Client
krakenWebSocketConn *websocket.Conn
tickerWithPriceChan chan TickerWithPrice
tickersToSubscribe []string
isListening bool
}

// NewKrakenWebSocket is a factory function for KrakenWebSocket interface
func NewKrakenWebSocket() KrakenWebSocket {
func NewKrakenWebSocket(tickersToSubscribe []string) KrakenWebSocket {
return &krakenWebSocket{
krakenWS: ws.New(),
krakenWebSocketConn: nil,
tickerWithPriceChan: make(chan TickerWithPrice),
tickersToSubscribe: tickersToSubscribe,
isListening: false,
}
}

// Connect method will connect to the websocket kraken server, ping it and subscribe to tickers threads.
func (socket *krakenWebSocket) Connect(address string, tickersToSubscribe []string) error {
func (socket *krakenWebSocket) Connect() error {
// connect to server
err := socket.krakenWS.Connect()
if err != nil {
return err
}
// test if the server is alive
err = socket.krakenWS.Ping()
url := url.URL{Scheme: "wss", Host: krakenWebSocketURL, Path: "/"}
websocketConn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
return err
}

// subscribe to tickers
err = socket.krakenWS.SubscribeTicker(tickersToSubscribe)
socket.krakenWebSocketConn = websocketConn
subscribeMsg := createSubscribeToMarketMessage(socket.tickersToSubscribe)
err = sendRequestMessage(socket.krakenWebSocketConn, subscribeMsg)
if err != nil {
return err
}

return nil
}

func (socket *krakenWebSocket) StartListen() (chan TickerWithPrice, error) {
if socket.krakenWS == nil {
return nil, errors.New("Socket not connected")
func (socket *krakenWebSocket) Start() (chan TickerWithPrice, error) {
go socket.listen()
return socket.tickerWithPriceChan, nil
}

func (socket *krakenWebSocket) listen() {
if socket.krakenWebSocketConn == nil {
return
}

if socket.isListening {
return
}

go func() {
for obj := range socket.krakenWS.Listen() {
switch obj := obj.(type) {
case error:
log.Debug("Channel closed: ", obj)
case ws.DataUpdate:
tickerUpdate, ok := obj.Data.(ws.TickerUpdate)
if ok {
result := TickerWithPrice{
Ticker: tickerUpdate.Pair,
Price: tickerUpdate.Close.Today.(float64),
}
socket.tickerWithPriceChan <- result
}
socket.isListening = true

for {
_, message, err := socket.krakenWebSocketConn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Warn("error: ", err)
}
socket.isListening = false
break
}
}()

return socket.tickerWithPriceChan, nil
tickerWithPrice, err := toTickerWithPrice(message)
if err != nil {
continue
}

socket.tickerWithPriceChan <- *tickerWithPrice
}
}

func (socket *krakenWebSocket) Close() error {
socket.krakenWS.Close()
socket.krakenWS = nil
func (socket *krakenWebSocket) Stop() error {
err := socket.krakenWebSocketConn.Close()
if err != nil {
return err
}
socket.krakenWebSocketConn = nil
return nil
}
27 changes: 21 additions & 6 deletions internal/ports/kraken_websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
)

const (
address = "ws.kraken.com"
ticker = "XBT/USDT"
ticker = "XBT/USDT"
)

func createAndConnect() (KrakenWebSocket, error) {
krakenWS := NewKrakenWebSocket()
err := krakenWS.Connect(address, []string{ticker})
krakenWS := NewKrakenWebSocket([]string{ticker})
err := krakenWS.Connect()
return krakenWS, err
}

Expand All @@ -22,15 +21,31 @@ func TestConnectToKrakenWebSocket(t *testing.T) {
assert.Nil(t, err)
}

func TestListen(t *testing.T) {
func TestStart(t *testing.T) {
ws, err := createAndConnect()
if err != nil {
t.Error(err)
}

tickerWithPriceChan, err := ws.StartListen()
tickerWithPriceChan, err := ws.Start()
assert.Nil(t, err)

nextTickerWithPrice := <-tickerWithPriceChan
assert.NotNil(t, nextTickerWithPrice)
}

func TestStop(t *testing.T) {
ws, err := createAndConnect()
if err != nil {
t.Error(err)
}

tickerWithPriceChan, err := ws.Start()
assert.Nil(t, err)

nextTickerWithPrice := <-tickerWithPriceChan
assert.NotNil(t, nextTickerWithPrice)

err = ws.Stop()
assert.Nil(t, err)
}
Loading

0 comments on commit 71ec31c

Please sign in to comment.