Skip to content

Commit

Permalink
node: Fix nil panic in control service
Browse files Browse the repository at this point in the history
Split service creation into two steps:
1. service creation with key initialization and basic health checks;
2. actual service initialization that wires in the internal components (such as
the storage engine, tree service, etc.).

The first step lets the service answer incoming requests (health checks only)
as soon as the app starts; the second one switches it to full operation (storage
engine initialization may take a long time on real deployments with stored objects).

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Nov 21, 2023
1 parent 67c5b02 commit 77404cc
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 98 deletions.
3 changes: 3 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
Expand Down Expand Up @@ -318,6 +319,8 @@ type shared struct {
treeService *tree.Service

metricsCollector *metrics.NodeMetrics

control *controlSvc.Server
}

func (s *shared) resetCaches() {
Expand Down
16 changes: 2 additions & 14 deletions cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,7 @@ func initControlService(c *cfg) {
rawPubs = append(rawPubs, pubs[i].Bytes())
}

ctlSvc := controlSvc.New(
controlSvc.WithKey(&c.key.PrivateKey),
controlSvc.WithAuthorizedKeys(rawPubs),
controlSvc.WithHealthChecker(c),
controlSvc.WithNetMapSource(c.netMapSource),
controlSvc.WithContainerSource(c.cfgObject.cnrSource),
controlSvc.WithReplicator(c.replicator),
controlSvc.WithNodeState(c),
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
controlSvc.WithTreeService(treeSynchronizer{
c.treeService,
}),
)
c.shared.control = controlSvc.New(&c.key.PrivateKey, rawPubs, c)

Check warning on line 39 in cmd/neofs-node/control.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/control.go#L39

Added line #L39 was not covered by tests

lis, err := net.Listen("tcp", endpoint)
if err != nil {
Expand All @@ -62,7 +50,7 @@ func initControlService(c *cfg) {
stopGRPC("NeoFS Control API", c.cfgControlService.server, c.log)
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
control.RegisterControlServiceServer(c.cfgControlService.server, c.shared.control)

Check warning on line 53 in cmd/neofs-node/control.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/control.go#L53

Added line #L53 was not covered by tests
c.wg.Add(1)
go func() {
runAndLog(c, "control", false, func(c *cfg) {
Expand Down
9 changes: 9 additions & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ func initApp(c *cfg) {
initAndLog(c, "morph notifications", listenMorphNotifications)

c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))

c.shared.control.MarkReady(
c.cfgObject.cfgLocalStorage.localStorage,
c.netMapSource,
c.cfgObject.cnrSource,
c.replicator,
c,
treeSynchronizer{c.treeService},
)

Check warning on line 155 in cmd/neofs-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/main.go#L148-L155

Added lines #L148 - L155 were not covered by tests
}

func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*c
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

var prm shard.DumpPrm
prm.WithPath(req.GetBody().GetFilepath())
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())

err = s.s.DumpShard(shardID, prm)
err = s.storage.DumpShard(shardID, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)

res, err := s.s.Evacuate(prm)
res, err := s.storage.Evacuate(prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/flush_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) (
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID)

_, err = s.s.FlushWriteCache(prm)
_, err = s.storage.FlushWriteCache(prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err := s.ready()
if err != nil {
return nil, err
}

binAddrList := req.GetBody().GetAddressList()
addrList := make([]oid.Address, len(binAddrList))

Expand All @@ -41,7 +47,7 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
prm.WithForceRemoval()
prm.WithAddress(addrList[i])

_, err := s.s.Delete(prm)
_, err := s.storage.Delete(prm)
if err != nil && firstErr == nil {
firstErr = err
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/services/control/server/helpers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package control

import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// call only if `ready` returned no error.
func (s *Server) getShardIDList(raw [][]byte) []*shard.ID {
if len(raw) != 0 {
res := make([]*shard.ID, 0, len(raw))
Expand All @@ -11,10 +16,18 @@ func (s *Server) getShardIDList(raw [][]byte) []*shard.ID {
return res
}

info := s.s.DumpInfo()
info := s.storage.DumpInfo()
res := make([]*shard.ID, 0, len(info.Shards))
for i := range info.Shards {
res = append(res, info.Shards[i].ID)
}
return res
}

// ready reports whether the server may serve requests that depend on the
// components supplied via MarkReady. It returns nil once the availability
// flag has been set and a gRPC Unavailable status error otherwise.
func (s *Server) ready() error {
	if s.available.Load() {
		return nil
	}

	return status.Error(codes.Unavailable, "service has not been completely initialized yet")
}
7 changes: 6 additions & 1 deletion pkg/services/control/server/list_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err := s.ready()
if err != nil {
return nil, err
}
// create and fill response
resp := new(control.ListShardsResponse)

body := new(control.ListShardsResponse_Body)
resp.SetBody(body)

info := s.s.DumpInfo()
info := s.storage.DumpInfo()

shardInfos := make([]*control.ShardInfo, 0, len(info.Shards))

Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardReques
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

var prm shard.RestorePrm
prm.WithPath(req.GetBody().GetFilepath())
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())

err = s.s.RestoreShard(shardID, prm)
err = s.storage.RestoreShard(shardID, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
113 changes: 39 additions & 74 deletions pkg/services/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package control

import (
"crypto/ecdsa"
"fmt"
"sync/atomic"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
Expand All @@ -13,6 +15,11 @@ import (
// Server is an entity that serves
// Control service on storage node.
//
// A Server returned by New is only partially initialized: handlers that
// consult ready() answer with a gRPC Unavailable status until
// [Server.MarkReady] has been called.
type Server struct {
// available is the initialization flag: it is set to true at the end of
// [Server.MarkReady] and read by ready(). While it is false, every handler
// that checks ready() rejects the call; per the MarkReady contract only
// health checks are expected to work before that point.
available atomic.Bool

// embedded configuration; its fields (key, storage, netMapSrc, ...) are
// accessed directly on the Server.
*cfg
}

Expand Down Expand Up @@ -65,89 +72,47 @@ type cfg struct {

treeService TreeService

s *engine.StorageEngine
}

func defaultCfg() *cfg {
return &cfg{}
storage *engine.StorageEngine
}

// New creates, initializes and returns new Server instance.
func New(opts ...Option) *Server {
c := defaultCfg()

for _, opt := range opts {
opt(c)
// Must be marked as available with [Server.MarkReady] when all the
// components for serving are ready. Before [Server.MarkReady] call
// only health checks are available.
func New(key *ecdsa.PrivateKey, authorizedKeys [][]byte, healthChecker HealthChecker) *Server {
cfg := &cfg{
key: key,
allowedKeys: authorizedKeys,
healthChecker: healthChecker,
}

return &Server{
cfg: c,
}
}

// WithKey returns option to set private key
// used for signing responses.
func WithKey(key *ecdsa.PrivateKey) Option {
return func(c *cfg) {
c.key = key
cfg: cfg,
}
}

// WithAuthorizedKeys returns option to add list of public
// keys that have rights to use Control service.
func WithAuthorizedKeys(keys [][]byte) Option {
return func(c *cfg) {
c.allowedKeys = append(c.allowedKeys, keys...)
// MarkReady marks server available. Before this call none of the other calls
// are available except for the health checks.
func (s *Server) MarkReady(e *engine.StorageEngine, nm netmap.Source, c container.Source, r *replicator.Replicator, st NodeState, tr TreeService) {
panicOnNil := func(name string, service any) {
if service == nil {
panic(fmt.Sprintf("'%s' is nil", name))
}
}
}

// WithHealthChecker returns option to set component
// to calculate node health status.
func WithHealthChecker(hc HealthChecker) Option {
return func(c *cfg) {
c.healthChecker = hc
}
}

// WithNetMapSource returns option to set network map storage.
func WithNetMapSource(netMapSrc netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = netMapSrc
}
}

// WithContainerSource returns option to set container storage.
func WithContainerSource(cnrSrc container.Source) Option {
return func(c *cfg) {
c.cnrSrc = cnrSrc
}
}

// WithReplicator returns option to set the object replicator.
func WithReplicator(r *replicator.Replicator) Option {
return func(c *cfg) {
c.replicator = r
}
}

// WithNodeState returns option to set node network state component.
func WithNodeState(state NodeState) Option {
return func(c *cfg) {
c.nodeState = state
}
}

// WithLocalStorage returns option to set local storage engine that
// contains information about shards.
func WithLocalStorage(engine *engine.StorageEngine) Option {
return func(c *cfg) {
c.s = engine
}
}

// WithTreeService returns an option to set tree service.
func WithTreeService(s TreeService) Option {
return func(c *cfg) {
c.treeService = s
}
// Validate every dependency before publishing it: once the availability flag
// is set, request handlers dereference these fields without further checks.
//
// NOTE: panicOnNil receives its argument as `any`, so a nil
// *engine.StorageEngine or *replicator.Replicator is boxed into a non-nil
// interface value and passes the check; only nil interface values are caught.
panicOnNil("storage engine", e)
panicOnNil("netmap source", nm)
panicOnNil("container source", c)
panicOnNil("replicator", r)
panicOnNil("node state", st)
// The tree service must be checked through its own parameter; checking st
// here would validate the node state twice and never look at tr.
panicOnNil("tree service", tr)

s.storage = e
s.netMapSrc = nm
s.cnrSrc = c
s.replicator = r
s.nodeState = st
s.treeService = tr

s.available.Store(true)
}
7 changes: 6 additions & 1 deletion pkg/services/control/server/set_netmap_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ func (s *Server) SetNetmapStatus(_ context.Context, req *control.SetNetmapStatus
return nil, status.Error(codes.PermissionDenied, err.Error())
}

var err error
// check availability
err := s.ready()
if err != nil {
return nil, err
}

bodyReq := req.GetBody()
st := bodyReq.GetStatus()
force := bodyReq.GetForceMaintenance()
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/set_shard_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var (
m mode.Mode

Expand All @@ -37,7 +43,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
}

for _, shardID := range s.getShardIDList(req.Body.GetShard_ID()) {
err = s.s.SetShardMode(shardID, m, req.Body.GetResetErrorCounter())
err = s.storage.SetShardMode(shardID, m, req.Body.GetResetErrorCounter())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
Loading

0 comments on commit 77404cc

Please sign in to comment.