Skip to content

Commit

Permalink
ir: Start metrics and control service before blockchain init
Browse files Browse the repository at this point in the history
Sync may take some time while nothing stops IR from showing some useful info.
Closes #2677.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Dec 18, 2023
1 parent 34cdb12 commit c33a21a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Changelog for NeoFS Node
### Fixed
- Fund transfer deadlock in NeoFS chain auto-deploy/update procedure (#2681)
- Invalid contracts' update transactions when epochs are stuck during the NeoFS chain update (#2680)
- Metrics availability during startup (#2677)

### Changed

Expand Down
136 changes: 75 additions & 61 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,28 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}

isLocalConsensus := isLocalConsensusMode(cfg)
if isLocalConsensus {
if singleAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", singleAccLabel, walletPass)
}

server.key = singleAcc.PrivateKey()
} else {
acc, err := utilConfig.LoadAccount(walletPath, cfg.GetString("wallet.address"), walletPass)
if err != nil {
return nil, fmt.Errorf("ir: %w", err)
}

server.key = acc.PrivateKey()
}

err = serveControl(server, log, cfg, errChan)
if err != nil {
return nil, err
}

serveMetrics(server, cfg)

var localWSClient *rpcclient.WSClient // set if isLocalConsensus only

// create morph client
Expand All @@ -406,10 +428,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
return nil, fmt.Errorf("invalid blockchain configuration: %w", err)
}

if singleAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", singleAccLabel, walletPass)
}

if consensusAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", consensusAccLabel, walletPass)
}
Expand Down Expand Up @@ -450,7 +468,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
return nil, fmt.Errorf("build WS client on internal blockchain: %w", err)
}

server.key = singleAcc.PrivateKey()
morphChain.key = server.key
sidechainOpts := make([]client.Option, 3, 4)
sidechainOpts[0] = client.WithContext(ctx)
Expand All @@ -471,16 +488,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}

// fallback to the pure RPC architecture
acc, err := utilConfig.LoadAccount(
walletPath,
cfg.GetString("wallet.address"),
walletPass,
)
if err != nil {
return nil, fmt.Errorf("ir: %w", err)
}

server.key = acc.PrivateKey()
morphChain.key = server.key
morphChain.withAutoSidechainScope = !isAutoDeploy

Expand Down Expand Up @@ -564,53 +572,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}
}

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return nil, err
}
var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

go func() {
errChan <- grpcControlSrv.Serve(lis)
}()

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

// create morph listener
server.morphListener, err = createListener(server.morphClient, morphChain)
if err != nil {
Expand Down Expand Up @@ -1272,3 +1233,56 @@ func (s *Server) restartMorph() error {
func (s *Server) restartMainChain() error {
return nil
}

func serveControl(server *Server, log *zap.Logger, cfg *viper.Viper, errChan chan<- error) error {
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}
var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

go func() {
errChan <- grpcControlSrv.Serve(lis)
}()

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

return nil
}

func serveMetrics(server *Server, cfg *viper.Viper) {
if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}
}

0 comments on commit c33a21a

Please sign in to comment.