Skip to content

Commit

Permalink
Merge pull request #96 from nspcc-dev/95-bring-state-height-and-root-…
Browse files Browse the repository at this point in the history
…metrics

Collect height and state data from chain
  • Loading branch information
roman-khimov authored Nov 14, 2023
2 parents 2b03087 + ae56655 commit bfb2f33
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/neofs-net-monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain"
"github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts"
"github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool"
"github.com/nspcc-dev/neofs-net-monitor/pkg/pool"
"github.com/spf13/viper"
"go.uber.org/zap"
Expand Down Expand Up @@ -123,6 +124,11 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
logger.Info("neofs contract ignored")
}

mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval))
if err = mnPool.Dial(ctx); err != nil {
return nil, fmt.Errorf("multinodepool: %w", err)
}

return monitor.New(monitor.Args{
Balance: balance,
Proxy: proxy,
Expand All @@ -136,5 +142,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) {
SideBlFetcher: sideBalanceFetcher,
MainBlFetcher: mainBalanceFetcher,
CnrFetcher: cnrFetcher,
HeightFetcher: mnPool,
StateFetcher: mnPool,
}), nil
}
22 changes: 22 additions & 0 deletions pkg/monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,26 @@ var (
Help: "Number of available containers",
},
)

chainHeight = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "neofs_net_monitor",
Name: "chain_height",
Help: "Chain height in blocks",
},
[]string{
"host",
},
)

chainState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "neofs_net_monitor",
Name: "chain_state",
Help: "Chain state hash in specific height",
},
[]string{
"host", "hash",
},
)
)
59 changes: 59 additions & 0 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ type (
Total() (int64, error)
}

HeightFetcher interface {
FetchHeight() []HeightData
}

StateFetcher interface {
FetchState(height uint32) []StateData
}

HeightData struct {
Host string
Value uint32
}

StateData struct {
Host string
Value string
}

Node struct {
ID uint64
Address string
Expand Down Expand Up @@ -75,6 +93,8 @@ type (
sideBlFetcher BalanceFetcher
mainBlFetcher BalanceFetcher
cnrFetcher ContainerFetcher
heightFetcher HeightFetcher
stateFetcher StateFetcher
}

Args struct {
Expand All @@ -90,6 +110,8 @@ type (
SideBlFetcher BalanceFetcher
MainBlFetcher BalanceFetcher
CnrFetcher ContainerFetcher
HeightFetcher HeightFetcher
StateFetcher StateFetcher
}
)

Expand All @@ -110,6 +132,8 @@ func New(p Args) *Monitor {
sideBlFetcher: p.SideBlFetcher,
mainBlFetcher: p.MainBlFetcher,
cnrFetcher: p.CnrFetcher,
heightFetcher: p.HeightFetcher,
stateFetcher: p.StateFetcher,
}
}

Expand All @@ -130,6 +154,8 @@ func (m *Monitor) Start(ctx context.Context) {
prometheus.MustRegister(alphabetMainDivergence)
prometheus.MustRegister(alphabetSideDivergence)
prometheus.MustRegister(containersNumber)
prometheus.MustRegister(chainHeight)
prometheus.MustRegister(chainState)

go func() {
err := m.metricsServer.ListenAndServe()
Expand Down Expand Up @@ -195,6 +221,9 @@ func (m *Monitor) Job(ctx context.Context) {

m.processContainersNumber()

minHeight := m.processChainHeight()
m.processChainState(minHeight)

select {
case <-time.After(m.sleep):
// sleep for some time before next prometheus update
Expand Down Expand Up @@ -498,3 +527,33 @@ func (m *Monitor) processContainersNumber() {

containersNumber.Set(float64(total))
}

func (m *Monitor) processChainHeight() uint32 {
var minHeight uint32
heightData := m.heightFetcher.FetchHeight()

for _, d := range heightData {
chainHeight.WithLabelValues(d.Host).Set(float64(d.Value))

if minHeight == 0 || d.Value < minHeight {
minHeight = d.Value
}
}

return minHeight
}

func (m *Monitor) processChainState(height uint32) {
if height == 0 {
return
}

stateData := m.stateFetcher.FetchState(height)
chainState.Reset()

h := float64(height)

for _, d := range stateData {
chainState.WithLabelValues(d.Host, d.Value).Set(h)
}
}
127 changes: 127 additions & 0 deletions pkg/multinodepool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package multinodepool

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neofs-net-monitor/pkg/monitor"
)

// Pool collects data from each node.
type Pool struct {
endpoints []string
dialTimeout time.Duration
clients []*rpcclient.Client
}

func NewPool(endpoints []string, dialTimeout time.Duration) *Pool {
return &Pool{
endpoints: endpoints,
dialTimeout: dialTimeout,
clients: make([]*rpcclient.Client, len(endpoints)),
}
}

func (c *Pool) Dial(ctx context.Context) error {
opts := rpcclient.Options{DialTimeout: c.dialTimeout}

for i, ep := range c.endpoints {
neoClient, err := neoGoClient(ctx, ep, opts)
if err != nil {
return fmt.Errorf("neoGoClient: %w", err)
}

c.clients[i] = neoClient
}

return nil
}

func (c *Pool) FetchHeight() []monitor.HeightData {
var (
heights []monitor.HeightData
wg sync.WaitGroup
heightChan = make(chan monitor.HeightData, len(c.clients))
)

for _, cl := range c.clients {
wg.Add(1)

go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateHeight()
if err != nil {
log.Printf("GetStateHeight for %s: %v", cl.Endpoint(), err)
return
}

heightChan <- monitor.HeightData{
Host: cl.Endpoint(),
Value: stHeight.Local,
}
}(cl)
}

go func() {
wg.Wait()
close(heightChan)
}()

for height := range heightChan {
heights = append(heights, height)
}

return heights
}

func (c *Pool) FetchState(height uint32) []monitor.StateData {
var (
states []monitor.StateData
wg sync.WaitGroup
stateChan = make(chan monitor.StateData, len(c.clients))
)

for _, cl := range c.clients {
wg.Add(1)

go func(cl *rpcclient.Client) {
defer wg.Done()

stHeight, err := cl.GetStateRootByHeight(height)
if err != nil {
log.Printf("GetStateRootByHeight for %s: %v", cl.Endpoint(), err)
return
}

stateChan <- monitor.StateData{
Host: cl.Endpoint(),
Value: stHeight.Hash().String(),
}
}(cl)
}

go func() {
wg.Wait()
close(stateChan)
}()

for state := range stateChan {
states = append(states, state)
}

return states
}

func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) {
cli, err := rpcclient.New(ctx, endpoint, opts)
if err != nil {
return nil, fmt.Errorf("can't create neo-go client: %w", err)
}

return cli, nil
}

0 comments on commit bfb2f33

Please sign in to comment.