Skip to content

Commit

Permalink
fix/Nil panic in control service (#2655)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Nov 21, 2023
2 parents 67c5b02 + 77404cc commit 60e564f
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 98 deletions.
3 changes: 3 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
Expand Down Expand Up @@ -318,6 +319,8 @@ type shared struct {
treeService *tree.Service

metricsCollector *metrics.NodeMetrics

control *controlSvc.Server
}

func (s *shared) resetCaches() {
Expand Down
16 changes: 2 additions & 14 deletions cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,7 @@ func initControlService(c *cfg) {
rawPubs = append(rawPubs, pubs[i].Bytes())
}

ctlSvc := controlSvc.New(
controlSvc.WithKey(&c.key.PrivateKey),
controlSvc.WithAuthorizedKeys(rawPubs),
controlSvc.WithHealthChecker(c),
controlSvc.WithNetMapSource(c.netMapSource),
controlSvc.WithContainerSource(c.cfgObject.cnrSource),
controlSvc.WithReplicator(c.replicator),
controlSvc.WithNodeState(c),
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
controlSvc.WithTreeService(treeSynchronizer{
c.treeService,
}),
)
c.shared.control = controlSvc.New(&c.key.PrivateKey, rawPubs, c)

lis, err := net.Listen("tcp", endpoint)
if err != nil {
Expand All @@ -62,7 +50,7 @@ func initControlService(c *cfg) {
stopGRPC("NeoFS Control API", c.cfgControlService.server, c.log)
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
control.RegisterControlServiceServer(c.cfgControlService.server, c.shared.control)
c.wg.Add(1)
go func() {
runAndLog(c, "control", false, func(c *cfg) {
Expand Down
9 changes: 9 additions & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ func initApp(c *cfg) {
initAndLog(c, "morph notifications", listenMorphNotifications)

c.workers = append(c.workers, newWorkerFromFunc(c.configWatcher))

c.shared.control.MarkReady(
c.cfgObject.cfgLocalStorage.localStorage,
c.netMapSource,
c.cfgObject.cnrSource,
c.replicator,
c,
treeSynchronizer{c.treeService},
)
}

func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*c
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

var prm shard.DumpPrm
prm.WithPath(req.GetBody().GetFilepath())
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())

err = s.s.DumpShard(shardID, prm)
err = s.storage.DumpShard(shardID, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)

res, err := s.s.Evacuate(prm)
res, err := s.storage.Evacuate(prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/flush_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) (
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID)

_, err = s.s.FlushWriteCache(prm)
_, err = s.storage.FlushWriteCache(prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err := s.ready()
if err != nil {
return nil, err
}

binAddrList := req.GetBody().GetAddressList()
addrList := make([]oid.Address, len(binAddrList))

Expand All @@ -41,7 +47,7 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
prm.WithForceRemoval()
prm.WithAddress(addrList[i])

_, err := s.s.Delete(prm)
_, err := s.storage.Delete(prm)
if err != nil && firstErr == nil {
firstErr = err
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/services/control/server/helpers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package control

import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// call only if `ready` returned no error.
func (s *Server) getShardIDList(raw [][]byte) []*shard.ID {
if len(raw) != 0 {
res := make([]*shard.ID, 0, len(raw))
Expand All @@ -11,10 +16,18 @@ func (s *Server) getShardIDList(raw [][]byte) []*shard.ID {
return res
}

info := s.s.DumpInfo()
info := s.storage.DumpInfo()
res := make([]*shard.ID, 0, len(info.Shards))
for i := range info.Shards {
res = append(res, info.Shards[i].ID)
}
return res
}

func (s *Server) ready() error {
if !s.available.Load() {
return status.Error(codes.Unavailable, "service has not been completely initialized yet")
}

return nil
}
7 changes: 6 additions & 1 deletion pkg/services/control/server/list_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err := s.ready()
if err != nil {
return nil, err
}
// create and fill response
resp := new(control.ListShardsResponse)

body := new(control.ListShardsResponse_Body)
resp.SetBody(body)

info := s.s.DumpInfo()
info := s.storage.DumpInfo()

shardInfos := make([]*control.ShardInfo, 0, len(info.Shards))

Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardReques
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

var prm shard.RestorePrm
prm.WithPath(req.GetBody().GetFilepath())
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())

err = s.s.RestoreShard(shardID, prm)
err = s.storage.RestoreShard(shardID, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
113 changes: 39 additions & 74 deletions pkg/services/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package control

import (
"crypto/ecdsa"
"fmt"
"sync/atomic"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
Expand All @@ -13,6 +15,11 @@ import (
// Server is an entity that serves
// Control service on storage node.
type Server struct {
// initialization sync; locks any calls except
// health checks before [Server.MarkReady] is
// called
available atomic.Bool

*cfg
}

Expand Down Expand Up @@ -65,89 +72,47 @@ type cfg struct {

treeService TreeService

s *engine.StorageEngine
}

func defaultCfg() *cfg {
return &cfg{}
storage *engine.StorageEngine
}

// New creates, initializes and returns new Server instance.
func New(opts ...Option) *Server {
c := defaultCfg()

for _, opt := range opts {
opt(c)
// Must be marked as available with [Server.MarkReady] when all the
// components for serving are ready. Before [Server.MarkReady] call
// only health checks are available.
func New(key *ecdsa.PrivateKey, authorizedKeys [][]byte, healthChecker HealthChecker) *Server {
cfg := &cfg{
key: key,
allowedKeys: authorizedKeys,
healthChecker: healthChecker,
}

return &Server{
cfg: c,
}
}

// WithKey returns option to set private key
// used for signing responses.
func WithKey(key *ecdsa.PrivateKey) Option {
return func(c *cfg) {
c.key = key
cfg: cfg,
}
}

// WithAuthorizedKeys returns option to add list of public
// keys that have rights to use Control service.
func WithAuthorizedKeys(keys [][]byte) Option {
return func(c *cfg) {
c.allowedKeys = append(c.allowedKeys, keys...)
// MarkReady marks server available. Before this call none of the other calls
// are available except for the health checks.
func (s *Server) MarkReady(e *engine.StorageEngine, nm netmap.Source, c container.Source, r *replicator.Replicator, st NodeState, tr TreeService) {
panicOnNil := func(name string, service any) {
if service == nil {
panic(fmt.Sprintf("'%s' is nil", name))
}
}
}

// WithHealthChecker returns option to set component
// to calculate node health status.
func WithHealthChecker(hc HealthChecker) Option {
return func(c *cfg) {
c.healthChecker = hc
}
}

// WithNetMapSource returns option to set network map storage.
func WithNetMapSource(netMapSrc netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = netMapSrc
}
}

// WithContainerSource returns option to set container storage.
func WithContainerSource(cnrSrc container.Source) Option {
return func(c *cfg) {
c.cnrSrc = cnrSrc
}
}

// WithReplicator returns option to set network map storage.
func WithReplicator(r *replicator.Replicator) Option {
return func(c *cfg) {
c.replicator = r
}
}

// WithNodeState returns option to set node network state component.
func WithNodeState(state NodeState) Option {
return func(c *cfg) {
c.nodeState = state
}
}

// WithLocalStorage returns option to set local storage engine that
// contains information about shards.
func WithLocalStorage(engine *engine.StorageEngine) Option {
return func(c *cfg) {
c.s = engine
}
}

// WithTreeService returns an option to set tree service.
func WithTreeService(s TreeService) Option {
return func(c *cfg) {
c.treeService = s
}
panicOnNil("storage engine", e)
panicOnNil("netmap source", nm)
panicOnNil("container source", c)
panicOnNil("replicator", r)
panicOnNil("node state", st)
panicOnNil("tree service", st)

s.storage = e
s.netMapSrc = nm
s.cnrSrc = c
s.replicator = r
s.nodeState = st
s.treeService = tr

s.available.Store(true)
}
7 changes: 6 additions & 1 deletion pkg/services/control/server/set_netmap_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ func (s *Server) SetNetmapStatus(_ context.Context, req *control.SetNetmapStatus
return nil, status.Error(codes.PermissionDenied, err.Error())
}

var err error
// check availability
err := s.ready()
if err != nil {
return nil, err
}

bodyReq := req.GetBody()
st := bodyReq.GetStatus()
force := bodyReq.GetForceMaintenance()
Expand Down
8 changes: 7 additions & 1 deletion pkg/services/control/server/set_shard_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var (
m mode.Mode

Expand All @@ -37,7 +43,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
}

for _, shardID := range s.getShardIDList(req.Body.GetShard_ID()) {
err = s.s.SetShardMode(shardID, m, req.Body.GetResetErrorCounter())
err = s.storage.SetShardMode(shardID, m, req.Body.GetResetErrorCounter())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
Loading

0 comments on commit 60e564f

Please sign in to comment.