Skip to content

Commit

Permalink
control: set the highest priority for control service.
Browse files Browse the repository at this point in the history
Control ports should be initialised and start served first of all. As
initControlService already listening endpoint, no need of preRun of
control service in storage nodes.

Closes: #2585.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Nov 17, 2023
1 parent 8d9a2bc commit a595390
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Changelog for NeoFS Node
- IR uses internal LOCODE DB from Go package (#2610)
- Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642)
- The priority of metrics service is increased (#2428)
- The priority of running control service is increased (#2585)

### Removed
- deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496)
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func initControlService(c *cfg) {
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)

c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.wg.Add(1)
go func() {
runAndLog(c, "control", false, func(c *cfg) {
fatalOnErr(c.cfgControlService.server.Serve(lis))
c.wg.Done()
})
}))
}()
}

func (c *cfg) NetmapStatus() control.NetmapStatus {
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
}

func initApp(c *cfg) {
initAndLog(c, "control", initControlService)
initLocalStorage(c)

c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -139,7 +140,6 @@ func initApp(c *cfg) {
initAndLog(c, "notification", initNotifications)
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)

initAndLog(c, "morph notifications", listenMorphNotifications)

Expand Down
97 changes: 46 additions & 51 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,52 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}
}

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err == nil {
go func() {
errChan <- grpcControlSrv.Serve(lis)
}()
}

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

// create morph listener
server.morphListener, err = createListener(server.morphClient, morphChain)
if err != nil {
Expand Down Expand Up @@ -985,57 +1031,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-

server.addBlockTimer(emissionTimer)

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

server.runners = append(server.runners, func(ch chan<- error) error {
lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}

go func() {
ch <- grpcControlSrv.Serve(lis)
}()
return nil
})

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

return server, nil
}

Expand Down

0 comments on commit a595390

Please sign in to comment.