From e9c74226753195adc8e06d63bd27fdf846b394b6 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 16 Nov 2023 19:14:17 +0400 Subject: [PATCH] control: set the highest priority for control service. Control ports should be initialised and start served first of all. As initControlService already listening endpoint, no need of preRun of control service in storage nodes. Closes: #2585. Signed-off-by: Ekaterina Pavlova --- CHANGELOG.md | 1 + cmd/neofs-node/control.go | 7 +-- cmd/neofs-node/main.go | 2 +- pkg/innerring/innerring.go | 97 ++++++++++++++++++-------------------- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21540f108be..d218a5e7933 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Changelog for NeoFS Node - New optimized SDK version is integrated (#2622) - IR uses internal LOCODE DB from Go package (#2610) - Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642) +- The priority of running control service is increased (#2585) ### Removed - deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496) diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 02ece79da74..e29dc0566dc 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -63,12 +63,13 @@ func initControlService(c *cfg) { }) control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) - - c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { + c.wg.Add(1) + go func() { runAndLog(c, "control", false, func(c *cfg) { fatalOnErr(c.cfgControlService.server.Serve(lis)) + c.wg.Done() }) - })) + }() } func (c *cfg) NetmapStatus() control.NetmapStatus { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 907e85d63ef..7e0a113fdb6 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -79,6 +79,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) { } func initApp(c *cfg) { + initAndLog(c, "control", initControlService) initLocalStorage(c) c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) @@ -99,7 +100,6 @@ func initApp(c *cfg) { initAndLog(c, "pprof", initProfiler) initAndLog(c, "prometheus", initMetrics) initAndLog(c, "tree", initTreeService) - initAndLog(c, "control", initControlService) initAndLog(c, "morph notifications", listenMorphNotifications) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 0f2d5c8ca32..b71811f0a72 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -519,6 +519,52 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } } + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint != "" { + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return nil, fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + var p controlsrv.Prm + + p.SetPrivateKey(*server.key) + p.SetHealthChecker(server) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err == nil { + go func() { + errChan <- grpcControlSrv.Serve(lis) + }() + } + + server.registerNoErrCloser(grpcControlSrv.GracefulStop) + } else { + log.Info("no Control server endpoint specified, service is disabled") + } + + if cfg.GetString("prometheus.address") != "" { + m := metrics.NewInnerRingMetrics(misc.Version) + server.metrics = &m + } + // create morph listener server.morphListener, err = createListener(server.morphClient, morphChain) if err != nil { @@ -985,57 +1031,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- server.addBlockTimer(emissionTimer) - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics(misc.Version) - server.metrics = &m - } - return server, nil }