Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge multinode and regular pools #108

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions cmd/neofs-net-monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts"
"github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool"
"github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
"github.com/spf13/viper"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,7 +38,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
var job monitor.Job
if cfg.GetBool(cfgChainFSChain) {
monitor.RegisterSideChainMetrics()
job, err = sideChainJob(ctx, cfg, sideNeogoClient, logger, sideChainEndpoints)
job, err = sideChainJob(sideNeogoClient, logger)
} else {
monitor.RegisterMainChainMetrics()
job, err = mainChainJob(cfg, sideNeogoClient, logger)
Expand Down Expand Up @@ -89,7 +88,7 @@ func mainChainJob(cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger)
}), nil
}

func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger, sideChainEndpoints []string) (*monitor.SideJob, error) {
func sideChainJob(neogoClient *pool.Pool, logger *zap.Logger) (*monitor.SideJob, error) {
netmapContract, err := neogoClient.ResolveContract(rpcnns.NameNetmap)
if err != nil {
return nil, fmt.Errorf("can't read netmap scripthash: %w", err)
Expand Down Expand Up @@ -140,11 +139,6 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
proxy = &proxyContract
}

mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval))
if err = mnPool.Dial(ctx); err != nil {
return nil, fmt.Errorf("multinodepool: %w", err)
}

return monitor.NewSideJob(monitor.SideJobArgs{
Logger: logger,
Balance: balance,
Expand All @@ -154,7 +148,7 @@ func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool,
IRFetcher: nmFetcher,
BalanceFetcher: balanceFetcher,
CnrFetcher: cnrFetcher,
HeightFetcher: mnPool,
StateFetcher: mnPool,
HeightFetcher: neogoClient,
StateFetcher: neogoClient,
}), nil
}
127 changes: 0 additions & 127 deletions pkg/multinodepool/pool.go

This file was deleted.

153 changes: 142 additions & 11 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
Expand All @@ -19,22 +20,22 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
rpcnns "github.com/nspcc-dev/neofs-contract/rpc/nns"
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
)

// Pool represent virtual connection to the Neo network to communicate
// with multiple Neo servers.
type Pool struct {
ctx context.Context
mu sync.RWMutex
rpc *rpcclient.Client
invoker *invoker.Invoker
clients []*rpcclient.Client
opts rpcclient.Options

lastHealthyTimestamp int64
recheckInterval time.Duration

next int
endpoints []string
current, next int
endpoints []string
}

// PrmPool groups parameter to create Pool.
Expand All @@ -59,9 +60,61 @@ func NewPool(ctx context.Context, prm PrmPool) (*Pool, error) {
endpoints: prm.Endpoints,
recheckInterval: recheck,
opts: rpcclient.Options{DialTimeout: prm.DialTimeout},
clients: make([]*rpcclient.Client, len(prm.Endpoints)),
}

return pool, pool.establishNewConnection()
if err := pool.dial(ctx); err != nil {
return nil, err
}

go func() {
tick := time.NewTicker(recheck)

for {
select {
case <-tick.C:
pool.recheck(ctx)
case <-ctx.Done():
tick.Stop()
return
}
}
}()

return pool, nil
}

func (p *Pool) dial(ctx context.Context) error {
opts := rpcclient.Options{DialTimeout: p.opts.DialTimeout}

for i, ep := range p.endpoints {
neoClient, err := neoGoClient(ctx, ep, opts)
if err != nil {
return err
}

p.clients[i] = neoClient
}

return nil
}

func (p *Pool) recheck(ctx context.Context) {
p.mu.Lock()
defer p.mu.Unlock()

for i := 0; i < len(p.endpoints); i++ {
cl := p.clients[i]

if _, err := cl.GetBlockCount(); err != nil {
p.clients[i], err = neoGoClient(ctx, p.endpoints[i], p.opts)
if err != nil {
log.Printf("reconnect to Neo node %s failed: %v", cl.Endpoint(), err)
}

continue
}
}
}

func (p *Pool) isCurrentHealthy() bool {
Expand Down Expand Up @@ -192,13 +245,13 @@ func (p *Pool) GetCommittee() (keys.PublicKeys, error) {
func (p *Pool) conn() *rpcclient.Client {
p.mu.RLock()
defer p.mu.RUnlock()
return p.rpc
return p.clients[p.current]
}

func (p *Pool) invokerConn() *invoker.Invoker {
p.mu.RLock()
defer p.mu.RUnlock()
return p.invoker
return invoker.New(p.clients[p.current], nil)
}

func (p *Pool) establishNewConnection() error {
Expand All @@ -208,8 +261,8 @@ func (p *Pool) establishNewConnection() error {

for i := p.next; i < p.next+len(p.endpoints); i++ {
index := i % len(p.endpoints)
if p.rpc, err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
p.invoker = invoker.New(p.rpc, nil)
if p.clients[p.current], err = neoGoClient(p.ctx, p.endpoints[index], p.opts); err == nil {
p.current = index % len(p.endpoints)
p.next = (index + 1) % len(p.endpoints)
return nil
}
Expand All @@ -221,12 +274,12 @@ func (p *Pool) establishNewConnection() error {
func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
cli, err := rpcclient.New(ctx, endpoint, opts)
if err != nil {
return nil, fmt.Errorf("can't create neo-go client: %w", err)
return nil, fmt.Errorf("create Neo RPC client: %w", err)
}

err = cli.Init()
if err != nil {
return nil, fmt.Errorf("can't init neo-go client: %w", err)
return nil, fmt.Errorf("init Neo RPC client: %w", err)
}

return cli, nil
Expand All @@ -248,3 +301,81 @@ func (p *Pool) ResolveContract(contractName string) (util.Uint160, error) {

return addr, nil
}

func (p *Pool) FetchHeight() []monitor.HeightData {
p.mu.RLock()
defer p.mu.RUnlock()

var (
heights []monitor.HeightData
wg sync.WaitGroup
heightChan = make(chan monitor.HeightData, len(p.clients))
)

for _, cl := range p.clients {
wg.Add(1)

go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateHeight()
if err != nil {
log.Printf("read state height of Neo node %s: %v", cl.Endpoint(), err)
return
}

heightChan <- monitor.HeightData{
Host: cl.Endpoint(),
Value: stHeight.Local,
}
}(cl)
}

wg.Wait()
close(heightChan)

for height := range heightChan {
heights = append(heights, height)
}

return heights
}

func (p *Pool) FetchState(height uint32) []monitor.StateData {
p.mu.RLock()
defer p.mu.RUnlock()

var (
states []monitor.StateData
wg sync.WaitGroup
stateChan = make(chan monitor.StateData, len(p.clients))
)

for _, cl := range p.clients {
wg.Add(1)

go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateRootByHeight(height)
if err != nil {
log.Printf("read state root at height #%d from Neo node %s: %v", height, cl.Endpoint(), err)
return
}

stateChan <- monitor.StateData{
Host: cl.Endpoint(),
Value: stHeight.Hash().String(),
}
}(cl)
}

wg.Wait()
close(stateChan)

for st := range stateChan {
states = append(states, st)
}

return states
}
Loading