diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 37b1fc2111..a0fc486445 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -52,6 +52,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/control" + controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone" tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source" @@ -318,6 +319,8 @@ type shared struct { treeService *tree.Service metricsCollector *metrics.NodeMetrics + + control *controlSvc.Server } func (s *shared) resetCaches() { diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index e29dc0566d..6fbbc69829 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -36,19 +36,7 @@ func initControlService(c *cfg) { rawPubs = append(rawPubs, pubs[i].Bytes()) } - ctlSvc := controlSvc.New( - controlSvc.WithKey(&c.key.PrivateKey), - controlSvc.WithAuthorizedKeys(rawPubs), - controlSvc.WithHealthChecker(c), - controlSvc.WithNetMapSource(c.netMapSource), - controlSvc.WithContainerSource(c.cfgObject.cnrSource), - controlSvc.WithReplicator(c.replicator), - controlSvc.WithNodeState(c), - controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage), - controlSvc.WithTreeService(treeSynchronizer{ - c.treeService, - }), - ) + c.shared.control = controlSvc.New(&c.key.PrivateKey, rawPubs, c) lis, err := net.Listen("tcp", endpoint) if err != nil { @@ -62,7 +50,7 @@ func initControlService(c *cfg) { stopGRPC("NeoFS Control API", c.cfgControlService.server, c.log) }) - control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) + control.RegisterControlServiceServer(c.cfgControlService.server, c.shared.control) c.wg.Add(1) go func() { runAndLog(c, "control", false, func(c *cfg) { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 8a7fa836bc..ba4707b230 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -144,6 +144,15 @@ func initApp(c *cfg) { initAndLog(c, "morph notifications", listenMorphNotifications) c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher)) + + c.shared.control.MarkReady( + c.cfgObject.cfgLocalStorage.localStorage, + c.netMapSource, + c.cfgObject.cnrSource, + c.replicator, + c, + treeSynchronizer{c.treeService}, + ) } func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) { diff --git a/pkg/services/control/server/dump.go b/pkg/services/control/server/dump.go index b73f646462..dba2c532e1 100644 --- a/pkg/services/control/server/dump.go +++ b/pkg/services/control/server/dump.go @@ -15,13 +15,19 @@ func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*c return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) var prm shard.DumpPrm prm.WithPath(req.GetBody().GetFilepath()) prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - err = s.s.DumpShard(shardID, prm) + err = s.storage.DumpShard(shardID, prm) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 01aa09d8b7..e0a8e9d5e2 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -24,12 +24,18 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + var prm engine.EvacuateShardPrm prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) prm.WithFaultHandler(s.replicate) - res, err := s.s.Evacuate(prm) + res, err := s.storage.Evacuate(prm) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go index 4f580fbb83..0835b4b2d2 100644 --- a/pkg/services/control/server/flush_cache.go +++ b/pkg/services/control/server/flush_cache.go @@ -15,11 +15,17 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) ( return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { var prm engine.FlushWriteCachePrm prm.SetShardID(shardID) - _, err = s.s.FlushWriteCache(prm) + _, err = s.storage.FlushWriteCache(prm) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/gc.go b/pkg/services/control/server/gc.go index de082a39ee..6dc55ed893 100644 --- a/pkg/services/control/server/gc.go +++ b/pkg/services/control/server/gc.go @@ -23,6 +23,12 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest) return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err := s.ready() + if err != nil { + return nil, err + } + binAddrList := req.GetBody().GetAddressList() addrList := make([]oid.Address, len(binAddrList)) @@ -41,7 +47,7 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest) prm.WithForceRemoval() prm.WithAddress(addrList[i]) - _, err := s.s.Delete(prm) + _, err := s.storage.Delete(prm) if err != nil && firstErr == nil { firstErr = err } diff --git a/pkg/services/control/server/helpers.go b/pkg/services/control/server/helpers.go index 9f4023566c..930ffd4d48 100644 --- a/pkg/services/control/server/helpers.go +++ b/pkg/services/control/server/helpers.go @@ -1,7 +1,12 @@ package control -import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" +import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) +// call only if `ready` returned no error. func (s *Server) getShardIDList(raw [][]byte) []*shard.ID { if len(raw) != 0 { res := make([]*shard.ID, 0, len(raw)) @@ -11,10 +16,18 @@ func (s *Server) getShardIDList(raw [][]byte) []*shard.ID { return res } - info := s.s.DumpInfo() + info := s.storage.DumpInfo() res := make([]*shard.ID, 0, len(info.Shards)) for i := range info.Shards { res = append(res, info.Shards[i].ID) } return res } + +func (s *Server) ready() error { + if !s.available.Load() { + return status.Error(codes.Unavailable, "service has not been completely initialized yet") + } + + return nil +} diff --git a/pkg/services/control/server/list_shards.go b/pkg/services/control/server/list_shards.go index 0e21d90282..3e5ec8c485 100644 --- a/pkg/services/control/server/list_shards.go +++ b/pkg/services/control/server/list_shards.go @@ -16,13 +16,18 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) ( return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err := s.ready() + if err != nil { + return nil, err + } // create and fill response resp := new(control.ListShardsResponse) body := new(control.ListShardsResponse_Body) resp.SetBody(body) - info := s.s.DumpInfo() + info := s.storage.DumpInfo() shardInfos := make([]*control.ShardInfo, 0, len(info.Shards)) diff --git a/pkg/services/control/server/restore.go b/pkg/services/control/server/restore.go index 895c59a0a6..06a36a9cc6 100644 --- a/pkg/services/control/server/restore.go +++ b/pkg/services/control/server/restore.go @@ -15,13 +15,19 @@ func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardReques return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) var prm shard.RestorePrm prm.WithPath(req.GetBody().GetFilepath()) prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - err = s.s.RestoreShard(shardID, prm) + err = s.storage.RestoreShard(shardID, prm) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/server.go b/pkg/services/control/server/server.go index 5fbc70df40..806570b973 100644 --- a/pkg/services/control/server/server.go +++ b/pkg/services/control/server/server.go @@ -2,6 +2,8 @@ package control import ( "crypto/ecdsa" + "fmt" + "sync/atomic" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -13,6 +15,11 @@ import ( // Server is an entity that serves // Control service on storage node. type Server struct { + // initialization sync; locks any calls except + // health checks before [Server.MarkReady] is + // called + available atomic.Bool + *cfg } @@ -65,89 +72,47 @@ type cfg struct { treeService TreeService - s *engine.StorageEngine -} - -func defaultCfg() *cfg { - return &cfg{} + storage *engine.StorageEngine } // New creates, initializes and returns new Server instance. -func New(opts ...Option) *Server { - c := defaultCfg() - - for _, opt := range opts { - opt(c) +// Must be marked as available with [Server.MarkReady] when all the +// components for serving are ready. Before [Server.MarkReady] call +// only health checks are available. +func New(key *ecdsa.PrivateKey, authorizedKeys [][]byte, healthChecker HealthChecker) *Server { + cfg := &cfg{ + key: key, + allowedKeys: authorizedKeys, + healthChecker: healthChecker, } return &Server{ - cfg: c, - } -} - -// WithKey returns option to set private key -// used for signing responses. -func WithKey(key *ecdsa.PrivateKey) Option { - return func(c *cfg) { - c.key = key + cfg: cfg, } } -// WithAuthorizedKeys returns option to add list of public -// keys that have rights to use Control service. -func WithAuthorizedKeys(keys [][]byte) Option { - return func(c *cfg) { - c.allowedKeys = append(c.allowedKeys, keys...) +// MarkReady marks server available. Before this call none of the other calls +// are available except for the health checks. +func (s *Server) MarkReady(e *engine.StorageEngine, nm netmap.Source, c container.Source, r *replicator.Replicator, st NodeState, tr TreeService) { + panicOnNil := func(name string, service any) { + if service == nil { + panic(fmt.Sprintf("'%s' is nil", name)) + } } -} - -// WithHealthChecker returns option to set component -// to calculate node health status. -func WithHealthChecker(hc HealthChecker) Option { - return func(c *cfg) { - c.healthChecker = hc - } -} - -// WithNetMapSource returns option to set network map storage. -func WithNetMapSource(netMapSrc netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = netMapSrc - } -} -// WithContainerSource returns option to set container storage. -func WithContainerSource(cnrSrc container.Source) Option { - return func(c *cfg) { - c.cnrSrc = cnrSrc - } -} - -// WithReplicator returns option to set network map storage. -func WithReplicator(r *replicator.Replicator) Option { - return func(c *cfg) { - c.replicator = r - } -} - -// WithNodeState returns option to set node network state component. -func WithNodeState(state NodeState) Option { - return func(c *cfg) { - c.nodeState = state - } -} - -// WithLocalStorage returns option to set local storage engine that -// contains information about shards. -func WithLocalStorage(engine *engine.StorageEngine) Option { - return func(c *cfg) { - c.s = engine - } -} - -// WithTreeService returns an option to set tree service. -func WithTreeService(s TreeService) Option { - return func(c *cfg) { - c.treeService = s - } + panicOnNil("storage engine", e) + panicOnNil("netmap source", nm) + panicOnNil("container source", c) + panicOnNil("replicator", r) + panicOnNil("node state", st) + panicOnNil("tree service", st) + + s.storage = e + s.netMapSrc = nm + s.cnrSrc = c + s.replicator = r + s.nodeState = st + s.treeService = tr + + s.available.Store(true) } diff --git a/pkg/services/control/server/set_netmap_status.go b/pkg/services/control/server/set_netmap_status.go index d9ddc31547..a4e2601d25 100644 --- a/pkg/services/control/server/set_netmap_status.go +++ b/pkg/services/control/server/set_netmap_status.go @@ -17,7 +17,12 @@ func (s *Server) SetNetmapStatus(_ context.Context, req *control.SetNetmapStatus return nil, status.Error(codes.PermissionDenied, err.Error()) } - var err error + // check availability + err := s.ready() + if err != nil { + return nil, err + } + bodyReq := req.GetBody() st := bodyReq.GetStatus() force := bodyReq.GetForceMaintenance() diff --git a/pkg/services/control/server/set_shard_mode.go b/pkg/services/control/server/set_shard_mode.go index 9e9b50b3bc..cee394c9dd 100644 --- a/pkg/services/control/server/set_shard_mode.go +++ b/pkg/services/control/server/set_shard_mode.go @@ -17,6 +17,12 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + var ( m mode.Mode @@ -37,7 +43,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques } for _, shardID := range s.getShardIDList(req.Body.GetShard_ID()) { - err = s.s.SetShardMode(shardID, m, req.Body.GetResetErrorCounter()) + err = s.storage.SetShardMode(shardID, m, req.Body.GetResetErrorCounter()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/syncronize_tree.go b/pkg/services/control/server/syncronize_tree.go index b4e91071e8..f0bfdcf35a 100644 --- a/pkg/services/control/server/syncronize_tree.go +++ b/pkg/services/control/server/syncronize_tree.go @@ -20,6 +20,12 @@ func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTr return nil, status.Error(codes.PermissionDenied, err.Error()) } + // check availability + err = s.ready() + if err != nil { + return nil, err + } + if s.treeService == nil { return nil, status.Error(codes.Internal, "tree service is disabled") }