Skip to content

Commit

Permalink
*: Collect height and state data from chain
Browse files Browse the repository at this point in the history
Closes #95.

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Oct 23, 2023
1 parent fc0b6c1 commit 39f6a54
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 5 deletions.
8 changes: 8 additions & 0 deletions cmd/neofs-net-monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts"
"github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool"
"github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
"github.com/spf13/viper"
"go.uber.org/zap"
Expand Down Expand Up @@ -130,6 +131,11 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
},
)

mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval))
if err = mnPool.Dial(ctx); err != nil {
return nil, fmt.Errorf("mnPool: %w", err)
}

return monitor.New(monitor.Args{
Balance: balance,
Proxy: proxy,
Expand All @@ -144,5 +150,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
SideBlFetcher: sideBalanceFetcher,
MainBlFetcher: mainBalanceFetcher,
CnrFetcher: cnrFetcher,
HeightFetcher: mnPool,
StateFetcher: mnPool,
}), nil
}
22 changes: 22 additions & 0 deletions pkg/monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,26 @@ var (
Help: "Number of available containers",
},
)

chainHeight = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "neofs_net_monitor",
Name: "chain_height",
Help: "Chain height in blocks",
},
[]string{
"host",
},
)

chainState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "neofs_net_monitor",
Name: "chain_state",
Help: "Chain state hash in specific height",
},
[]string{
"host", "hash",
},
)
)
59 changes: 59 additions & 0 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ type (
Total() (int64, error)
}

HeightFetcher interface {
FetchHeight() []HeightData
}

StateFetcher interface {
FetchState(height uint32) []StateData
}

HeightData struct {
Host string
Value uint32
}

StateData struct {
Host string
Value string
}

Node struct {
ID uint64
Address string
Expand Down Expand Up @@ -76,6 +94,8 @@ type (
sideBlFetcher BalanceFetcher
mainBlFetcher BalanceFetcher
cnrFetcher ContainerFetcher
heightFetcher HeightFetcher
stateFetcher StateFetcher
}

Args struct {
Expand All @@ -92,6 +112,8 @@ type (
SideBlFetcher BalanceFetcher
MainBlFetcher BalanceFetcher
CnrFetcher ContainerFetcher
HeightFetcher HeightFetcher
StateFetcher StateFetcher
}
)

Expand All @@ -113,6 +135,8 @@ func New(p Args) *Monitor {
sideBlFetcher: p.SideBlFetcher,
mainBlFetcher: p.MainBlFetcher,
cnrFetcher: p.CnrFetcher,
heightFetcher: p.HeightFetcher,
stateFetcher: p.StateFetcher,
}
}

Expand All @@ -133,6 +157,8 @@ func (m *Monitor) Start(ctx context.Context) {
prometheus.MustRegister(alphabetMainDivergence)
prometheus.MustRegister(alphabetSideDivergence)
prometheus.MustRegister(containersNumber)
prometheus.MustRegister(chainHeight)
prometheus.MustRegister(chainState)

if err := m.geoFetcher.Open(); err != nil {
m.logger.Warn("geoposition fetching disabled", zap.Error(err))
Expand Down Expand Up @@ -204,6 +230,9 @@ func (m *Monitor) Job(ctx context.Context) {

m.processContainersNumber()

minHeight := m.processChainHeight()
m.processChainState(minHeight)

select {
case <-time.After(m.sleep):
// sleep for some time before next prometheus update
Expand Down Expand Up @@ -507,3 +536,33 @@ func (m *Monitor) processContainersNumber() {

containersNumber.Set(float64(total))
}

func (m *Monitor) processChainHeight() uint32 {
var minHeight uint32
heightData := m.heightFetcher.FetchHeight()

for _, d := range heightData {
chainHeight.WithLabelValues(d.Host).Set(float64(d.Value))

if minHeight == 0 || d.Value < minHeight {
minHeight = d.Value
}
}

return minHeight
}

func (m *Monitor) processChainState(height uint32) {
if height == 0 {
return
}

stateData := m.stateFetcher.FetchState(height)
chainState.Reset()

h := float64(height)

for _, d := range stateData {
chainState.WithLabelValues(d.Host, d.Value).Set(h)
}
}
126 changes: 126 additions & 0 deletions pkg/multinodepool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package multinodepool

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
)

// Pool collects data from each node.
type Pool struct {
endpoints []string
dialTimeout time.Duration
clients []*rpcclient.Client
}

func NewPool(endpoints []string, dialTimeout time.Duration) *Pool {
return &Pool{
endpoints: endpoints,
dialTimeout: dialTimeout,
clients: make([]*rpcclient.Client, len(endpoints)),
}
}

func (c *Pool) Dial(ctx context.Context) error {
opts := rpcclient.Options{DialTimeout: c.dialTimeout}

for i, ep := range c.endpoints {
neoClient, err := neoGoClient(ctx, ep, opts)
if err != nil {
return fmt.Errorf("neoGoClient: %w", err)
}

c.clients[i] = neoClient
}

return nil
}

func (c *Pool) FetchHeight() []monitor.HeightData {
wg := sync.WaitGroup{}
wg.Add(len(c.clients))

mu := sync.Mutex{}
var heights []monitor.HeightData

for _, cl := range c.clients {
go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateHeight()
if err != nil {
log.Printf("GetStateHeight for %s: %v", cl.Endpoint(), err)
return
}

mu.Lock()
heights = append(heights, monitor.HeightData{
Host: cl.Endpoint(),
Value: stHeight.Local,
})
mu.Unlock()
}(cl)
}

wg.Wait()

return heights
}

func (c *Pool) FetchState(height uint32) []monitor.StateData {
var (
states []monitor.StateData
wg sync.WaitGroup
)

stateChan := make(chan monitor.StateData, len(c.clients))

for _, cl := range c.clients {
wg.Add(1)

go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateRootByHeight(height)
if err != nil {
log.Printf("GetStateRootByHeight for %s: %v", cl.Endpoint(), err)
return
}

stateChan <- monitor.StateData{
Host: cl.Endpoint(),
Value: stHeight.Hash().String(),
}
}(cl)
}

go func() {
wg.Wait()
close(stateChan)
}()

for state := range stateChan {
states = append(states, state)
}

return states
}

func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
cli, err := rpcclient.New(ctx, endpoint, opts)
if err != nil {
return nil, fmt.Errorf("can't create neo-go client: %w", err)
}

err = cli.Init()
if err != nil {
return nil, fmt.Errorf("can't init neo-go client: %w", err)
}

return cli, nil
}
5 changes: 0 additions & 5 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,6 @@ func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (
return nil, fmt.Errorf("can't create neo-go client: %w", err)
}

err = cli.Init()
if err != nil {
return nil, fmt.Errorf("can't init neo-go client: %w", err)
}

return cli, nil
}

Expand Down

0 comments on commit 39f6a54

Please sign in to comment.