diff --git a/config/config.go b/config/config.go index 73cb5c2..a57a005 100644 --- a/config/config.go +++ b/config/config.go @@ -71,15 +71,13 @@ func (c Config) Validate() (err error) { return nil } -func (c *Config) AssetPairs() []server.AssetPair { +func (c *Config) AssetPairs(exchange Exchange) []server.AssetPair { var assetPairs []server.AssetPair - for _, exchange := range c.Exchanges { - for _, pool := range exchange.Pools { - assetPairs = append(assetPairs, server.AssetPair{ - Base: pool.Base, - Quote: pool.Quote, - }) - } + for _, pool := range exchange.Pools { + assetPairs = append(assetPairs, server.AssetPair{ + Base: pool.Base, + Quote: pool.Quote, + }) } return assetPairs diff --git a/go.mod b/go.mod index 4717e4e..4451262 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( cosmossdk.io/math v1.1.2 github.com/ethereum/go-ethereum v1.13.4 github.com/golangci/golangci-lint v1.55.2 - github.com/ojo-network/indexer v0.0.4 + github.com/ojo-network/indexer v0.0.5 github.com/rs/zerolog v1.31.0 - gopkg.in/yaml.v3 v3.0.1 + github.com/spf13/viper v1.12.0 ) require ( @@ -140,7 +140,6 @@ require ( github.com/spf13/cobra v1.7.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.12.0 // indirect github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect github.com/stretchr/objx v0.5.0 // indirect @@ -169,6 +168,7 @@ require ( golang.org/x/text v0.13.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.4.6 // indirect mvdan.cc/gofumpt v0.5.0 // indirect mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed // indirect @@ -202,7 +202,7 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect diff --git a/go.sum b/go.sum index b3d6d68..861f667 100644 --- a/go.sum +++ b/go.sum @@ -479,8 +479,8 @@ github.com/nunnatsa/ginkgolinter v0.14.1 h1:khx0CqR5U4ghsscjJ+lZVthp3zjIFytRXPTa github.com/nunnatsa/ginkgolinter v0.14.1/go.mod h1:nY0pafUSst7v7F637e7fymaMlQqI9c0Wka2fGsDkzWg= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= -github.com/ojo-network/indexer v0.0.4 h1:rAnnCVM6Ydx4aw0z0tNEPAME1sYrjpbpcM/fg6F+Onk= -github.com/ojo-network/indexer v0.0.4/go.mod h1:Zi3h9u2KGKyhKIZd0V5BcBMOh6cVySGdeo3Dv54Var8= +github.com/ojo-network/indexer v0.0.5 h1:TsJcTzntRkMlGzLcltmybeN5Ux3gOMn3mr5FS431q1A= +github.com/ojo-network/indexer v0.0.5/go.mod h1:Zi3h9u2KGKyhKIZd0V5BcBMOh6cVySGdeo3Dv54Var8= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/main.go b/main.go index ef4f7ab..5fce6ba 100644 --- a/main.go +++ b/main.go @@ -21,39 +21,38 @@ func main() { logger := zerolog.New(os.Stderr).With().Timestamp().Logger() ctx, cancel := context.WithCancel(context.Background()) - // Cancel the context on user interrupt - userInterrupt := make(chan os.Signal, 1) - signal.Notify(userInterrupt, os.Interrupt, syscall.SIGTERM) - go func() { - <-userInterrupt - logger.Info().Msg("user interrupt") - cancel() - }() - // Parse the config cfg, err := config.ParseConfig("config.yaml") if err != nil { logger.Fatal().Err(err).Msg("error parsing config") } - // Initialize indexer - i := indexer.NewIndexer(logger, ctx) - - // Start and maintain connection to blockchain nodes for each exchange we index + // Create new websocket and REST server for each exchange for _, exchange := range cfg.Exchanges { + // Initialize indexer + i := indexer.NewIndexer(logger, ctx) + + // Start and maintain connection to blockchain nodes client.MaintainConnection(exchange, i, ctx, logger) - } - // Create new websocket and REST server - s, err := server.NewServer(logger, cfg.Server, cfg.AssetPairs()) - if err != nil { - logger.Error().Err(err).Msg("error creating server") - cancel() + s, err := server.NewServer(logger, cfg.Server, cfg.AssetPairs(exchange)) + if err != nil { + logger.Error().Err(err).Msg("error creating server") + cancel() + } + + go func(exchangeName string) { + err = s.StartWebsocketAPI(ctx, logger, i, exchangeName) + if err != nil { + logger.Error().Err(err).Msg("websocket api error") + } + }(string(exchange.Name)) } - // Start websocket server and block until error or context is cancelled - err = s.StartWebsocketAPI(ctx, logger, i) - if err != nil { - logger.Error().Err(err).Msg("websocket api error") - } + // Cancel the context on user interrupt + userInterrupt := make(chan os.Signal, 1) + signal.Notify(userInterrupt, os.Interrupt, syscall.SIGTERM) + <-userInterrupt + logger.Info().Msg("user interrupt") + cancel() }