Skip to content

Commit

Permalink
control: set the highest priority for control service.
Browse files Browse the repository at this point in the history
Control ports should be initialised and start served first of all. As
initControlService already listening endpoint, no need of preRun of
control service in storage nodes.

Closes: #2585.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Nov 16, 2023
1 parent f865d17 commit e9c7422
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Changelog for NeoFS Node
- New optimized SDK version is integrated (#2622)
- IR uses internal LOCODE DB from Go package (#2610)
- Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642)
- The priority of running control service is increased (#2585)

### Removed
- deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496)
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func initControlService(c *cfg) {
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)

c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.wg.Add(1)
go func() {
runAndLog(c, "control", false, func(c *cfg) {
fatalOnErr(c.cfgControlService.server.Serve(lis))
c.wg.Done()
})
}))
}()
}

func (c *cfg) NetmapStatus() control.NetmapStatus {
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
}

func initApp(c *cfg) {
initAndLog(c, "control", initControlService)
initLocalStorage(c)

c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -99,7 +100,6 @@ func initApp(c *cfg) {
initAndLog(c, "pprof", initProfiler)
initAndLog(c, "prometheus", initMetrics)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)

initAndLog(c, "morph notifications", listenMorphNotifications)

Expand Down
97 changes: 46 additions & 51 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,52 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}
}

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err == nil {
go func() {
errChan <- grpcControlSrv.Serve(lis)
}()
}

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

// create morph listener
server.morphListener, err = createListener(server.morphClient, morphChain)
if err != nil {
Expand Down Expand Up @@ -985,57 +1031,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-

server.addBlockTimer(emissionTimer)

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

server.runners = append(server.runners, func(ch chan<- error) error {
lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}

go func() {
ch <- grpcControlSrv.Serve(lis)
}()
return nil
})

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

return server, nil
}

Expand Down

0 comments on commit e9c7422

Please sign in to comment.