Merge pull request #108 from nspcc-dev/104-merge-multinode-and-regular-pools
cthulhu-rider authored Dec 29, 2023
2 parents 935a88c + d146024 commit 063f0bf
Showing 3 changed files with 146 additions and 148 deletions.
14 changes: 4 additions & 10 deletions cmd/neofs-net-monitor/monitor.go
@@ -9,7 +9,6 @@ import (
    "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
    "github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain"
    "github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts"
-   "github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool"
    "github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
    "github.com/spf13/viper"
    "go.uber.org/zap"
@@ -39,7 +38,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
    var job monitor.Job
    if cfg.GetBool(cfgChainFSChain) {
        monitor.RegisterSideChainMetrics()
-       job, err = sideChainJob(ctx, cfg, sideNeogoClient, logger, sideChainEndpoints)
+       job, err = sideChainJob(sideNeogoClient, logger)
    } else {
        monitor.RegisterMainChainMetrics()
        job, err = mainChainJob(cfg, sideNeogoClient, logger)
@@ -89,7 +88,7 @@ func mainChainJob(cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger)
    }), nil
}

-func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger, sideChainEndpoints []string) (*monitor.SideJob, error) {
+func sideChainJob(neogoClient *pool.Pool, logger *zap.Logger) (*monitor.SideJob, error) {
    netmapContract, err := neogoClient.ResolveContract(rpcnns.NameNetmap)
    if err != nil {
        return nil, fmt.Errorf("can't read netmap scripthash: %w", err)
@@ -140,11 +139,6 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
        proxy = &proxyContract
    }

-   mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval))
-   if err = mnPool.Dial(ctx); err != nil {
-       return nil, fmt.Errorf("multinodepool: %w", err)
-   }
-
    return monitor.NewSideJob(monitor.SideJobArgs{
        Logger:  logger,
        Balance: balance,
@@ -154,7 +148,7 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
        IRFetcher:      nmFetcher,
        BalanceFetcher: balanceFetcher,
        CnrFetcher:     cnrFetcher,
-       HeightFetcher:  mnPool,
-       StateFetcher:   mnPool,
+       HeightFetcher:  neogoClient,
+       StateFetcher:   neogoClient,
    }), nil
}
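With the dedicated multinodepool gone, the regular pool is wired in as both the HeightFetcher and the StateFetcher. Judging from the FetchHeight and FetchState methods added to pkg/pool further down, the monitor-side contracts the pool now satisfies would look roughly like the sketch below. The real declarations live in pkg/monitor and are not part of this diff, so the interface names and comments here are assumptions; only the method signatures and the Host/Value field types are taken from the commit.

// Sketch of the consumer-side types, inferred from this commit; the actual
// definitions in pkg/monitor may differ in naming and documentation.
package monitor

// HeightData pairs a node endpoint with its local state height.
type HeightData struct {
    Host  string
    Value uint32
}

// StateData pairs a node endpoint with the state root hash at a fixed height.
type StateData struct {
    Host  string
    Value string
}

// HeightFetcher and StateFetcher are what SideJobArgs expects; *pool.Pool
// now implements both, so one object replaces the extra pool.
type HeightFetcher interface {
    FetchHeight() []HeightData
}

type StateFetcher interface {
    FetchState(height uint32) []StateData
}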
127 changes: 0 additions & 127 deletions pkg/multinodepool/pool.go

This file was deleted.
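(Its responsibilities, dialing every configured endpoint up front and fetching per-node heights and state roots, move into the enlarged pkg/pool below, which now keeps one client per endpoint.)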

153 changes: 142 additions & 11 deletions pkg/pool/pool.go
@@ -4,6 +4,7 @@ import (
    "context"
    "errors"
    "fmt"
+   "log"
    "sync"
    "sync/atomic"
    "time"
@@ -19,22 +20,22 @@ import (
    "github.com/nspcc-dev/neo-go/pkg/util"
    "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
    rpcnns "github.com/nspcc-dev/neofs-contract/rpc/nns"
+   "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
)

// Pool represent virtual connection to the Neo network to communicate
// with multiple Neo servers.
type Pool struct {
    ctx     context.Context
    mu      sync.RWMutex
-   rpc     *rpcclient.Client
-   invoker *invoker.Invoker
+   clients []*rpcclient.Client
    opts    rpcclient.Options

    lastHealthyTimestamp int64
    recheckInterval      time.Duration

-   next      int
-   endpoints []string
+   current, next int
+   endpoints     []string
}

// PrmPool groups parameter to create Pool.
@@ -59,9 +60,61 @@ func NewPool(ctx context.Context, prm PrmPool) (*Pool, error) {
        endpoints:       prm.Endpoints,
        recheckInterval: recheck,
        opts:            rpcclient.Options{DialTimeout: prm.DialTimeout},
+       clients:         make([]*rpcclient.Client, len(prm.Endpoints)),
    }

-   return pool, pool.establishNewConnection()
+   if err := pool.dial(ctx); err != nil {
+       return nil, err
+   }
+
+   go func() {
+       tick := time.NewTicker(recheck)
+
+       for {
+           select {
+           case <-tick.C:
+               pool.recheck(ctx)
+           case <-ctx.Done():
+               tick.Stop()
+               return
+           }
+       }
+   }()
+
+   return pool, nil
}
+
+func (p *Pool) dial(ctx context.Context) error {
+   opts := rpcclient.Options{DialTimeout: p.opts.DialTimeout}
+
+   for i, ep := range p.endpoints {
+       neoClient, err := neoGoClient(ctx, ep, opts)
+       if err != nil {
+           return err
+       }
+
+       p.clients[i] = neoClient
+   }
+
+   return nil
+}
+
+func (p *Pool) recheck(ctx context.Context) {
+   p.mu.Lock()
+   defer p.mu.Unlock()
+
+   for i := 0; i < len(p.endpoints); i++ {
+       cl := p.clients[i]
+
+       if _, err := cl.GetBlockCount(); err != nil {
+           p.clients[i], err = neoGoClient(ctx, p.endpoints[i], p.opts)
+           if err != nil {
+               log.Printf("reconnect to Neo node %s failed: %v", cl.Endpoint(), err)
+           }
+
+           continue
+       }
+   }
+}

func (p *Pool) isCurrentHealthy() bool {
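A note on the concurrency model this hunk introduces: recheck is the only writer of p.clients and takes the exclusive lock while it probes each node with GetBlockCount and redials failures, while conn, invokerConn and the FetchHeight/FetchState methods added at the bottom of this file take the shared lock, so a periodic reconnect never swaps a client out from under an in-flight call. The ticker goroutine started in NewPool exits when the caller's context is cancelled, which is the pool's only shutdown mechanism in this revision.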
@@ -192,13 +245,13 @@ func (p *Pool) GetCommittee() (keys.PublicKeys, error) {
func (p *Pool) conn() *rpcclient.Client {
    p.mu.RLock()
    defer p.mu.RUnlock()
-   return p.rpc
+   return p.clients[p.current]
}

func (p *Pool) invokerConn() *invoker.Invoker {
    p.mu.RLock()
    defer p.mu.RUnlock()
-   return p.invoker
+   return invoker.New(p.clients[p.current], nil)
}

func (p *Pool) establishNewConnection() error {
@@ -208,8 +261,8 @@ func (p *Pool) establishNewConnection() error {

    for i := p.next; i < p.next+len(p.endpoints); i++ {
        index := i % len(p.endpoints)
-       if p.rpc, err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
-           p.invoker = invoker.New(p.rpc, nil)
+       if p.clients[p.current], err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
+           p.current = index % len(p.endpoints)
            p.next = (index + 1) % len(p.endpoints)
            return nil
        }
@@ -221,12 +274,12 @@
func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
    cli, err := rpcclient.New(ctx, endpoint, opts)
    if err != nil {
-       return nil, fmt.Errorf("can't create neo-go client: %w", err)
+       return nil, fmt.Errorf("create Neo RPC client: %w", err)
    }

    err = cli.Init()
    if err != nil {
-       return nil, fmt.Errorf("can't init neo-go client: %w", err)
+       return nil, fmt.Errorf("init Neo RPC client: %w", err)
    }

    return cli, nil
@@ -248,3 +301,81 @@ func (p *Pool) ResolveContract(contractName string) (util.Uint160, error) {

    return addr, nil
}
+
+func (p *Pool) FetchHeight() []monitor.HeightData {
+   p.mu.RLock()
+   defer p.mu.RUnlock()
+
+   var (
+       heights    []monitor.HeightData
+       wg         sync.WaitGroup
+       heightChan = make(chan monitor.HeightData, len(p.clients))
+   )
+
+   for _, cl := range p.clients {
+       wg.Add(1)
+
+       go func(cl *rpcclient.Client) {
+           defer wg.Done()
+
+           stHeight, err := cl.GetStateHeight()
+           if err != nil {
+               log.Printf("read state height of Neo node %s: %v", cl.Endpoint(), err)
+               return
+           }
+
+           heightChan <- monitor.HeightData{
+               Host:  cl.Endpoint(),
+               Value: stHeight.Local,
+           }
+       }(cl)
+   }
+
+   wg.Wait()
+   close(heightChan)
+
+   for height := range heightChan {
+       heights = append(heights, height)
+   }
+
+   return heights
+}
+
+func (p *Pool) FetchState(height uint32) []monitor.StateData {
+   p.mu.RLock()
+   defer p.mu.RUnlock()
+
+   var (
+       states    []monitor.StateData
+       wg        sync.WaitGroup
+       stateChan = make(chan monitor.StateData, len(p.clients))
+   )
+
+   for _, cl := range p.clients {
+       wg.Add(1)
+
+       go func(cl *rpcclient.Client) {
+           defer wg.Done()
+
+           stHeight, err := cl.GetStateRootByHeight(height)
+           if err != nil {
+               log.Printf("read state root at height #%d from Neo node %s: %v", height, cl.Endpoint(), err)
+               return
+           }
+
+           stateChan <- monitor.StateData{
+               Host:  cl.Endpoint(),
+               Value: stHeight.Hash().String(),
+           }
+       }(cl)
+   }
+
+   wg.Wait()
+   close(stateChan)
+
+   for st := range stateChan {
+       states = append(states, st)
+   }
+
+   return states
+}
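For orientation, here is a minimal usage sketch of the merged pool after this change. The NewPool, FetchHeight and FetchState signatures and the Endpoints/DialTimeout fields of PrmPool are taken from this diff; the endpoint URLs are placeholders, and the recheck-interval field of PrmPool is not shown in these hunks, so it is left at its zero value (the package default).

// A minimal sketch, not part of the commit.
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
)

func main() {
    // Cancelling the context stops the pool's background recheck goroutine.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Placeholder endpoints; Endpoints and DialTimeout are the PrmPool
    // fields visible in this diff.
    p, err := pool.NewPool(ctx, pool.PrmPool{
        Endpoints:   []string{"http://node1:30333", "http://node2:30333"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatalf("dial Neo RPC pool: %v", err)
    }

    // One object now serves both monitor roles: per-node state heights...
    for _, h := range p.FetchHeight() {
        fmt.Printf("%s: local state height %d\n", h.Host, h.Value)
    }

    // ...and per-node state roots at a fixed height.
    for _, s := range p.FetchState(100) {
        fmt.Printf("%s: state root %s\n", s.Host, s.Value)
    }
}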
