From a59539072b2a542e3f2a935c1ac18863d2f6f9d5 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 16 Nov 2023 19:14:17 +0400 Subject: [PATCH] control: set the highest priority for control service. Control ports should be initialised and start served first of all. As initControlService already listening endpoint, no need of preRun of control service in storage nodes. Closes: #2585. Signed-off-by: Ekaterina Pavlova --- CHANGELOG.md | 1 + cmd/neofs-node/control.go | 7 +-- cmd/neofs-node/main.go | 2 +- pkg/innerring/innerring.go | 97 ++++++++++++++++++-------------------- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f01252c2d5e..be866a56021 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Changelog for NeoFS Node - IR uses internal LOCODE DB from Go package (#2610) - Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642) - The priority of metrics service is increased (#2428) +- The priority of running control service is increased (#2585) ### Removed - deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496) diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 02ece79da74..e29dc0566dc 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -63,12 +63,13 @@ func initControlService(c *cfg) { }) control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) - - c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { + c.wg.Add(1) + go func() { runAndLog(c, "control", false, func(c *cfg) { fatalOnErr(c.cfgControlService.server.Serve(lis)) + c.wg.Done() }) - })) + }() } func (c *cfg) NetmapStatus() control.NetmapStatus { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index b6e890518b0..8a7fa836bc9 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -121,6 +121,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) { } func initApp(c *cfg) { + initAndLog(c, "control", initControlService) initLocalStorage(c) c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) @@ -139,7 +140,6 @@ func initApp(c *cfg) { initAndLog(c, "notification", initNotifications) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) - initAndLog(c, "control", initControlService) initAndLog(c, "morph notifications", listenMorphNotifications) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 0f2d5c8ca32..b71811f0a72 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -519,6 +519,52 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } } + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint != "" { + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return nil, fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + var p controlsrv.Prm + + p.SetPrivateKey(*server.key) + p.SetHealthChecker(server) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err == nil { + go func() { + errChan <- grpcControlSrv.Serve(lis) + }() + } + + server.registerNoErrCloser(grpcControlSrv.GracefulStop) + } else { + log.Info("no Control server endpoint specified, service is disabled") + } + + if cfg.GetString("prometheus.address") != "" { + m := metrics.NewInnerRingMetrics(misc.Version) + server.metrics = &m + } + // create morph listener server.morphListener, err = createListener(server.morphClient, morphChain) if err != nil { @@ -985,57 +1031,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- server.addBlockTimer(emissionTimer) - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics(misc.Version) - server.metrics = &m - } - return server, nil }