From 5c36a7662ca90de5d3ba6c412e8a6d67d837cd52 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Thu, 21 Dec 2023 15:01:33 +0400 Subject: [PATCH 1/7] *: Derive metric registration from monitor Signed-off-by: Evgenii Baidakov --- cmd/neofs-net-monitor/main.go | 3 +++ pkg/monitor/metrics.go | 22 ++++++++++++++++++++++ pkg/monitor/monitor.go | 19 ------------------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/cmd/neofs-net-monitor/main.go b/cmd/neofs-net-monitor/main.go index fcf3996..7295969 100644 --- a/cmd/neofs-net-monitor/main.go +++ b/cmd/neofs-net-monitor/main.go @@ -10,6 +10,7 @@ import ( "strings" "syscall" + "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor" "github.com/spf13/viper" "go.uber.org/zap" ) @@ -44,6 +45,8 @@ func main() { os.Exit(1) } + monitor.RegisterMetrics() + neofsMonitor.Start(ctx) neofsMonitor.Logger().Info("application started", zap.String("version", Version)) diff --git a/pkg/monitor/metrics.go b/pkg/monitor/metrics.go index a6b73ac..adc86bb 100644 --- a/pkg/monitor/metrics.go +++ b/pkg/monitor/metrics.go @@ -190,3 +190,25 @@ var ( }, ) ) + +// RegisterMetrics inits prometheus metrics. Panics if can't do it. +func RegisterMetrics() { + prometheus.MustRegister(locationPresent) + prometheus.MustRegister(droppedNodesCount) + prometheus.MustRegister(newNodesCount) + prometheus.MustRegister(epochNumber) + prometheus.MustRegister(storageNodeGASBalances) + prometheus.MustRegister(storageNodeNotaryBalances) + prometheus.MustRegister(innerRingBalances) + prometheus.MustRegister(alphabetGASBalances) + prometheus.MustRegister(alphabetNotaryBalances) + prometheus.MustRegister(proxyBalance) + prometheus.MustRegister(mainChainSupply) + prometheus.MustRegister(sideChainSupply) + prometheus.MustRegister(alphabetDivergence) + prometheus.MustRegister(alphabetMainDivergence) + prometheus.MustRegister(alphabetSideDivergence) + prometheus.MustRegister(containersNumber) + prometheus.MustRegister(chainHeight) + prometheus.MustRegister(chainState) +} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 43131bd..9c8262b 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -138,25 +138,6 @@ func New(p Args) *Monitor { } func (m *Monitor) Start(ctx context.Context) { - prometheus.MustRegister(locationPresent) - prometheus.MustRegister(droppedNodesCount) - prometheus.MustRegister(newNodesCount) - prometheus.MustRegister(epochNumber) - prometheus.MustRegister(storageNodeGASBalances) - prometheus.MustRegister(storageNodeNotaryBalances) - prometheus.MustRegister(innerRingBalances) - prometheus.MustRegister(alphabetGASBalances) - prometheus.MustRegister(alphabetNotaryBalances) - prometheus.MustRegister(proxyBalance) - prometheus.MustRegister(mainChainSupply) - prometheus.MustRegister(sideChainSupply) - prometheus.MustRegister(alphabetDivergence) - prometheus.MustRegister(alphabetMainDivergence) - prometheus.MustRegister(alphabetSideDivergence) - prometheus.MustRegister(containersNumber) - prometheus.MustRegister(chainHeight) - prometheus.MustRegister(chainState) - go func() { err := m.metricsServer.ListenAndServe() if !errors.Is(err, http.ErrServerClosed) { From 568ecee62b82e208cdad2492e06265e7b3ced41f Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Dec 2023 09:24:34 +0400 Subject: [PATCH 2/7] *: Separate main and side chain alphabet fetchers Signed-off-by: Evgenii Baidakov --- cmd/neofs-net-monitor/monitor.go | 12 +++---- pkg/monitor/monitor.go | 50 +++++++++++++------------- pkg/morphchain/alphabet.go | 52 ---------------------------- pkg/morphchain/mainchain_alphabet.go | 35 +++++++++++++++++++ pkg/morphchain/sidechain_alphabet.go | 26 ++++++++++++++ 5 files changed, 91 insertions(+), 84 deletions(-) delete mode 100644 pkg/morphchain/alphabet.go create mode 100644 pkg/morphchain/mainchain_alphabet.go create mode 100644 pkg/morphchain/sidechain_alphabet.go diff --git a/cmd/neofs-net-monitor/monitor.go b/cmd/neofs-net-monitor/monitor.go index d53f988..95adbab 100644 --- a/cmd/neofs-net-monitor/monitor.go +++ b/cmd/neofs-net-monitor/monitor.go @@ -73,13 +73,8 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { return nil, fmt.Errorf("can't initialize container fetcher: %w", err) } - alphabetFetcher, err := morphchain.NewAlphabetFetcher(morphchain.AlphabetFetcherArgs{ - Committeer: sideNeogoClient, - Designater: mainNeogoClient, - }) - if err != nil { - return nil, fmt.Errorf("can't initialize alphabet fetcher: %w", err) - } + mainAlphabetFetcher := morphchain.NewMainChainAlphabetFetcher(mainNeogoClient) + sideAlphabetFetcher := morphchain.NewSideChainAlphabetFetcher(sideNeogoClient) sideBalanceFetcher, err := morphchain.NewBalanceFetcher(morphchain.BalanceFetcherArgs{ Cli: sideNeogoClient, @@ -136,7 +131,8 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { Logger: logger, Sleep: cfg.GetDuration(cfgMetricsInterval), MetricsAddress: cfg.GetString(cfgMetricsEndpoint), - AlpFetcher: alphabetFetcher, + MainAlpFetcher: mainAlphabetFetcher, + SideAlpFetcher: sideAlphabetFetcher, NmFetcher: nmFetcher, IRFetcher: nmFetcher, SideBlFetcher: sideBalanceFetcher, diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 9c8262b..b622de0 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -36,8 +36,7 @@ type ( } AlphabetFetcher interface { - FetchSideAlphabet() (keys.PublicKeys, error) - FetchMainAlphabet() (keys.PublicKeys, error) + FetchAlphabet() (keys.PublicKeys, error) } ContainerFetcher interface { @@ -84,17 +83,18 @@ type ( proxy *util.Uint160 neofs *util.Uint160 - logger *zap.Logger - sleep time.Duration - metricsServer http.Server - alpFetcher AlphabetFetcher - nmFetcher NetmapFetcher - irFetcher InnerRingFetcher - sideBlFetcher BalanceFetcher - mainBlFetcher BalanceFetcher - cnrFetcher ContainerFetcher - heightFetcher HeightFetcher - stateFetcher StateFetcher + logger *zap.Logger + sleep time.Duration + metricsServer http.Server + mainAlpFetcher AlphabetFetcher + sideAlpFetcher AlphabetFetcher + nmFetcher NetmapFetcher + irFetcher InnerRingFetcher + sideBlFetcher BalanceFetcher + mainBlFetcher BalanceFetcher + cnrFetcher ContainerFetcher + heightFetcher HeightFetcher + stateFetcher StateFetcher } Args struct { @@ -104,7 +104,8 @@ type ( Logger *zap.Logger Sleep time.Duration MetricsAddress string - AlpFetcher AlphabetFetcher + MainAlpFetcher AlphabetFetcher + SideAlpFetcher AlphabetFetcher NmFetcher NetmapFetcher IRFetcher InnerRingFetcher SideBlFetcher BalanceFetcher @@ -126,14 +127,15 @@ func New(p Args) *Monitor { Addr: p.MetricsAddress, Handler: promhttp.Handler(), }, - alpFetcher: p.AlpFetcher, - nmFetcher: p.NmFetcher, - irFetcher: p.IRFetcher, - sideBlFetcher: p.SideBlFetcher, - mainBlFetcher: p.MainBlFetcher, - cnrFetcher: p.CnrFetcher, - heightFetcher: p.HeightFetcher, - stateFetcher: p.StateFetcher, + mainAlpFetcher: p.MainAlpFetcher, + sideAlpFetcher: p.SideAlpFetcher, + nmFetcher: p.NmFetcher, + irFetcher: p.IRFetcher, + sideBlFetcher: p.SideBlFetcher, + mainBlFetcher: p.MainBlFetcher, + cnrFetcher: p.CnrFetcher, + heightFetcher: p.HeightFetcher, + stateFetcher: p.StateFetcher, } } @@ -188,12 +190,12 @@ func (m *Monitor) Job(ctx context.Context) { m.processMainChainSupply() } - if sideAlphabet, err := m.alpFetcher.FetchSideAlphabet(); err != nil { + if sideAlphabet, err := m.sideAlpFetcher.FetchAlphabet(); err != nil { m.logger.Warn("can't scrap side alphabet info", zap.Error(err)) } else { m.processAlphabet(sideAlphabet) - if mainAlphabet, err := m.alpFetcher.FetchMainAlphabet(); err != nil { + if mainAlphabet, err := m.mainAlpFetcher.FetchAlphabet(); err != nil { m.logger.Warn("can't scrap main alphabet info", zap.Error(err)) } else { m.processAlphabetDivergence(mainAlphabet, sideAlphabet) diff --git a/pkg/morphchain/alphabet.go b/pkg/morphchain/alphabet.go deleted file mode 100644 index 00e54d4..0000000 --- a/pkg/morphchain/alphabet.go +++ /dev/null @@ -1,52 +0,0 @@ -package morphchain - -import ( - "fmt" - - "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" -) - -// Committeer must provide side chain committee -// public keys. -type Committeer interface { - GetCommittee() (keys.PublicKeys, error) -} - -// Designater must provide main chain alphabet -// public keys. -type Designater interface { - GetBlockCount() (uint32, error) - GetDesignatedByRole(noderoles.Role, uint32) (keys.PublicKeys, error) -} - -type ( - AlphabetFetcher struct { - c Committeer - d Designater - } - - AlphabetFetcherArgs struct { - Committeer Committeer - Designater Designater - } -) - -func NewAlphabetFetcher(p AlphabetFetcherArgs) (*AlphabetFetcher, error) { - return &AlphabetFetcher{ - c: p.Committeer, - d: p.Designater, - }, nil -} - -func (a AlphabetFetcher) FetchSideAlphabet() (keys.PublicKeys, error) { - return a.c.GetCommittee() -} - -func (a AlphabetFetcher) FetchMainAlphabet() (keys.PublicKeys, error) { - height, err := a.d.GetBlockCount() - if err != nil { - return nil, fmt.Errorf("can't get block height: %w", err) - } - return a.d.GetDesignatedByRole(noderoles.NeoFSAlphabet, height) -} diff --git a/pkg/morphchain/mainchain_alphabet.go b/pkg/morphchain/mainchain_alphabet.go new file mode 100644 index 0000000..75da011 --- /dev/null +++ b/pkg/morphchain/mainchain_alphabet.go @@ -0,0 +1,35 @@ +package morphchain + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// Designater must provide main chain alphabet public keys. +type Designater interface { + GetBlockCount() (uint32, error) + GetDesignatedByRole(noderoles.Role, uint32) (keys.PublicKeys, error) +} + +type ( + MainChainAlphabetFetcher struct { + designater Designater + } +) + +func NewMainChainAlphabetFetcher(designater Designater) *MainChainAlphabetFetcher { + return &MainChainAlphabetFetcher{ + designater: designater, + } +} + +func (a MainChainAlphabetFetcher) FetchAlphabet() (keys.PublicKeys, error) { + height, err := a.designater.GetBlockCount() + if err != nil { + return nil, fmt.Errorf("can't get chain height: %w", err) + } + + return a.designater.GetDesignatedByRole(noderoles.NeoFSAlphabet, height) +} diff --git a/pkg/morphchain/sidechain_alphabet.go b/pkg/morphchain/sidechain_alphabet.go new file mode 100644 index 0000000..def69b2 --- /dev/null +++ b/pkg/morphchain/sidechain_alphabet.go @@ -0,0 +1,26 @@ +package morphchain + +import ( + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// Committeer must provide side chain committee public keys. +type Committeer interface { + GetCommittee() (keys.PublicKeys, error) +} + +type ( + SideChainAlphabetFetcher struct { + committeer Committeer + } +) + +func NewSideChainAlphabetFetcher(committeer Committeer) *SideChainAlphabetFetcher { + return &SideChainAlphabetFetcher{ + committeer: committeer, + } +} + +func (a SideChainAlphabetFetcher) FetchAlphabet() (keys.PublicKeys, error) { + return a.committeer.GetCommittee() +} From 483b2f305548deb067c8726e2efc391c878139a6 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Dec 2023 12:24:45 +0400 Subject: [PATCH 3/7] *: Separate main and side chain processing functions Also removing divergence calculations metric. The divergence should be calculated outside net-monitor by using data from main and side chain net-monitors. Signed-off-by: Evgenii Baidakov --- pkg/monitor/metrics.go | 32 +------- pkg/monitor/monitor.go | 154 ++++++++++++++---------------------- pkg/monitor/monitor_test.go | 73 ----------------- 3 files changed, 63 insertions(+), 196 deletions(-) diff --git a/pkg/monitor/metrics.go b/pkg/monitor/metrics.go index adc86bb..5b0d8aa 100644 --- a/pkg/monitor/metrics.go +++ b/pkg/monitor/metrics.go @@ -127,33 +127,11 @@ var ( }, ) - alphabetDivergence = prometheus.NewGaugeVec( + alphabetPubKeys = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: "neofs_net_monitor", - Name: "alphabet_divergence_count", - Help: "Number of unique alphabet keys in main chain and side chain", - }, - []string{ - "chain", - }, - ) - - alphabetMainDivergence = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: "neofs_net_monitor", - Name: "alphabet_main_divergence", - Help: "Alphabet keys divergence in main chain", - }, - []string{ - "key", - }, - ) - - alphabetSideDivergence = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: "neofs_net_monitor", - Name: "alphabet_side_divergence", - Help: "Alphabet keys divergence in side chain", + Name: "alphabet_public_key", + Help: "Alphabet public keys in chain", }, []string{ "key", @@ -205,9 +183,7 @@ func RegisterMetrics() { prometheus.MustRegister(proxyBalance) prometheus.MustRegister(mainChainSupply) prometheus.MustRegister(sideChainSupply) - prometheus.MustRegister(alphabetDivergence) - prometheus.MustRegister(alphabetMainDivergence) - prometheus.MustRegister(alphabetSideDivergence) + prometheus.MustRegister(alphabetPubKeys) prometheus.MustRegister(containersNumber) prometheus.MustRegister(chainHeight) prometheus.MustRegister(chainState) diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index b622de0..4aceef7 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -159,62 +159,68 @@ func (m *Monitor) Stop() { func (m *Monitor) Job(ctx context.Context) { for { - m.logger.Debug("scraping data from side chain") + m.processSideChain() + m.processMainChain() - netmap, err := m.nmFetcher.FetchNetmap() - if err != nil { - m.logger.Warn("can't scrap network map info", zap.Error(err)) - } else { - candidatesNetmap, err := m.nmFetcher.FetchCandidates() - if err != nil { - m.logger.Warn("can't scrap network map candidates info", zap.Error(err)) - } else { - m.processNetworkMap(netmap, candidatesNetmap) - } - } - - innerRing, err := m.irFetcher.FetchInnerRingKeys() - if err != nil { - m.logger.Warn("can't scrap inner ring info", zap.Error(err)) - } else { - m.processInnerRing(innerRing) + select { + case <-time.After(m.sleep): + // sleep for some time before next prometheus update + case <-ctx.Done(): + m.logger.Info("context closed, stop monitor") + return } + } +} - if m.proxy != nil { - m.processProxyContract() - } +func (m *Monitor) processMainChain() { + if mainAlphabet, err := m.mainAlpFetcher.FetchAlphabet(); err != nil { + m.logger.Warn("can't scrap main alphabet info", zap.Error(err)) + } else { + m.processAlphabetPublicKeys(mainAlphabet) + } - m.processSideChainSupply() + m.processMainChainSupply() +} - if m.neofs != nil { - m.processMainChainSupply() - } +func (m *Monitor) processSideChain() { + m.logger.Debug("scraping data from side chain") - if sideAlphabet, err := m.sideAlpFetcher.FetchAlphabet(); err != nil { - m.logger.Warn("can't scrap side alphabet info", zap.Error(err)) + netmap, err := m.nmFetcher.FetchNetmap() + if err != nil { + m.logger.Warn("can't scrap network map info", zap.Error(err)) + } else { + candidatesNetmap, err := m.nmFetcher.FetchCandidates() + if err != nil { + m.logger.Warn("can't scrap network map candidates info", zap.Error(err)) } else { - m.processAlphabet(sideAlphabet) - - if mainAlphabet, err := m.mainAlpFetcher.FetchAlphabet(); err != nil { - m.logger.Warn("can't scrap main alphabet info", zap.Error(err)) - } else { - m.processAlphabetDivergence(mainAlphabet, sideAlphabet) - } + m.processNetworkMap(netmap, candidatesNetmap) } + } - m.processContainersNumber() + innerRing, err := m.irFetcher.FetchInnerRingKeys() + if err != nil { + m.logger.Warn("can't scrap inner ring info", zap.Error(err)) + } else { + m.processInnerRing(innerRing) + } - minHeight := m.processChainHeight() - m.processChainState(minHeight) + if m.proxy != nil { + m.processProxyContract() + } - select { - case <-time.After(m.sleep): - // sleep for some time before next prometheus update - case <-ctx.Done(): - m.logger.Info("context closed, stop monitor") - return - } + m.processSideChainSupply() + + if alphabet, err := m.sideAlpFetcher.FetchAlphabet(); err != nil { + m.logger.Warn("can't scrap side alphabet info", zap.Error(err)) + } else { + m.processAlphabetPublicKeys(alphabet) + m.processAlphabet(alphabet) } + + m.processContainersNumber() + + minHeight := m.processChainHeight() + m.processChainState(minHeight) } func (m *Monitor) Logger() *zap.Logger { @@ -416,59 +422,13 @@ func (m *Monitor) processAlphabet(alphabet keys.PublicKeys) { } } -const ( - mainChainDivergenceLabel = "main" - sideChainDivergenceLabel = "side" -) - -func (m *Monitor) processAlphabetDivergence(mainAlphabet, sideAlphabet keys.PublicKeys) { - sortedMain := sortedAlphabet(mainAlphabet) - sortedSide := sortedAlphabet(sideAlphabet) - - uniqueMain, uniqueSide := computeUniqueAlphabets(sortedMain, sortedSide) - - alphabetDivergence.Reset() - alphabetDivergence.WithLabelValues(mainChainDivergenceLabel).Set(float64(len(uniqueMain))) - alphabetDivergence.WithLabelValues(sideChainDivergenceLabel).Set(float64(len(uniqueSide))) - - alphabetMainDivergence.Reset() - for _, key := range uniqueMain { - alphabetMainDivergence.WithLabelValues(key).Set(1) - } - alphabetSideDivergence.Reset() - for _, key := range uniqueSide { - alphabetSideDivergence.WithLabelValues(key).Set(1) - } -} - -func computeUniqueAlphabets(sortedMain, sortedSide []string) ([]string, []string) { - var uniqueMain, uniqueSide []string - - i, j := 0, 0 - len1, len2 := len(sortedMain), len(sortedSide) - for i < len1 && j < len2 { - if sortedMain[i] == sortedSide[j] { - i++ - j++ - continue - } - - if sortedMain[i] < sortedSide[j] { - uniqueMain = append(uniqueMain, sortedMain[i]) - i++ - } else { - uniqueSide = append(uniqueSide, sortedSide[j]) - j++ - } - } +func (m *Monitor) processAlphabetPublicKeys(alphabet keys.PublicKeys) { + sorted := sortedAlphabet(alphabet) - if i == len1 { - uniqueSide = append(uniqueSide, sortedSide[j:]...) - } else if j == len2 { - uniqueMain = append(uniqueMain, sortedMain[i:]...) + alphabetPubKeys.Reset() + for _, key := range sorted { + alphabetPubKeys.WithLabelValues(key).Set(1) } - - return uniqueMain, uniqueSide } func sortedAlphabet(alphabet keys.PublicKeys) []string { @@ -482,6 +442,10 @@ func sortedAlphabet(alphabet keys.PublicKeys) []string { } func (m *Monitor) processMainChainSupply() { + if m.neofs == nil { + return + } + balance, err := m.mainBlFetcher.FetchGASByScriptHash(*m.neofs) if err != nil { m.logger.Debug("can't fetch neofs contract balance", zap.Error(err)) diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index b9ac5ab..9d030a5 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -79,76 +79,3 @@ func generateNodes(start, finish int) []*Node { return nodes } - -func TestComputeUniqueAlphabetKeys(t *testing.T) { - tests := []struct { - name string - sortedMainAlphabet []string - sortedSideAlphabet []string - expectedMainUnique []string - expectedSideUnique []string - }{ - { - name: "empty", - }, - { - name: "no unique", - sortedMainAlphabet: []string{"a", "b"}, - sortedSideAlphabet: []string{"a", "b"}, - }, - { - name: "tail unique", - sortedMainAlphabet: []string{"a", "b"}, - sortedSideAlphabet: []string{"a", "b", "c"}, - expectedSideUnique: []string{"c"}, - }, - { - name: "middle both unique", - sortedMainAlphabet: []string{"a", "b", "d"}, - sortedSideAlphabet: []string{"a", "c", "d"}, - expectedMainUnique: []string{"b"}, - expectedSideUnique: []string{"c"}, - }, - { - name: "middle side unique", - sortedMainAlphabet: []string{"a", "b", "d"}, - sortedSideAlphabet: []string{"a", "b", "c", "d"}, - expectedSideUnique: []string{"c"}, - }, - { - name: "middle main unique", - sortedMainAlphabet: []string{"a", "b", "c", "d"}, - sortedSideAlphabet: []string{"a", "c", "d"}, - expectedMainUnique: []string{"b"}, - }, - { - name: "same length tail unique", - sortedMainAlphabet: []string{"a", "b", "c"}, - sortedSideAlphabet: []string{"a", "b", "d"}, - expectedMainUnique: []string{"c"}, - expectedSideUnique: []string{"d"}, - }, - { - name: "same length all unique", - sortedMainAlphabet: []string{"a", "c"}, - sortedSideAlphabet: []string{"b", "d"}, - expectedMainUnique: []string{"a", "c"}, - expectedSideUnique: []string{"b", "d"}, - }, - { - name: "all unique", - sortedMainAlphabet: []string{"e", "f"}, - sortedSideAlphabet: []string{"a", "b", "c", "d"}, - expectedMainUnique: []string{"e", "f"}, - expectedSideUnique: []string{"a", "b", "c", "d"}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - uniqueMain, uniqueSide := computeUniqueAlphabets(test.sortedMainAlphabet, test.sortedSideAlphabet) - require.Equal(t, test.expectedMainUnique, uniqueMain) - require.Equal(t, test.expectedSideUnique, uniqueSide) - }) - } -} From cc5f1a71f644187e477522195a9c187361e7a2e9 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Dec 2023 12:41:21 +0400 Subject: [PATCH 4/7] *: Separate main and side chain balances Signed-off-by: Evgenii Baidakov --- pkg/monitor/monitor.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 4aceef7..62e878a 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -177,6 +177,7 @@ func (m *Monitor) processMainChain() { m.logger.Warn("can't scrap main alphabet info", zap.Error(err)) } else { m.processAlphabetPublicKeys(mainAlphabet) + m.processMainAlphabet(mainAlphabet) } m.processMainChainSupply() @@ -214,7 +215,7 @@ func (m *Monitor) processSideChain() { m.logger.Warn("can't scrap side alphabet info", zap.Error(err)) } else { m.processAlphabetPublicKeys(alphabet) - m.processAlphabet(alphabet) + m.processSideAlphabet(alphabet) } m.processContainersNumber() @@ -389,25 +390,37 @@ func (m *Monitor) processProxyContract() { proxyBalance.Set(float64(balance)) } -func (m *Monitor) processAlphabet(alphabet keys.PublicKeys) { - exportGasBalances := make(map[string]int64, len(alphabet)) +func (m *Monitor) processSideAlphabet(alphabet keys.PublicKeys) { exportNotaryBalances := make(map[string]int64, len(alphabet)) for _, key := range alphabet { keyHex := hex.EncodeToString(key.Bytes()) - balanceGAS, err := m.mainBlFetcher.FetchGAS(*key) + balanceNotary, err := m.sideBlFetcher.FetchNotary(*key) if err != nil { - m.logger.Debug("can't fetch gas balance", zap.String("key", keyHex), zap.Error(err)) + m.logger.Debug("can't fetch notary balance", zap.String("key", keyHex), zap.Error(err)) } else { - exportGasBalances[keyHex] = balanceGAS + exportNotaryBalances[keyHex] = balanceNotary } + } - balanceNotary, err := m.sideBlFetcher.FetchNotary(*key) + alphabetNotaryBalances.Reset() + for k, v := range exportNotaryBalances { + alphabetNotaryBalances.WithLabelValues(k).Set(float64(v)) + } +} + +func (m *Monitor) processMainAlphabet(alphabet keys.PublicKeys) { + exportGasBalances := make(map[string]int64, len(alphabet)) + + for _, key := range alphabet { + keyHex := hex.EncodeToString(key.Bytes()) + + balanceGAS, err := m.mainBlFetcher.FetchGAS(*key) if err != nil { - m.logger.Debug("can't fetch notary balance", zap.String("key", keyHex), zap.Error(err)) + m.logger.Debug("can't fetch gas balance", zap.String("key", keyHex), zap.Error(err)) } else { - exportNotaryBalances[keyHex] = balanceNotary + exportGasBalances[keyHex] = balanceGAS } } @@ -415,11 +428,6 @@ func (m *Monitor) processAlphabet(alphabet keys.PublicKeys) { for k, v := range exportGasBalances { alphabetGASBalances.WithLabelValues(k).Set(float64(v)) } - - alphabetNotaryBalances.Reset() - for k, v := range exportNotaryBalances { - alphabetNotaryBalances.WithLabelValues(k).Set(float64(v)) - } } func (m *Monitor) processAlphabetPublicKeys(alphabet keys.PublicKeys) { From e2a2050f0ec4d6609555bfed47b80286e432c2c5 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Dec 2023 13:43:32 +0400 Subject: [PATCH 5/7] *: Make monitor a single-network tracker Closes #103. Signed-off-by: Evgenii Baidakov --- cmd/neofs-net-monitor/config.go | 12 +- cmd/neofs-net-monitor/monitor.go | 130 +++++---- config/config.yaml | 15 +- pkg/monitor/main_job.go | 79 ++++++ pkg/monitor/monitor.go | 441 ++----------------------------- pkg/monitor/side_job.go | 386 +++++++++++++++++++++++++++ 6 files changed, 563 insertions(+), 500 deletions(-) create mode 100644 pkg/monitor/main_job.go create mode 100644 pkg/monitor/side_job.go diff --git a/cmd/neofs-net-monitor/config.go b/cmd/neofs-net-monitor/config.go index d0abb7b..2f28952 100644 --- a/cmd/neofs-net-monitor/config.go +++ b/cmd/neofs-net-monitor/config.go @@ -15,8 +15,9 @@ const ( cfgNeoFSContract = "contracts.neofs" // neo rpc node related config values - mainPrefix = "mainnet" - sidePrefix = "morph" + prefix = "chain" + + cfgChainFSChain = "chain.fschain" cfgNeoRPCEndpoint = "rpc.endpoint" cfgNeoRPCDialTimeout = "rpc.dial_timeout" @@ -31,11 +32,8 @@ const ( ) func DefaultConfiguration(cfg *viper.Viper) { - cfg.SetDefault(sidePrefix+delimiter+cfgNeoRPCEndpoint, "") - cfg.SetDefault(sidePrefix+delimiter+cfgNeoRPCDialTimeout, 5*time.Second) - - cfg.SetDefault(mainPrefix+delimiter+cfgNeoRPCEndpoint, "") - cfg.SetDefault(mainPrefix+delimiter+cfgNeoRPCDialTimeout, 5*time.Second) + cfg.SetDefault(prefix+delimiter+cfgNeoRPCEndpoint, "") + cfg.SetDefault(prefix+delimiter+cfgNeoRPCDialTimeout, 5*time.Second) cfg.SetDefault(cfgMetricsEndpoint, ":16512") cfg.SetDefault(cfgMetricsInterval, 15*time.Minute) diff --git a/cmd/neofs-net-monitor/monitor.go b/cmd/neofs-net-monitor/monitor.go index 95adbab..a68f55e 100644 --- a/cmd/neofs-net-monitor/monitor.go +++ b/cmd/neofs-net-monitor/monitor.go @@ -23,13 +23,9 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { return nil, err } - sideChainEndpoints := cfg.GetStringSlice(sidePrefix + delimiter + cfgNeoRPCEndpoint) - sideChainTimeout := cfg.GetDuration(sidePrefix + delimiter + cfgNeoRPCDialTimeout) - sideChainRecheck := cfg.GetDuration(sidePrefix + delimiter + cfgNeoRPCRecheckInterval) - - mainChainEndpoints := cfg.GetStringSlice(mainPrefix + delimiter + cfgNeoRPCEndpoint) - mainChainTimeout := cfg.GetDuration(mainPrefix + delimiter + cfgNeoRPCDialTimeout) - mainChainRecheck := cfg.GetDuration(mainPrefix + delimiter + cfgNeoRPCRecheckInterval) + sideChainEndpoints := cfg.GetStringSlice(prefix + delimiter + cfgNeoRPCEndpoint) + sideChainTimeout := cfg.GetDuration(prefix + delimiter + cfgNeoRPCDialTimeout) + sideChainRecheck := cfg.GetDuration(prefix + delimiter + cfgNeoRPCRecheckInterval) sideNeogoClient, err := pool.NewPool(ctx, pool.PrmPool{ Endpoints: sideChainEndpoints, @@ -40,27 +36,70 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { return nil, fmt.Errorf("can't create side chain neo-go client: %w", err) } - mainNeogoClient, err := pool.NewPool(ctx, pool.PrmPool{ - Endpoints: mainChainEndpoints, - DialTimeout: mainChainTimeout, - RecheckInterval: mainChainRecheck, - }) + var job monitor.Job + if cfg.GetBool(cfgChainFSChain) { + job, err = sideChainJob(ctx, cfg, sideNeogoClient, logger, sideChainEndpoints) + } else { + job, err = mainChainJob(cfg, sideNeogoClient, logger) + } + if err != nil { - return nil, fmt.Errorf("can't create main chain neo-go client: %w", err) + return nil, err } - netmapContract, err := sideNeogoClient.ResolveContract(rpcnns.NameNetmap) + return monitor.New( + job, + cfg.GetString(cfgMetricsEndpoint), + cfg.GetDuration(cfgMetricsInterval), + logger, + ), nil +} + +func mainChainJob(cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger) (*monitor.MainJob, error) { + alphabetFetcher := morphchain.NewMainChainAlphabetFetcher(neogoClient) + + balanceFetcher, err := morphchain.NewBalanceFetcher( + morphchain.BalanceFetcherArgs{ + Cli: neogoClient, + }) + if err != nil { + return nil, fmt.Errorf("can't initialize Neo chain balance reader: %w", err) + } + + var neofs *util.Uint160 + + neofsContract := cfg.GetString(cfgNeoFSContract) + if len(neofsContract) != 0 { + sh, err := util.Uint160DecodeStringLE(neofsContract) + if err != nil { + return nil, fmt.Errorf("decode configured NeoFS contract address %q: %w", cfgNeoFSContract, err) + } + neofs = &sh + } else { + logger.Info("NeoFS contract address not configured, continue without it") + } + + return monitor.NewMainJob(monitor.MainJobArgs{ + AlphabetFetcher: alphabetFetcher, + BalanceFetcher: balanceFetcher, + Neofs: neofs, + Logger: logger, + }), nil +} + +func sideChainJob(ctx context.Context, cfg *viper.Viper, neogoClient *pool.Pool, logger *zap.Logger, sideChainEndpoints []string) (*monitor.SideJob, error) { + netmapContract, err := neogoClient.ResolveContract(rpcnns.NameNetmap) if err != nil { return nil, fmt.Errorf("can't read netmap scripthash: %w", err) } - containerContract, err := sideNeogoClient.ResolveContract(rpcnns.NameContainer) + containerContract, err := neogoClient.ResolveContract(rpcnns.NameContainer) if err != nil { return nil, fmt.Errorf("can't read container scripthash: %w", err) } nmFetcher, err := contracts.NewNetmap(contracts.NetmapArgs{ - Pool: sideNeogoClient, + Pool: neogoClient, NetmapContract: netmapContract, Logger: logger, }) @@ -68,77 +107,52 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { return nil, fmt.Errorf("can't initialize netmap fetcher: %w", err) } - cnrFetcher, err := contracts.NewContainer(sideNeogoClient, containerContract) + cnrFetcher, err := contracts.NewContainer(neogoClient, containerContract) if err != nil { return nil, fmt.Errorf("can't initialize container fetcher: %w", err) } - mainAlphabetFetcher := morphchain.NewMainChainAlphabetFetcher(mainNeogoClient) - sideAlphabetFetcher := morphchain.NewSideChainAlphabetFetcher(sideNeogoClient) + alphabetFetcher := morphchain.NewSideChainAlphabetFetcher(neogoClient) - sideBalanceFetcher, err := morphchain.NewBalanceFetcher(morphchain.BalanceFetcherArgs{ - Cli: sideNeogoClient, + balanceFetcher, err := morphchain.NewBalanceFetcher(morphchain.BalanceFetcherArgs{ + Cli: neogoClient, }) if err != nil { return nil, fmt.Errorf("can't initialize side balance fetcher: %w", err) } - mainBalanceFetcher, err := morphchain.NewBalanceFetcher(morphchain.BalanceFetcherArgs{ - Cli: mainNeogoClient, - }) - if err != nil { - return nil, fmt.Errorf("can't initialize main balance fetcher: %w", err) - } - var ( balance util.Uint160 proxy *util.Uint160 - neofs *util.Uint160 ) - balance, err = sideNeogoClient.ResolveContract(rpcnns.NameBalance) + balance, err = neogoClient.ResolveContract(rpcnns.NameBalance) if err != nil { return nil, fmt.Errorf("balance contract is not available: %w", err) } - proxyContract, err := sideNeogoClient.ResolveContract(rpcnns.NameProxy) + proxyContract, err := neogoClient.ResolveContract(rpcnns.NameProxy) if err != nil { logger.Info("proxy disabled") } else { proxy = &proxyContract } - neofsContract := cfg.GetString(cfgNeoFSContract) - if len(neofsContract) != 0 { - sh, err := util.Uint160DecodeStringLE(neofsContract) - if err != nil { - return nil, fmt.Errorf("NNS u160 decode: %w", err) - } - neofs = &sh - } else { - logger.Info("neofs contract ignored") - } - mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval)) if err = mnPool.Dial(ctx); err != nil { return nil, fmt.Errorf("multinodepool: %w", err) } - return monitor.New(monitor.Args{ - Balance: balance, - Proxy: proxy, - Neofs: neofs, - Logger: logger, - Sleep: cfg.GetDuration(cfgMetricsInterval), - MetricsAddress: cfg.GetString(cfgMetricsEndpoint), - MainAlpFetcher: mainAlphabetFetcher, - SideAlpFetcher: sideAlphabetFetcher, - NmFetcher: nmFetcher, - IRFetcher: nmFetcher, - SideBlFetcher: sideBalanceFetcher, - MainBlFetcher: mainBalanceFetcher, - CnrFetcher: cnrFetcher, - HeightFetcher: mnPool, - StateFetcher: mnPool, + return monitor.NewSideJob(monitor.SideJobArgs{ + Logger: logger, + Balance: balance, + Proxy: proxy, + AlphabetFetcher: alphabetFetcher, + NmFetcher: nmFetcher, + IRFetcher: nmFetcher, + BalanceFetcher: balanceFetcher, + CnrFetcher: cnrFetcher, + HeightFetcher: mnPool, + StateFetcher: mnPool, }), nil } diff --git a/config/config.yaml b/config/config.yaml index 2c952e6..e25058f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,5 +1,7 @@ -# Mainchain NEO RPC related configuration. -mainnet: +# Neo RPC related configuration. +chain: + # If true, monitor connects to the NeoFS chain, otherwise, to the Neo chain. + fschain: false rpc: dial_timeout: 5s # stores the interval after which a current connection health check is performed. @@ -8,15 +10,6 @@ mainnet: - https://rpc1.t5.n3.nspcc.ru:21331 - https://rpc2.t5.n3.nspcc.ru:21331 -# Sidechain NEO RPC related configuration. -morph: - rpc: - dial_timeout: 5s - health_recheck_interval: 5s - endpoint: - - https://rpc1.morph.t5.fs.neo.org:51331 - - https://rpc2.morph.t5.fs.neo.org:51331 - # Prometheus metric configuration. metrics: # Interval between NeoFS metric scrapping. diff --git a/pkg/monitor/main_job.go b/pkg/monitor/main_job.go new file mode 100644 index 0000000..695564d --- /dev/null +++ b/pkg/monitor/main_job.go @@ -0,0 +1,79 @@ +package monitor + +import ( + "encoding/hex" + + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" + "go.uber.org/zap" +) + +type ( + MainJobArgs struct { + AlphabetFetcher AlphabetFetcher + BalanceFetcher BalanceFetcher + Neofs *util.Uint160 + Logger *zap.Logger + } + + MainJob struct { + alphabetFetcher AlphabetFetcher + balanceFetcher BalanceFetcher + logger *zap.Logger + neofs *util.Uint160 + } +) + +func NewMainJob(args MainJobArgs) *MainJob { + return &MainJob{ + alphabetFetcher: args.AlphabetFetcher, + balanceFetcher: args.BalanceFetcher, + logger: args.Logger, + neofs: args.Neofs, + } +} + +func (m *MainJob) Process() { + if mainAlphabet, err := m.alphabetFetcher.FetchAlphabet(); err != nil { + m.logger.Warn("can't read NeoFS Aphabet members", zap.Error(err)) + } else { + processAlphabetPublicKeys(mainAlphabet) + m.processMainAlphabet(mainAlphabet) + } + + m.processMainChainSupply() +} + +func (m *MainJob) processMainAlphabet(alphabet keys.PublicKeys) { + exportGasBalances := make(map[string]int64, len(alphabet)) + + for _, key := range alphabet { + keyHex := hex.EncodeToString(key.Bytes()) + + balanceGAS, err := m.balanceFetcher.FetchGAS(*key) + if err != nil { + m.logger.Debug("can't fetch gas balance", zap.String("key", keyHex), zap.Error(err)) + } else { + exportGasBalances[keyHex] = balanceGAS + } + } + + alphabetGASBalances.Reset() + for k, v := range exportGasBalances { + alphabetGASBalances.WithLabelValues(k).Set(float64(v)) + } +} + +func (m *MainJob) processMainChainSupply() { + if m.neofs == nil { + return + } + + balance, err := m.balanceFetcher.FetchGASByScriptHash(*m.neofs) + if err != nil { + m.logger.Debug("can't fetch NeoFS contract's GAS balance", zap.Error(err)) + return + } + + mainChainSupply.Set(float64(balance)) +} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 62e878a..e741055 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -6,27 +6,15 @@ import ( "errors" "net/http" "sort" - "strconv" "time" - "github.com/nspcc-dev/locode-db/pkg/locodedb" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" ) type ( - NetmapFetcher interface { - FetchNetmap() (NetmapInfo, error) - FetchCandidates() (NetmapCandidatesInfo, error) - } - - InnerRingFetcher interface { - FetchInnerRingKeys() (keys.PublicKeys, error) - } - BalanceFetcher interface { FetchGAS(keys.PublicKey) (int64, error) FetchGASByScriptHash(uint160 util.Uint160) (int64, error) @@ -39,103 +27,27 @@ type ( FetchAlphabet() (keys.PublicKeys, error) } - ContainerFetcher interface { - Total() (int64, error) - } - - HeightFetcher interface { - FetchHeight() []HeightData - } - - StateFetcher interface { - FetchState(height uint32) []StateData - } - - HeightData struct { - Host string - Value uint32 - } - - StateData struct { - Host string - Value string - } - - Node struct { - ID uint64 - Address string - PublicKey *keys.PublicKey - Attributes map[string]string - Locode string - } - - NetmapInfo struct { - Epoch uint64 - Nodes []*Node - } - - NetmapCandidatesInfo struct { - Nodes []*Node - } - Monitor struct { - balance util.Uint160 - proxy *util.Uint160 - neofs *util.Uint160 - - logger *zap.Logger - sleep time.Duration - metricsServer http.Server - mainAlpFetcher AlphabetFetcher - sideAlpFetcher AlphabetFetcher - nmFetcher NetmapFetcher - irFetcher InnerRingFetcher - sideBlFetcher BalanceFetcher - mainBlFetcher BalanceFetcher - cnrFetcher ContainerFetcher - heightFetcher HeightFetcher - stateFetcher StateFetcher + job Job + logger *zap.Logger + sleep time.Duration + metricsServer http.Server } - Args struct { - Balance util.Uint160 - Proxy *util.Uint160 - Neofs *util.Uint160 - Logger *zap.Logger - Sleep time.Duration - MetricsAddress string - MainAlpFetcher AlphabetFetcher - SideAlpFetcher AlphabetFetcher - NmFetcher NetmapFetcher - IRFetcher InnerRingFetcher - SideBlFetcher BalanceFetcher - MainBlFetcher BalanceFetcher - CnrFetcher ContainerFetcher - HeightFetcher HeightFetcher - StateFetcher StateFetcher + Job interface { + Process() } ) -func New(p Args) *Monitor { +func New(job Job, metricAddress string, sleep time.Duration, logger *zap.Logger) *Monitor { return &Monitor{ - balance: p.Balance, - proxy: p.Proxy, - neofs: p.Neofs, - logger: p.Logger, - sleep: p.Sleep, + job: job, + sleep: sleep, + logger: logger, metricsServer: http.Server{ - Addr: p.MetricsAddress, + Addr: metricAddress, Handler: promhttp.Handler(), }, - mainAlpFetcher: p.MainAlpFetcher, - sideAlpFetcher: p.SideAlpFetcher, - nmFetcher: p.NmFetcher, - irFetcher: p.IRFetcher, - sideBlFetcher: p.SideBlFetcher, - mainBlFetcher: p.MainBlFetcher, - cnrFetcher: p.CnrFetcher, - heightFetcher: p.HeightFetcher, - stateFetcher: p.StateFetcher, } } @@ -159,8 +71,7 @@ func (m *Monitor) Stop() { func (m *Monitor) Job(ctx context.Context) { for { - m.processSideChain() - m.processMainChain() + m.job.Process() select { case <-time.After(m.sleep): @@ -172,273 +83,10 @@ func (m *Monitor) Job(ctx context.Context) { } } -func (m *Monitor) processMainChain() { - if mainAlphabet, err := m.mainAlpFetcher.FetchAlphabet(); err != nil { - m.logger.Warn("can't scrap main alphabet info", zap.Error(err)) - } else { - m.processAlphabetPublicKeys(mainAlphabet) - m.processMainAlphabet(mainAlphabet) - } - - m.processMainChainSupply() -} - -func (m *Monitor) processSideChain() { - m.logger.Debug("scraping data from side chain") - - netmap, err := m.nmFetcher.FetchNetmap() - if err != nil { - m.logger.Warn("can't scrap network map info", zap.Error(err)) - } else { - candidatesNetmap, err := m.nmFetcher.FetchCandidates() - if err != nil { - m.logger.Warn("can't scrap network map candidates info", zap.Error(err)) - } else { - m.processNetworkMap(netmap, candidatesNetmap) - } - } - - innerRing, err := m.irFetcher.FetchInnerRingKeys() - if err != nil { - m.logger.Warn("can't scrap inner ring info", zap.Error(err)) - } else { - m.processInnerRing(innerRing) - } - - if m.proxy != nil { - m.processProxyContract() - } - - m.processSideChainSupply() - - if alphabet, err := m.sideAlpFetcher.FetchAlphabet(); err != nil { - m.logger.Warn("can't scrap side alphabet info", zap.Error(err)) - } else { - m.processAlphabetPublicKeys(alphabet) - m.processSideAlphabet(alphabet) - } - - m.processContainersNumber() - - minHeight := m.processChainHeight() - m.processChainState(minHeight) -} - func (m *Monitor) Logger() *zap.Logger { return m.logger } -type diffNode struct { - currEpoch *Node - nextEpoch *Node -} - -type nodeLocation struct { - name string - long string - lat string -} - -func (m *Monitor) processNetworkMap(nm NetmapInfo, candidates NetmapCandidatesInfo) { - currentNetmapLen := len(nm.Nodes) - - exportCountries := make(map[nodeLocation]int, currentNetmapLen) - exportBalancesGAS := make(map[string]int64, currentNetmapLen) - exportBalancesNotary := make(map[string]int64, currentNetmapLen) - - newNodes, droppedNodes := getDiff(nm, candidates) - - for _, node := range nm.Nodes { - keyHex := hex.EncodeToString(node.PublicKey.Bytes()) - - balanceGAS, err := m.sideBlFetcher.FetchGAS(*node.PublicKey) - if err != nil { - m.logger.Debug("can't fetch GAS balance", zap.String("key", keyHex), zap.Error(err)) - } else { - exportBalancesGAS[keyHex] = balanceGAS - } - - record, err := locodedb.Get(node.Locode) - if err != nil { - m.logger.Debug("can't fetch geoposition", zap.String("key", keyHex), zap.Error(err)) - } else { - nodeLoc := nodeLocation{ - name: record.Location, - long: strconv.FormatFloat(float64(record.Point.Longitude), 'f', 4, 32), - lat: strconv.FormatFloat(float64(record.Point.Latitude), 'f', 4, 32), - } - - exportCountries[nodeLoc]++ - } - - balanceNotary, err := m.sideBlFetcher.FetchNotary(*node.PublicKey) - if err != nil { - m.logger.Debug("can't fetch notary balance", zap.String("key", keyHex), zap.Error(err)) - } else { - exportBalancesNotary[keyHex] = balanceNotary - } - } - - m.logNodes("new node", newNodes) - m.logNodes("dropped node", droppedNodes) - - epochNumber.Set(float64(nm.Epoch)) - droppedNodesCount.Set(float64(len(droppedNodes))) - newNodesCount.Set(float64(len(newNodes))) - - locationPresent.Reset() - for k, v := range exportCountries { - locationPresent.With(prometheus.Labels{ - location: k.name, - longitude: k.long, - latitude: k.lat, - }).Set(float64(v)) - } - - storageNodeGASBalances.Reset() - for k, v := range exportBalancesGAS { - storageNodeGASBalances.WithLabelValues(k).Set(float64(v)) - } - - storageNodeNotaryBalances.Reset() - for k, v := range exportBalancesNotary { - storageNodeNotaryBalances.WithLabelValues(k).Set(float64(v)) - } -} - -func (m *Monitor) logNodes(msg string, nodes []*Node) { - for _, node := range nodes { - fields := []zap.Field{zap.Uint64("id", node.ID), zap.String("address", node.Address), - zap.String("public key", node.PublicKey.String()), - } - - for key, val := range node.Attributes { - fields = append(fields, zap.String(key, val)) - } - - m.logger.Info(msg, fields...) - } -} - -func getDiff(nm NetmapInfo, cand NetmapCandidatesInfo) ([]*Node, []*Node) { - currentNetmapLen := len(nm.Nodes) - candidatesLen := len(cand.Nodes) - - diff := make(map[uint64]*diffNode, currentNetmapLen+candidatesLen) - - for _, currEpochNode := range nm.Nodes { - diff[currEpochNode.ID] = &diffNode{currEpoch: currEpochNode} - } - - var newCount int - - for _, nextEpochNode := range cand.Nodes { - if _, exists := diff[nextEpochNode.ID]; exists { - diff[nextEpochNode.ID].nextEpoch = nextEpochNode - } else { - newCount++ - diff[nextEpochNode.ID] = &diffNode{nextEpoch: nextEpochNode} - } - } - - droppedCount := currentNetmapLen - (candidatesLen - newCount) - - droppedNodes := make([]*Node, 0, droppedCount) - newNodes := make([]*Node, 0, newCount) - - for _, node := range diff { - if node.nextEpoch == nil { - droppedNodes = append(droppedNodes, node.currEpoch) - } - - if node.currEpoch == nil { - newNodes = append(newNodes, node.nextEpoch) - } - } - - return newNodes, droppedNodes -} - -func (m *Monitor) processInnerRing(ir keys.PublicKeys) { - exportBalances := make(map[string]int64, len(ir)) - - for _, key := range ir { - keyHex := hex.EncodeToString(key.Bytes()) - - balance, err := m.sideBlFetcher.FetchGAS(*key) - if err != nil { - m.logger.Debug("can't fetch balance", zap.String("key", keyHex), zap.Error(err)) - continue - } - - exportBalances[keyHex] = balance - } - - innerRingBalances.Reset() - for k, v := range exportBalances { - innerRingBalances.WithLabelValues(k).Set(float64(v)) - } -} - -func (m *Monitor) processProxyContract() { - balance, err := m.sideBlFetcher.FetchGASByScriptHash(*m.proxy) - if err != nil { - m.logger.Debug("can't fetch proxy contract balance", zap.Error(err)) - return - } - - proxyBalance.Set(float64(balance)) -} - -func (m *Monitor) processSideAlphabet(alphabet keys.PublicKeys) { - exportNotaryBalances := make(map[string]int64, len(alphabet)) - - for _, key := range alphabet { - keyHex := hex.EncodeToString(key.Bytes()) - - balanceNotary, err := m.sideBlFetcher.FetchNotary(*key) - if err != nil { - m.logger.Debug("can't fetch notary balance", zap.String("key", keyHex), zap.Error(err)) - } else { - exportNotaryBalances[keyHex] = balanceNotary - } - } - - alphabetNotaryBalances.Reset() - for k, v := range exportNotaryBalances { - alphabetNotaryBalances.WithLabelValues(k).Set(float64(v)) - } -} - -func (m *Monitor) processMainAlphabet(alphabet keys.PublicKeys) { - exportGasBalances := make(map[string]int64, len(alphabet)) - - for _, key := range alphabet { - keyHex := hex.EncodeToString(key.Bytes()) - - balanceGAS, err := m.mainBlFetcher.FetchGAS(*key) - if err != nil { - m.logger.Debug("can't fetch gas balance", zap.String("key", keyHex), zap.Error(err)) - } else { - exportGasBalances[keyHex] = balanceGAS - } - } - - alphabetGASBalances.Reset() - for k, v := range exportGasBalances { - alphabetGASBalances.WithLabelValues(k).Set(float64(v)) - } -} - -func (m *Monitor) processAlphabetPublicKeys(alphabet keys.PublicKeys) { - sorted := sortedAlphabet(alphabet) - - alphabetPubKeys.Reset() - for _, key := range sorted { - alphabetPubKeys.WithLabelValues(key).Set(1) - } -} - func sortedAlphabet(alphabet keys.PublicKeys) []string { sort.Sort(alphabet) sorted := make([]string, 0, len(alphabet)) @@ -449,66 +97,11 @@ func sortedAlphabet(alphabet keys.PublicKeys) []string { return sorted } -func (m *Monitor) processMainChainSupply() { - if m.neofs == nil { - return - } - - balance, err := m.mainBlFetcher.FetchGASByScriptHash(*m.neofs) - if err != nil { - m.logger.Debug("can't fetch neofs contract balance", zap.Error(err)) - return - } - - mainChainSupply.Set(float64(balance)) -} - -func (m *Monitor) processSideChainSupply() { - balance, err := m.sideBlFetcher.FetchNEP17TotalSupply(m.balance) - if err != nil { - m.logger.Debug("can't fetch balance contract total supply", zap.Error(err)) - return - } - - sideChainSupply.Set(float64(balance)) -} - -func (m *Monitor) processContainersNumber() { - total, err := m.cnrFetcher.Total() - if err != nil { - m.logger.Warn("can't fetch number of available containers", zap.Error(err)) - return - } - - containersNumber.Set(float64(total)) -} - -func (m *Monitor) processChainHeight() uint32 { - var minHeight uint32 - heightData := m.heightFetcher.FetchHeight() - - for _, d := range heightData { - chainHeight.WithLabelValues(d.Host).Set(float64(d.Value)) - - if minHeight == 0 || d.Value < minHeight { - minHeight = d.Value - } - } - - return minHeight -} - -func (m *Monitor) processChainState(height uint32) { - if height == 0 { - return - } - - stateData := m.stateFetcher.FetchState(height) - chainState.Reset() - - h := float64(height) +func processAlphabetPublicKeys(alphabet keys.PublicKeys) { + sorted := sortedAlphabet(alphabet) - for _, d := range stateData { - chainState.WithLabelValues(d.Host, d.Value).Set(h) + alphabetPubKeys.Reset() + for _, key := range sorted { + alphabetPubKeys.WithLabelValues(key).Set(1) } } diff --git a/pkg/monitor/side_job.go b/pkg/monitor/side_job.go new file mode 100644 index 0000000..09007fa --- /dev/null +++ b/pkg/monitor/side_job.go @@ -0,0 +1,386 @@ +package monitor + +import ( + "encoding/hex" + "strconv" + + "github.com/nspcc-dev/locode-db/pkg/locodedb" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type ( + SideJobArgs struct { + Logger *zap.Logger + Balance util.Uint160 + Proxy *util.Uint160 + AlphabetFetcher AlphabetFetcher + NmFetcher NetmapFetcher + IRFetcher InnerRingFetcher + BalanceFetcher BalanceFetcher + CnrFetcher ContainerFetcher + HeightFetcher HeightFetcher + StateFetcher StateFetcher + } + + SideJob struct { + logger *zap.Logger + nmFetcher NetmapFetcher + irFetcher InnerRingFetcher + balanceFetcher BalanceFetcher + proxy *util.Uint160 + cnrFetcher ContainerFetcher + heightFetcher HeightFetcher + stateFetcher StateFetcher + alphabetFetcher AlphabetFetcher + balance util.Uint160 + } + + diffNode struct { + currEpoch *Node + nextEpoch *Node + } + + nodeLocation struct { + name string + long string + lat string + } + + NetmapCandidatesInfo struct { + Nodes []*Node + } + + ContainerFetcher interface { + Total() (int64, error) + } + + HeightFetcher interface { + FetchHeight() []HeightData + } + + StateFetcher interface { + FetchState(height uint32) []StateData + } + + HeightData struct { + Host string + Value uint32 + } + + StateData struct { + Host string + Value string + } + + Node struct { + ID uint64 + Address string + PublicKey *keys.PublicKey + Attributes map[string]string + Locode string + } + + NetmapInfo struct { + Epoch uint64 + Nodes []*Node + } + + NetmapFetcher interface { + FetchNetmap() (NetmapInfo, error) + FetchCandidates() (NetmapCandidatesInfo, error) + } + + InnerRingFetcher interface { + FetchInnerRingKeys() (keys.PublicKeys, error) + } +) + +func NewSideJob(args SideJobArgs) *SideJob { + return &SideJob{ + logger: args.Logger, + nmFetcher: args.NmFetcher, + irFetcher: args.IRFetcher, + balanceFetcher: args.BalanceFetcher, + proxy: args.Proxy, + cnrFetcher: args.CnrFetcher, + heightFetcher: args.HeightFetcher, + stateFetcher: args.StateFetcher, + alphabetFetcher: args.AlphabetFetcher, + balance: args.Balance, + } +} + +func (m *SideJob) Process() { + m.logger.Debug("retrieving data from side chain") + + netmap, err := m.nmFetcher.FetchNetmap() + if err != nil { + m.logger.Warn("can't read NeoFS network map", zap.Error(err)) + } else { + candidatesNetmap, err := m.nmFetcher.FetchCandidates() + if err != nil { + m.logger.Warn("can't read NeoFS network map candidates", zap.Error(err)) + } else { + m.processNetworkMap(netmap, candidatesNetmap) + } + } + + innerRing, err := m.irFetcher.FetchInnerRingKeys() + if err != nil { + m.logger.Warn("can't read NeoFS Inner Ring members", zap.Error(err)) + } else { + m.processInnerRing(innerRing) + } + + if m.proxy != nil { + m.processProxyContract() + } + + m.processSideChainSupply() + + if alphabet, err := m.alphabetFetcher.FetchAlphabet(); err != nil { + m.logger.Warn("can't read NeoFS ALphabet members", zap.Error(err)) + } else { + processAlphabetPublicKeys(alphabet) + m.processSideAlphabet(alphabet) + } + + m.processContainersNumber() + + minHeight := m.processChainHeight() + m.processChainState(minHeight) +} + +func (m *SideJob) processNetworkMap(nm NetmapInfo, candidates NetmapCandidatesInfo) { + currentNetmapLen := len(nm.Nodes) + + exportCountries := make(map[nodeLocation]int, currentNetmapLen) + exportBalancesGAS := make(map[string]int64, currentNetmapLen) + exportBalancesNotary := make(map[string]int64, currentNetmapLen) + + newNodes, droppedNodes := getDiff(nm, candidates) + + for _, node := range nm.Nodes { + keyHex := hex.EncodeToString(node.PublicKey.Bytes()) + + balanceGAS, err := m.balanceFetcher.FetchGAS(*node.PublicKey) + if err != nil { + m.logger.Debug("can't fetch GAS balance", zap.String("key", keyHex), zap.Error(err)) + } else { + exportBalancesGAS[keyHex] = balanceGAS + } + + record, err := locodedb.Get(node.Locode) + if err != nil { + m.logger.Debug("can't fetch geoposition of node from the NeoFS network map", + zap.String("key", keyHex), + zap.String("locode", node.Locode), + zap.Error(err), + ) + } else { + nodeLoc := nodeLocation{ + name: record.Location, + long: strconv.FormatFloat(float64(record.Point.Longitude), 'f', 4, 32), + lat: strconv.FormatFloat(float64(record.Point.Latitude), 'f', 4, 32), + } + + exportCountries[nodeLoc]++ + } + + balanceNotary, err := m.balanceFetcher.FetchNotary(*node.PublicKey) + if err != nil { + m.logger.Debug("can't fetch notary balance of node from the NeoFS network map", + zap.String("key", keyHex), + zap.Error(err), + ) + } else { + exportBalancesNotary[keyHex] = balanceNotary + } + } + + m.logNodes("new node", newNodes) + m.logNodes("dropped node", droppedNodes) + + epochNumber.Set(float64(nm.Epoch)) + droppedNodesCount.Set(float64(len(droppedNodes))) + newNodesCount.Set(float64(len(newNodes))) + + locationPresent.Reset() + for k, v := range exportCountries { + locationPresent.With(prometheus.Labels{ + location: k.name, + longitude: k.long, + latitude: k.lat, + }).Set(float64(v)) + } + + storageNodeGASBalances.Reset() + for k, v := range exportBalancesGAS { + storageNodeGASBalances.WithLabelValues(k).Set(float64(v)) + } + + storageNodeNotaryBalances.Reset() + for k, v := range exportBalancesNotary { + storageNodeNotaryBalances.WithLabelValues(k).Set(float64(v)) + } +} + +func (m *SideJob) logNodes(msg string, nodes []*Node) { + for _, node := range nodes { + fields := []zap.Field{zap.Uint64("id", node.ID), zap.String("address", node.Address), + zap.String("public key", node.PublicKey.String()), + } + + for key, val := range node.Attributes { + fields = append(fields, zap.String(key, val)) + } + + m.logger.Info(msg, fields...) + } +} + +func (m *SideJob) processInnerRing(ir keys.PublicKeys) { + exportBalances := make(map[string]int64, len(ir)) + + for _, key := range ir { + keyHex := hex.EncodeToString(key.Bytes()) + + balance, err := m.balanceFetcher.FetchGAS(*key) + if err != nil { + m.logger.Debug("can't fetch GAS balance of the NeoFS Inner Ring member", + zap.String("key", keyHex), + zap.Error(err), + ) + continue + } + + exportBalances[keyHex] = balance + } + + innerRingBalances.Reset() + for k, v := range exportBalances { + innerRingBalances.WithLabelValues(k).Set(float64(v)) + } +} + +func (m *SideJob) processProxyContract() { + balance, err := m.balanceFetcher.FetchGASByScriptHash(*m.proxy) + if err != nil { + m.logger.Debug("can't fetch proxy contract balance", zap.Stringer("address", m.proxy), zap.Error(err)) + return + } + + proxyBalance.Set(float64(balance)) +} + +func (m *SideJob) processSideAlphabet(alphabet keys.PublicKeys) { + exportNotaryBalances := make(map[string]int64, len(alphabet)) + + for _, key := range alphabet { + keyHex := hex.EncodeToString(key.Bytes()) + + balanceNotary, err := m.balanceFetcher.FetchNotary(*key) + if err != nil { + m.logger.Debug("can't fetch notary balance of the NeoFS Alphabet member", zap.String("key", keyHex), zap.Error(err)) + } else { + exportNotaryBalances[keyHex] = balanceNotary + } + } + + alphabetNotaryBalances.Reset() + for k, v := range exportNotaryBalances { + alphabetNotaryBalances.WithLabelValues(k).Set(float64(v)) + } +} + +func (m *SideJob) processSideChainSupply() { + balance, err := m.balanceFetcher.FetchNEP17TotalSupply(m.balance) + if err != nil { + m.logger.Debug("can't fetch balance contract total supply", zap.Stringer("address", m.balance), zap.Error(err)) + return + } + + sideChainSupply.Set(float64(balance)) +} + +func (m *SideJob) processContainersNumber() { + total, err := m.cnrFetcher.Total() + if err != nil { + m.logger.Warn("can't fetch number of available containers", zap.Error(err)) + return + } + + containersNumber.Set(float64(total)) +} + +func (m *SideJob) processChainHeight() uint32 { + var minHeight uint32 + heightData := m.heightFetcher.FetchHeight() + + for _, d := range heightData { + chainHeight.WithLabelValues(d.Host).Set(float64(d.Value)) + + if minHeight == 0 || d.Value < minHeight { + minHeight = d.Value + } + } + + return minHeight +} + +func (m *SideJob) processChainState(height uint32) { + if height == 0 { + return + } + + stateData := m.stateFetcher.FetchState(height) + chainState.Reset() + + h := float64(height) + + for _, d := range stateData { + chainState.WithLabelValues(d.Host, d.Value).Set(h) + } +} + +func getDiff(nm NetmapInfo, cand NetmapCandidatesInfo) ([]*Node, []*Node) { + currentNetmapLen := len(nm.Nodes) + candidatesLen := len(cand.Nodes) + + diff := make(map[uint64]*diffNode, currentNetmapLen+candidatesLen) + + for _, currEpochNode := range nm.Nodes { + diff[currEpochNode.ID] = &diffNode{currEpoch: currEpochNode} + } + + var newCount int + + for _, nextEpochNode := range cand.Nodes { + if _, exists := diff[nextEpochNode.ID]; exists { + diff[nextEpochNode.ID].nextEpoch = nextEpochNode + } else { + newCount++ + diff[nextEpochNode.ID] = &diffNode{nextEpoch: nextEpochNode} + } + } + + droppedCount := currentNetmapLen - (candidatesLen - newCount) + + droppedNodes := make([]*Node, 0, droppedCount) + newNodes := make([]*Node, 0, newCount) + + for _, node := range diff { + if node.nextEpoch == nil { + droppedNodes = append(droppedNodes, node.currEpoch) + } + + if node.currEpoch == nil { + newNodes = append(newNodes, node.nextEpoch) + } + } + + return newNodes, droppedNodes +} From f1d96a76c48e02eaae4304866cc243497e11f6b4 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 22 Dec 2023 15:07:42 +0400 Subject: [PATCH 6/7] *: Register prometheus metric for each type of tracker Signed-off-by: Evgenii Baidakov --- cmd/neofs-net-monitor/main.go | 3 --- cmd/neofs-net-monitor/monitor.go | 2 ++ pkg/monitor/metrics.go | 15 ++++++++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/neofs-net-monitor/main.go b/cmd/neofs-net-monitor/main.go index 7295969..fcf3996 100644 --- a/cmd/neofs-net-monitor/main.go +++ b/cmd/neofs-net-monitor/main.go @@ -10,7 +10,6 @@ import ( "strings" "syscall" - "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor" "github.com/spf13/viper" "go.uber.org/zap" ) @@ -45,8 +44,6 @@ func main() { os.Exit(1) } - monitor.RegisterMetrics() - neofsMonitor.Start(ctx) neofsMonitor.Logger().Info("application started", zap.String("version", Version)) diff --git a/cmd/neofs-net-monitor/monitor.go b/cmd/neofs-net-monitor/monitor.go index a68f55e..3798c01 100644 --- a/cmd/neofs-net-monitor/monitor.go +++ b/cmd/neofs-net-monitor/monitor.go @@ -38,8 +38,10 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { var job monitor.Job if cfg.GetBool(cfgChainFSChain) { + monitor.RegisterSideChainMetrics() job, err = sideChainJob(ctx, cfg, sideNeogoClient, logger, sideChainEndpoints) } else { + monitor.RegisterMainChainMetrics() job, err = mainChainJob(cfg, sideNeogoClient, logger) } diff --git a/pkg/monitor/metrics.go b/pkg/monitor/metrics.go index 5b0d8aa..4712046 100644 --- a/pkg/monitor/metrics.go +++ b/pkg/monitor/metrics.go @@ -169,8 +169,8 @@ var ( ) ) -// RegisterMetrics inits prometheus metrics. Panics if can't do it. -func RegisterMetrics() { +// RegisterSideChainMetrics inits prometheus metrics for side chain. Panics if can't do it. +func RegisterSideChainMetrics() { prometheus.MustRegister(locationPresent) prometheus.MustRegister(droppedNodesCount) prometheus.MustRegister(newNodesCount) @@ -178,13 +178,18 @@ func RegisterMetrics() { prometheus.MustRegister(storageNodeGASBalances) prometheus.MustRegister(storageNodeNotaryBalances) prometheus.MustRegister(innerRingBalances) - prometheus.MustRegister(alphabetGASBalances) prometheus.MustRegister(alphabetNotaryBalances) prometheus.MustRegister(proxyBalance) - prometheus.MustRegister(mainChainSupply) prometheus.MustRegister(sideChainSupply) - prometheus.MustRegister(alphabetPubKeys) + prometheus.MustRegister(alphabetPubKeys) // used for both monitors prometheus.MustRegister(containersNumber) prometheus.MustRegister(chainHeight) prometheus.MustRegister(chainState) } + +// RegisterMainChainMetrics inits prometheus metrics for main chain. Panics if can't do it. +func RegisterMainChainMetrics() { + prometheus.MustRegister(alphabetGASBalances) + prometheus.MustRegister(mainChainSupply) + prometheus.MustRegister(alphabetPubKeys) // used for both monitors +} From dde85584637c4062d073dc39543b9da7ec9245b0 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 29 Dec 2023 09:25:46 +0400 Subject: [PATCH 7/7] changelog: Update changelog Signed-off-by: Evgenii Baidakov --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7d2104..975421d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,17 @@ Changelog for NeoFS Monitor ### Changed - Usage of Locode DB Go package (#100) +- Configuration supports only one chain in a moment (#103) ### Removed - Locode DB configuration options (#100) +### Upgrading from v0.9.5 + +The configuration sections `mainnet` and `morph` were replaced with similar `chain` sections. To choice between +main (Neo) chain and side (NeoFS) chain, use `chain.fschain` option. If true, monitor connects to the NeoFS chain, +otherwise, to the Neo chain. + ## [0.9.5] - 2022-12-29 ### Changed