diff --git a/cmd/neofs-net-monitor/monitor.go b/cmd/neofs-net-monitor/monitor.go
index 3798c01..42fbf04 100644
--- a/cmd/neofs-net-monitor/monitor.go
+++ b/cmd/neofs-net-monitor/monitor.go
@@ -9,7 +9,6 @@ import (
 	"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
 	"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain"
 	"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts"
-	"github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool"
 	"github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
@@ -39,7 +38,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
 	var job monitor.Job
 	if cfg.GetBool(cfgChainFSChain) {
 		monitor.RegisterSideChainMetrics()
-		job, err = sideChainJob(ctx, cfg, sideNeogoClient, logger, sideChainEndpoints)
+		job, err = sideChainJob(sideNeogoClient, logger)
 	} else {
 		monitor.RegisterMainChainMetrics()
 		job, err = mainChainJob(cfg, sideNeogoClient, logger)
@@ -89,7 +88,7 @@ func mainChainJob(cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger)
 	}), nil
 }
 
-func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger, sideChainEndpoints []string) (*monitor.SideJob, error) {
+func sideChainJob(neogoClient *pool.Pool, logger *zap.Logger) (*monitor.SideJob, error) {
 	netmapContract, err := neogoClient.ResolveContract(rpcnns.NameNetmap)
 	if err != nil {
 		return nil, fmt.Errorf("can't read netmap scripthash: %w", err)
@@ -140,11 +139,6 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
 		proxy = &proxyContract
 	}
 
-	mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval))
-	if err = mnPool.Dial(ctx); err != nil {
-		return nil, fmt.Errorf("multinodepool: %w", err)
-	}
-
 	return monitor.NewSideJob(monitor.SideJobArgs{
 		Logger:         logger,
 		Balance:        balance,
@@ -154,7 +148,7 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
 		IRFetcher:      nmFetcher,
 		BalanceFetcher: balanceFetcher,
 		CnrFetcher:     cnrFetcher,
-		HeightFetcher:  mnPool,
-		StateFetcher:   mnPool,
+		HeightFetcher:  neogoClient,
+		StateFetcher:   neogoClient,
 	}), nil
 }
diff --git a/pkg/multinodepool/pool.go b/pkg/multinodepool/pool.go
deleted file mode 100644
index 94719ee..0000000
--- a/pkg/multinodepool/pool.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package multinodepool
-
-import (
-	"context"
-	"fmt"
-	"log"
-	"sync"
-	"time"
-
-	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
-	"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
-)
-
-// Pool collects data from each node.
-type Pool struct {
-	endpoints   []string
-	dialTimeout time.Duration
-	clients     []*rpcclient.Client
-}
-
-func NewPool(endpoints []string, dialTimeout time.Duration) *Pool {
-	return &Pool{
-		endpoints:   endpoints,
-		dialTimeout: dialTimeout,
-		clients:     make([]*rpcclient.Client, len(endpoints)),
-	}
-}
-
-func (c *Pool) Dial(ctx context.Context) error {
-	opts := rpcclient.Options{DialTimeout: c.dialTimeout}
-
-	for i, ep := range c.endpoints {
-		neoClient, err := neoGoClient(ctx, ep, opts)
-		if err != nil {
-			return fmt.Errorf("neoGoClient: %w", err)
-		}
-
-		c.clients[i] = neoClient
-	}
-
-	return nil
-}
-
-func (c *Pool) FetchHeight() []monitor.HeightData {
-	var (
-		heights    []monitor.HeightData
-		wg         sync.WaitGroup
-		heightChan = make(chan monitor.HeightData, len(c.clients))
-	)
-
-	for _, cl := range c.clients {
-		wg.Add(1)
-
-		go func(cl *rpcclient.Client) {
-			defer wg.Done()
-
-			stHeight, err := cl.GetStateHeight()
-			if err != nil {
-				log.Printf("GetStateHeight for %s: %v", cl.Endpoint(), err)
-				return
-			}
-
-			heightChan <- monitor.HeightData{
-				Host:  cl.Endpoint(),
-				Value: stHeight.Local,
-			}
-		}(cl)
-	}
-
-	go func() {
-		wg.Wait()
-		close(heightChan)
-	}()
-
-	for height := range heightChan {
-		heights = append(heights, height)
-	}
-
-	return heights
-}
-
-func (c *Pool) FetchState(height uint32) []monitor.StateData {
-	var (
-		states    []monitor.StateData
-		wg        sync.WaitGroup
-		stateChan = make(chan monitor.StateData, len(c.clients))
-	)
-
-	for _, cl := range c.clients {
-		wg.Add(1)
-
-		go func(cl *rpcclient.Client) {
-			defer wg.Done()
-
-			stHeight, err := cl.GetStateRootByHeight(height)
-			if err != nil {
-				log.Printf("GetStateRootByHeight for %s: %v", cl.Endpoint(), err)
-				return
-			}
-
-			stateChan <- monitor.StateData{
-				Host:  cl.Endpoint(),
-				Value: stHeight.Hash().String(),
-			}
-		}(cl)
-	}
-
-	go func() {
-		wg.Wait()
-		close(stateChan)
-	}()
-
-	for state := range stateChan {
-		states = append(states, state)
-	}
-
-	return states
-}
-
-func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
-	cli, err := rpcclient.New(ctx, endpoint, opts)
-	if err != nil {
-		return nil, fmt.Errorf("can't create neo-go client: %w", err)
-	}
-
-	return cli, nil
-}
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
index b3f6ca9..38f1fbb 100644
--- a/pkg/pool/pool.go
+++ b/pkg/pool/pool.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"log"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -19,6 +20,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/util"
 	"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
 	rpcnns "github.com/nspcc-dev/neofs-contract/rpc/nns"
+	"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
 )
 
 // Pool represent virtual connection to the Neo network to communicate
@@ -26,15 +28,14 @@ import (
 type Pool struct {
 	ctx context.Context
 	mu  sync.RWMutex
 
-	rpc                  *rpcclient.Client
-	invoker              *invoker.Invoker
+	clients              []*rpcclient.Client
 	opts                 rpcclient.Options
 	lastHealthyTimestamp int64
 	recheckInterval      time.Duration
 
-	next      int
-	endpoints []string
+	current, next int
+	endpoints     []string
 }
 
 // PrmPool groups parameter to create Pool.
@@ -59,9 +60,61 @@ func NewPool(ctx context.Context, prm PrmPool) (*Pool, error) {
 		endpoints:       prm.Endpoints,
 		recheckInterval: recheck,
 		opts:            rpcclient.Options{DialTimeout: prm.DialTimeout},
+		clients:         make([]*rpcclient.Client, len(prm.Endpoints)),
 	}
 
-	return pool, pool.establishNewConnection()
+	if err := pool.dial(ctx); err != nil {
+		return nil, err
+	}
+
+	go func() {
+		tick := time.NewTicker(recheck)
+
+		for {
+			select {
+			case <-tick.C:
+				pool.recheck(ctx)
+			case <-ctx.Done():
+				tick.Stop()
+				return
+			}
+		}
+	}()
+
+	return pool, nil
+}
+
+func (p *Pool) dial(ctx context.Context) error {
+	opts := rpcclient.Options{DialTimeout: p.opts.DialTimeout}
+
+	for i, ep := range p.endpoints {
+		neoClient, err := neoGoClient(ctx, ep, opts)
+		if err != nil {
+			return err
+		}
+
+		p.clients[i] = neoClient
+	}
+
+	return nil
+}
+
+func (p *Pool) recheck(ctx context.Context) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	for i := 0; i < len(p.endpoints); i++ {
+		cl := p.clients[i]
+
+		if _, err := cl.GetBlockCount(); err != nil {
+			cl, err = neoGoClient(ctx, p.endpoints[i], p.opts)
+			if err != nil {
+				log.Printf("reconnect to Neo node %s failed: %v", p.endpoints[i], err)
+				continue
+			}
+			p.clients[i] = cl
+		}
+	}
 }
 
 func (p *Pool) isCurrentHealthy() bool {
@@ -192,13 +245,13 @@ func (p *Pool) GetCommittee() (keys.PublicKeys, error) {
 func (p *Pool) conn() *rpcclient.Client {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
-	return p.rpc
+	return p.clients[p.current]
 }
 
 func (p *Pool) invokerConn() *invoker.Invoker {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
-	return p.invoker
+	return invoker.New(p.clients[p.current], nil)
 }
 
 func (p *Pool) establishNewConnection() error {
@@ -208,8 +261,8 @@ func (p *Pool) establishNewConnection() error {
 	for i := p.next; i < p.next+len(p.endpoints); i++ {
 		index := i % len(p.endpoints)
-		if p.rpc, err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
-			p.invoker = invoker.New(p.rpc, nil)
+		if p.clients[index], err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
+			p.current = index
 			p.next = (index + 1) % len(p.endpoints)
 			return nil
 		}
 	}
@@ -221,12 +274,12 @@ func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
 	cli, err := rpcclient.New(ctx, endpoint, opts)
 	if err != nil {
-		return nil, fmt.Errorf("can't create neo-go client: %w", err)
+		return nil, fmt.Errorf("create Neo RPC client: %w", err)
 	}
 
 	err = cli.Init()
 	if err != nil {
-		return nil, fmt.Errorf("can't init neo-go client: %w", err)
+		return nil, fmt.Errorf("init Neo RPC client: %w", err)
 	}
 
 	return cli, nil
@@ -248,3 +301,81 @@ func (p *Pool) ResolveContract(contractName string) (util.Uint160, error) {
 
 	return addr, nil
 }
+
+func (p *Pool) FetchHeight() []monitor.HeightData {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	var (
+		heights    []monitor.HeightData
+		wg         sync.WaitGroup
+		heightChan = make(chan monitor.HeightData, len(p.clients))
+	)
+
+	for _, cl := range p.clients {
+		wg.Add(1)
+
+		go func(cl *rpcclient.Client) {
+			defer wg.Done()
+
+			stHeight, err := cl.GetStateHeight()
+			if err != nil {
+				log.Printf("read state height of Neo node %s: %v", cl.Endpoint(), err)
+				return
+			}
+
+			heightChan <- monitor.HeightData{
+				Host:  cl.Endpoint(),
+				Value: stHeight.Local,
+			}
+		}(cl)
+	}
+
+	wg.Wait()
+	close(heightChan)
+
+	for height := range heightChan {
+		heights = append(heights, height)
+	}
+
+	return heights
+}
+
+func (p *Pool) FetchState(height uint32) []monitor.StateData {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	var (
+		states    []monitor.StateData
+		wg        sync.WaitGroup
+		stateChan = make(chan monitor.StateData, len(p.clients))
+	)
+
+	for _, cl := range p.clients {
+		wg.Add(1)
+
+		go func(cl *rpcclient.Client) {
+			defer wg.Done()
+
+			stHeight, err := cl.GetStateRootByHeight(height)
+			if err != nil {
+				log.Printf("read state root at height #%d from Neo node %s: %v", height, cl.Endpoint(), err)
+				return
+			}
+
+			stateChan <- monitor.StateData{
+				Host:  cl.Endpoint(),
+				Value: stHeight.Hash().String(),
+			}
+		}(cl)
+	}
+
+	wg.Wait()
+	close(stateChan)
+
+	for st := range stateChan {
+		states = append(states, st)
+	}
+
+	return states
+}
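
Reviewer note (not part of the patch): with multinodepool gone, *pool.Pool is wired directly as both HeightFetcher and StateFetcher in monitor.SideJobArgs. For that to compile, the monitor package presumably declares fetcher contracts along these lines. This is only a sketch inferred from the HeightData/StateData usage in the diff above; the actual declarations may differ:

    package monitor

    // HeightData is the state height reported by a single Neo node.
    type HeightData struct {
    	Host  string
    	Value uint32
    }

    // StateData is the state root reported by a single Neo node at a fixed height.
    type StateData struct {
    	Host  string
    	Value string
    }

    // HeightFetcher collects state heights from every node known to the pool.
    type HeightFetcher interface {
    	FetchHeight() []HeightData
    }

    // StateFetcher collects state roots at the given height from every node.
    type StateFetcher interface {
    	FetchState(height uint32) []StateData
    }

Both fetchers keep the fan-out shape of the deleted multinodepool code: one goroutine per client sends into a channel buffered to len(p.clients), so wg.Wait() followed by close() cannot block even though the results are drained only after all goroutines finish.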