Skip to content

Commit

Permalink
ir: Start metrics and control service before blockchain init (#2679)
Browse files Browse the repository at this point in the history
Sync may take some time while nothing stops IR from showing some useful
info.

Closes #2585.
  • Loading branch information
roman-khimov authored Dec 18, 2023
2 parents 34cdb12 + c33a21a commit c1b8cac
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Changelog for NeoFS Node
### Fixed
- Fund transfer deadlock in NeoFS chain auto-deploy/update procedure (#2681)
- Invalid contracts' update transactions when epochs are stuck during the NeoFS chain update (#2680)
- Metrics availability during startup (#2677)

### Changed

Expand Down
136 changes: 75 additions & 61 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,28 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}

isLocalConsensus := isLocalConsensusMode(cfg)
if isLocalConsensus {
if singleAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", singleAccLabel, walletPass)
}

server.key = singleAcc.PrivateKey()
} else {
acc, err := utilConfig.LoadAccount(walletPath, cfg.GetString("wallet.address"), walletPass)
if err != nil {
return nil, fmt.Errorf("ir: %w", err)
}

server.key = acc.PrivateKey()
}

err = serveControl(server, log, cfg, errChan)
if err != nil {
return nil, err
}

serveMetrics(server, cfg)

var localWSClient *rpcclient.WSClient // set if isLocalConsensus only

// create morph client
Expand All @@ -406,10 +428,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
return nil, fmt.Errorf("invalid blockchain configuration: %w", err)
}

if singleAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", singleAccLabel, walletPass)
}

if consensusAcc == nil {
return nil, fmt.Errorf("missing account with label '%s' in wallet '%s'", consensusAccLabel, walletPass)
}
Expand Down Expand Up @@ -450,7 +468,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
return nil, fmt.Errorf("build WS client on internal blockchain: %w", err)
}

server.key = singleAcc.PrivateKey()
morphChain.key = server.key
sidechainOpts := make([]client.Option, 3, 4)
sidechainOpts[0] = client.WithContext(ctx)
Expand All @@ -471,16 +488,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}

// fallback to the pure RPC architecture
acc, err := utilConfig.LoadAccount(
walletPath,
cfg.GetString("wallet.address"),
walletPass,
)
if err != nil {
return nil, fmt.Errorf("ir: %w", err)
}

server.key = acc.PrivateKey()
morphChain.key = server.key
morphChain.withAutoSidechainScope = !isAutoDeploy

Expand Down Expand Up @@ -564,53 +572,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}
}

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return nil, err
}
var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

go func() {
errChan <- grpcControlSrv.Serve(lis)
}()

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

// create morph listener
server.morphListener, err = createListener(server.morphClient, morphChain)
if err != nil {
Expand Down Expand Up @@ -1272,3 +1233,56 @@ func (s *Server) restartMorph() error {
func (s *Server) restartMainChain() error {
return nil
}

func serveControl(server *Server, log *zap.Logger, cfg *viper.Viper, errChan chan<- error) error {
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}
var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

go func() {
errChan <- grpcControlSrv.Serve(lis)
}()

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

return nil
}

func serveMetrics(server *Server, cfg *viper.Viper) {
if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}
}

0 comments on commit c1b8cac

Please sign in to comment.