From 803e9a41cdb65f95662b34cec09d8a725b18922d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 18 Jan 2024 10:31:12 +0400 Subject: [PATCH 1/3] object/get: Refactor storage policy processing Continues 2f2933817b45b776cca76d8d32e91c8c3d1139d1 for `ObjectService`'s `Get`/`Head`/`GetRange` server handlers. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 2 + cmd/neofs-node/object.go | 28 +++++++---- cmd/neofs-node/policy.go | 45 +++++++++++++++++ pkg/services/object/get/container.go | 75 +++++++++++++++++----------- pkg/services/object/get/exec.go | 21 -------- pkg/services/object/get/get_test.go | 35 ++++++------- pkg/services/object/get/service.go | 57 ++++++++++----------- pkg/services/object/get/util.go | 9 ---- 8 files changed, 153 insertions(+), 119 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 37d8ec46e1..05171db716 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -472,6 +472,8 @@ type cfgObject struct { cfgLocalStorage cfgLocalStorage tombstoneLifetime uint64 + + containerNodes *containerNodes } type cfgLocalStorage struct { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e63cee3c28..612e74cce3 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -224,20 +224,12 @@ func initObjectService(c *cfg) { policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize), ) - traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, c.shared.policer) - sGet := getsvc.New( + sGet := getsvc.New(c, getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientConstructor(coreConstructor), - getsvc.WithTraverserGenerator( - traverseGen.WithTraverseOptions( - placement.SuccessAfter(1), - ), - ), - getsvc.WithNetMapSource(c.netMapSource), getsvc.WithKeyStorage(keyStorage), ) @@ -250,6 +242,7 @@ func initObjectService(c *cfg) { cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource) fatalOnErr(err) + c.cfgObject.containerNodes = cnrNodes sSearch := searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey), searchsvc.WithLogger(c.log), @@ -719,3 +712,20 @@ func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK. return sw.ids, nil } + +// IsLocalNodePublicKey checks whether given binary-encoded public key is +// assigned in the network map to a local storage node. +// +// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork]. +func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) } + +// GetNodesForObject reads storage policy of the referenced container from the +// underlying container storage, reads network map at the specified epoch from +// the underlying storage, applies the storage policy to it and returns sorted +// lists of selected storage nodes along with the per-list numbers of primary +// object holders. Resulting slices must not be changed. +// +// GetNodesForObject implements [getsvc.NeoFSNetwork]. 
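For illustration, a minimal caller-side sketch of how the two return values are meant to be consumed together, per the contract above: the first primaryCounts[i] entries of nodeLists[i] are the primary object holders, the rest are backup. The fetchFromContainerNodes helper and its use of the new getsvc.NeoFSNetwork interface are hypothetical, not part of this patch.

    import (
        getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
        oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
    )

    // fetchFromContainerNodes sketches a consumer of GetNodesForObject.
    func fetchFromContainerNodes(n getsvc.NeoFSNetwork, addr oid.Address) error {
        nodeLists, primaryCounts, err := n.GetNodesForObject(addr)
        if err != nil {
            return err
        }
        for i := range nodeLists {
            primary := nodeLists[i][:primaryCounts[i]] // primary holders, contacted first
            backup := nodeLists[i][primaryCounts[i]:]  // remaining nodes are backup
            _, _ = primary, backup
        }
        return nil
    }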
+func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { + return c.cfgObject.containerNodes.getNodesForObject(addr) +} diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index 2c22404b84..cebc4d7ac2 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/netmap" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // storagePolicyRes structures persistent storage policy application result for @@ -152,3 +153,47 @@ func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[con cache.Add(cacheKey, result) return result, nil } + +// getNodesForObject reads storage policy of the referenced container from the +// underlying container storage, reads network map at the specified epoch from +// the underlying storage, applies the storage policy to it and returns sorted +// lists of selected storage nodes along with the per-list numbers of primary +// object holders. Resulting slices must not be changed. +func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { + epoch, err := x.network.Epoch() + if err != nil { + return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + cnrID := addr.Container() + cnr, err := x.containers.Get(cnrID) + if err != nil { + return nil, nil, fmt.Errorf("read container by ID: %w", err) + } + networkMap, err := x.network.GetNetMapByEpoch(epoch) + if err != nil { + return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + } + + policy := cnr.Value.PlacementPolicy() + nodeLists, err := networkMap.ContainerNodes(policy, cnrID) + if err != nil { + return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) + } + if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil { + return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err) + } + if len(nodeLists) != policy.NumberOfReplicas() { + return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas()) + } + + primaryCounts := make([]uint, len(nodeLists)) + for i := range nodeLists { + if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) { + return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", + epoch, i, len(nodeLists), policy.NumberOfReplicas()) + } + } + return nodeLists, primaryCounts, nil +} diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 5a902c2eff..f5cb585777 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -4,6 +4,8 @@ import ( "context" "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -15,71 +17,84 @@ func (exec *execCtx) executeOnContainer() { exec.log.Debug("trying to execute in container...") - // initialize epoch number - epoch, err := 
exec.svc.currentEpochReceiver.currentEpoch() + nodeLists, primaryCounts, err := exec.svc.neoFSNet.GetNodesForObject(exec.address()) if err != nil { exec.status = statusUndefined exec.err = err - exec.log.Debug("could not get current epoch number", zap.Error(err)) + exec.log.Debug("failed to list storage nodes for the object", zap.Error(err)) return } - exec.processEpoch(epoch) -} - -func (exec *execCtx) processEpoch(epoch uint64) bool { - exec.log.Debug("process epoch", - zap.Uint64("number", epoch), - ) - - traverser, ok := exec.generateTraverser(exec.address(), epoch) - if !ok { - return true - } - ctx, cancel := context.WithCancel(exec.context()) defer cancel() exec.status = statusUndefined mProcessedNodes := make(map[string]struct{}) - - for { - addrs := traverser.Next() - if len(addrs) == 0 { - exec.log.Debug("no more nodes, abort placement iteration") - - return false + var endpoints, externalEndpoints network.AddressGroup + var j, jLim uint + primary := true + + for i := 0; i < len(nodeLists); i++ { // do not use for-range! + if primary { + j, jLim = 0, primaryCounts[i] + } else { + j, jLim = primaryCounts[i], uint(len(nodeLists[i])) } - for i := range addrs { + for ; j < jLim; j++ { select { case <-ctx.Done(): exec.log.Debug("interrupt placement iteration by context", zap.String("error", ctx.Err().Error()), ) - return true + return default: } - strKey := string(addrs[i].PublicKey()) - if _, ok = mProcessedNodes[strKey]; ok { + bKey := nodeLists[i][j].PublicKey() + strKey := string(bKey) + if _, ok := mProcessedNodes[strKey]; ok || exec.svc.neoFSNet.IsLocalNodePublicKey(bKey) { continue } mProcessedNodes[strKey] = struct{}{} + if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil { + // critical error that may ultimately block the storage service. Normally it + // should not appear because entry into the network map under strict control + exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(err)) + continue + } + // TODO: #1142 consider parallel execution // TODO: #1142 consider optimization: if status == SPLIT we can continue until // we reach the best result - split info with linking object ID. 
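Shown below for reference is the traversal order this iteration implements, in a stripped-down runnable form: every list's primary range is visited first, then every list's backup range, which is why the outer index is reset manually instead of using for-range. Plain strings stand in for node descriptors; all names are illustrative only.

    package main

    import "fmt"

    func main() {
        lists := [][]string{{"p0", "p1", "b0"}, {"p2", "b1", "b2"}}
        primaryCounts := []uint{2, 1} // number of primary holders per list
        primary := true
        for i := 0; i < len(lists); i++ { // index is mutated below, so no for-range
            var j, jLim uint
            if primary {
                j, jLim = 0, primaryCounts[i]
            } else {
                j, jLim = primaryCounts[i], uint(len(lists[i]))
            }
            for ; j < jLim; j++ {
                fmt.Println(lists[i][j])
            }
            if primary && i == len(lists)-1 {
                primary = false // switch to reserve (backup) ranges
                i = -1          // restart over the same lists
            }
        }
        // output: p0 p1 p2 b0 b1 b2
    }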
var info client.NodeInfo - - client.NodeInfoFromNetmapElement(&info, addrs[i]) + info.SetAddressGroup(endpoints) + info.SetPublicKey(bKey) + if ext := nodeLists[i][j].ExternalAddresses(); len(ext) > 0 { + if err = externalEndpoints.FromStringSlice(ext); err != nil { + // less critical since the main ones must work, but also important + exec.log.Warn("failed to decode external network endpoints of the storage node, ignore them", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), + zap.Strings("endpoints", ext), zap.Error(err)) + } else { + info.SetExternalAddressGroup(externalEndpoints) + } + } if exec.processNode(info) { exec.log.Debug("completing the operation") - return true + return } } + + if primary && i == len(nodeLists)-1 { + // switch to reserve nodes + primary = false + i = -1 + } } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index f11b3be265..ada3e44017 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -8,7 +8,6 @@ import ( clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -162,26 +161,6 @@ func (exec *execCtx) headOnly() bool { return exec.head } -func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) { - obj := addr.Object() - - t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug("could not generate container traverser", - zap.String("error", err.Error()), - ) - - return nil, false - case err == nil: - return t, true - } -} - func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) { log := exec.log if rng != nil { diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index cb3facdf7c..22a826cb7c 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -34,9 +34,9 @@ type testStorage struct { phy map[string]*objectSDK.Object } -type testTraverserGenerator struct { +type testNeoFS struct { c container.Container - b map[uint64]placement.Builder + b placement.Builder } type testPlacementBuilder struct { @@ -68,19 +68,21 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { - opts := make([]placement.Option, 0, 4) - opts = append(opts, - placement.ForContainer(g.c), - placement.UseBuilder(g.b[e]), - placement.SuccessAfter(1), - ) +func (g *testNeoFS) IsLocalNodePublicKey([]byte) bool { return false } - if obj != nil { - opts = append(opts, placement.ForObject(*obj)) +func (g *testNeoFS) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, error) { + obj := addr.Object() + nodeLists, err := g.b.BuildPlacement(addr.Container(), &obj, netmap.PlacementPolicy{}) // policy is ignored in this test + if err != nil { + return nil, nil, err + } + + primaryNums := make([]uint, len(nodeLists)) + for i := range primaryNums { + primaryNums[i] = 1 } - return placement.NewTraverser(opts...) 
+ return nodeLists, primaryNums, nil } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { @@ -500,16 +502,11 @@ func TestGetRemoteSmall(t *testing.T) { svc.localStorage = newTestStorage() svc.assembly = true - const curEpoch = 13 - - svc.traverserGenerator = &testTraverserGenerator{ + svc.neoFSNet = &testNeoFS{ c: cnr, - b: map[uint64]placement.Builder{ - curEpoch: b, - }, + b: b, } svc.clientCache = c - svc.currentEpochReceiver = testEpochReceiver(curEpoch) return svc } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index a53eba0d55..94d2456bb6 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -2,19 +2,39 @@ package getsvc import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) +// NeoFSNetwork provides access to the NeoFS network to get information +// necessary for the [Service] to work. +type NeoFSNetwork interface { + // GetNodesForObject returns descriptors of storage nodes matching storage + // policy of the referenced object for now. Nodes are identified by their public + // keys and can be repeated in different lists. The second value specifies the + // number (N) of primary object holders for each list (L) so: + // - size of each L >= N; + // - first N nodes of each L are primary data holders while others (if any) + // are backup. + // + // GetContainerNodes does not change resulting slices and their elements. + // + // Returns [apistatus.ContainerNotFound] if requested container is missing in + // the network. + GetNodesForObject(oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) + // IsLocalNodePublicKey checks whether given binary-encoded public key is + // assigned in the network map to a local storage node providing [Service]. + IsLocalNodePublicKey([]byte) bool +} + // Service utility serving requests of Object.Get service. type Service struct { *cfg + neoFSNet NeoFSNetwork } // Option is a Service's constructor option. @@ -37,14 +57,6 @@ type cfg struct { get(client.NodeInfo) (getClient, error) } - traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) - } - - currentEpochReceiver interface { - currentEpoch() (uint64, error) - } - keyStore *util.KeyStorage } @@ -59,7 +71,7 @@ func defaultCfg() *cfg { // New creates, initializes and returns utility serving // Object.Get service requests. -func New(opts ...Option) *Service { +func New(neoFSNet NeoFSNetwork, opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -67,7 +79,8 @@ func New(opts ...Option) *Service { } return &Service{ - cfg: c, + cfg: c, + neoFSNet: neoFSNet, } } @@ -104,24 +117,6 @@ func WithClientConstructor(v ClientConstructor) Option { } } -// WithTraverserGenerator returns option to set generator of -// placement traverser to get the objects from containers. 
-func WithTraverserGenerator(t *util.TraverserGenerator) Option { - return func(c *cfg) { - c.traverserGenerator = t - } -} - -// WithNetMapSource returns option to set network -// map storage to receive current network state. -func WithNetMapSource(nmSrc netmap.Source) Option { - return func(c *cfg) { - c.currentEpochReceiver = &nmSrcWrapper{ - nmSrc: nmSrc, - } - } -} - // WithKeyStorage returns option to set private // key storage for session tokens and node key. func WithKeyStorage(store *util.KeyStorage) Option { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 7de1fbf7d4..ee7c012e18 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -7,7 +7,6 @@ import ( "io" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" @@ -45,10 +44,6 @@ type hasherWrapper struct { hash io.Writer } -type nmSrcWrapper struct { - nmSrc netmap.Source -} - func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.New(), @@ -256,10 +251,6 @@ func (h *hasherWrapper) WriteChunk(p []byte) error { return err } -func (n *nmSrcWrapper) currentEpoch() (uint64, error) { - return n.nmSrc.Epoch() -} - func prettyRange(rng *object.Range) string { return fmt.Sprintf("[%d:%d]", rng.GetOffset(), rng.GetLength()) } From 5389a1e269a9541f1d5795771782d23fd686ee58 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 18 Jul 2024 15:39:34 +0400 Subject: [PATCH 2/3] node/policy: Cache object policy application results Continues 10d05a45bd72758c661f9727decbde6efa2bf651 for sorting container nodes for objects. Since each container may include plenty of objects, cache size limit is chosen - again heuristically - 10 times bigger, i.e. 10K. Refs #2692. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + cmd/neofs-node/policy.go | 161 +++++++++++++++++++++------------- cmd/neofs-node/policy_test.go | 136 +++++++++++++++++++++++++--- 3 files changed, 227 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2306a18ccd..7c88815d77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Changelog for NeoFS Node - `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802) - `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892) - Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897) +- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896) ### Removed diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index cebc4d7ac2..603681a1e1 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -14,18 +14,30 @@ import ( // storagePolicyRes structures persistent storage policy application result for // particular container and network map incl. error. 
type storagePolicyRes struct { - nodeSets [][]netmapsdk.NodeInfo - err error + nodeSets [][]netmapsdk.NodeInfo + repCounts []uint + err error } -type containerNodesCacheKey struct { - epoch uint64 - cnr cid.ID -} +type ( + containerNodesCacheKey struct { + epoch uint64 + cnr cid.ID + } + objectNodesCacheKey struct { + epoch uint64 + addr oid.Address + } +) -// max number of container storage policy applications results cached by -// containerNodes. -const cachedContainerNodesNum = 1000 +const ( + // max number of container storage policy applications results cached by + // containerNodes. + cachedContainerNodesNum = 1000 + // max number of object storage policy applications results cached by + // containerNodes. + cachedObjectNodesNum = 10000 +) // containerNodes wraps NeoFS network state to apply container storage policies. // @@ -36,7 +48,8 @@ type containerNodes struct { containers container.Source network netmap.Source - cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] + cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] + objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes] } func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { @@ -44,10 +57,15 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con if err != nil { return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err) } + lo, err := lru.New[objectNodesCacheKey, storagePolicyRes](cachedObjectNodesNum) + if err != nil { + return nil, fmt.Errorf("create LRU container node cache for objects: %w", err) + } return &containerNodes{ containers: containers, network: network, cache: l, + objCache: lo, }, nil } @@ -117,6 +135,47 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, return nil } +// getNodesForObject reads storage policy of the referenced container from the +// underlying container storage, reads network map at the specified epoch from +// the underlying storage, applies the storage policy to it and returns sorted +// lists of selected storage nodes along with the per-list numbers of primary +// object holders. Resulting slices must not be changed. 
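Because the returned slices may be cached and shared between callers, a consumer that needs to filter or reorder them should work on a copy. A hypothetical helper (not part of the patch, assuming this file's netmapsdk alias) could look like:

    func copyNodeLists(src [][]netmapsdk.NodeInfo) [][]netmapsdk.NodeInfo {
        dst := make([][]netmapsdk.NodeInfo, len(src))
        for i := range src {
            dst[i] = append([]netmapsdk.NodeInfo(nil), src[i]...) // fresh inner slice per list
        }
        return dst
    }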
+func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { + curEpoch, err := x.network.Epoch() + if err != nil { + return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + cacheKey := objectNodesCacheKey{curEpoch, addr} + res, ok := x.objCache.Get(cacheKey) + if ok { + return res.nodeSets, res.repCounts, res.err + } + cnrRes, networkMap, err := (&containerPolicyContext{ + id: addr.Container(), + containers: x.containers, + network: x.network, + }).applyToNetmap(curEpoch, x.cache) + if err != nil || cnrRes.err != nil { + if err == nil { + err = cnrRes.err // cached in x.cache, no need to store in x.objCache + } + return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + } + if networkMap == nil { + if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil { + // non-persistent error => do not cache + return nil, nil, fmt.Errorf("read network map by epoch: %w", err) + } + } + res.repCounts = cnrRes.repCounts + res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object()) + if res.err != nil { + res.err = fmt.Errorf("sort container nodes for object: %w", res.err) + } + x.objCache.Add(cacheKey, res) + return res.nodeSets, res.repCounts, res.err +} + // preserves context of storage policy processing for the particular container. type containerPolicyContext struct { // static @@ -131,9 +190,18 @@ type containerPolicyContext struct { // ID to the network map at the specified epoch. applyAtEpoch checks existing // results in the cache and stores new results in it. func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, error) { + res, _, err := x.applyToNetmap(epoch, cache) + return res, err +} + +// applyToNetmap applies storage policy of container referenced by parameterized +// ID to the network map at the specified epoch. applyAtEpoch checks existing +// results in the cache and stores new results in it. Network map is returned if +// it was requested, i.e. on cache miss only. +func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, *netmapsdk.NetMap, error) { cacheKey := containerNodesCacheKey{epoch, x.id} if result, ok := cache.Get(cacheKey); ok { - return result, nil + return result, nil, nil } var result storagePolicyRes var err error @@ -141,59 +209,34 @@ func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[con x.cnr, err = x.containers.Get(x.id) if err != nil { // non-persistent error => do not cache - return result, fmt.Errorf("read container by ID: %w", err) + return result, nil, fmt.Errorf("read container by ID: %w", err) } } networkMap, err := x.network.GetNetMapByEpoch(epoch) if err != nil { // non-persistent error => do not cache - return result, fmt.Errorf("read network map by epoch: %w", err) - } - result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id) - cache.Add(cacheKey, result) - return result, nil -} - -// getNodesForObject reads storage policy of the referenced container from the -// underlying container storage, reads network map at the specified epoch from -// the underlying storage, applies the storage policy to it and returns sorted -// lists of selected storage nodes along with the per-list numbers of primary -// object holders. Resulting slices must not be changed. 
-func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { - epoch, err := x.network.Epoch() - if err != nil { - return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) - } - cnrID := addr.Container() - cnr, err := x.containers.Get(cnrID) - if err != nil { - return nil, nil, fmt.Errorf("read container by ID: %w", err) - } - networkMap, err := x.network.GetNetMapByEpoch(epoch) - if err != nil { - return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err) - } - - policy := cnr.Value.PlacementPolicy() - nodeLists, err := networkMap.ContainerNodes(policy, cnrID) - if err != nil { - return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) - } - if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil { - return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err) - } - if len(nodeLists) != policy.NumberOfReplicas() { - return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ - "diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas()) - } - - primaryCounts := make([]uint, len(nodeLists)) - for i := range nodeLists { - if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) { - return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ - "invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", - epoch, i, len(nodeLists), policy.NumberOfReplicas()) + return result, nil, fmt.Errorf("read network map by epoch: %w", err) + } + policy := x.cnr.Value.PlacementPolicy() + result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id) + if result.err == nil { + // ContainerNodes should control following, but still better to double-check + if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum { + result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+ + "diff number of storage node sets (%d) and required replica descriptors (%d)", + len(result.nodeSets), policyNum) + } else { + result.repCounts = make([]uint, len(result.nodeSets)) + for i := range result.nodeSets { + if result.repCounts[i] = uint(policy.ReplicaNumberByIndex(i)); result.repCounts[i] > uint(len(result.nodeSets[i])) { + result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+ + "invalid storage node set #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", + i, len(result.nodeSets[i]), result.repCounts[i]) + break + } + } } } - return nodeLists, primaryCounts, nil + cache.Add(cacheKey, result) + return result, networkMap, nil } diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go index 66121e1775..150348e6b1 100644 --- a/cmd/neofs-node/policy_test.go +++ b/cmd/neofs-node/policy_test.go @@ -4,6 +4,8 @@ import ( "crypto/rand" "errors" "fmt" + "strconv" + "strings" "testing" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" @@ -11,9 +13,12 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/netmap" + oidtest 
"github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) +const anyEpoch = 42 + type testContainer struct { id cid.ID val container.Container @@ -84,9 +89,8 @@ func (x *testNetwork) Epoch() (uint64, error) { return x.epoch, x.epochErr } -func newNetmapWithContainer(tb testing.TB, nodeNum int, selected []int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { +func newNetmapWithContainer(tb testing.TB, nodeNum int, selected ...[]int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { nodes := make([]netmap.NodeInfo, nodeNum) -nextNode: for i := range nodes { key := make([]byte, 33) _, err := rand.Read(key) @@ -94,28 +98,36 @@ nextNode: nodes[i].SetPublicKey(key) for j := range selected { - if i == selected[j] { - nodes[i].SetAttribute("attr", "true") - continue nextNode + for k := range selected[j] { + if i == selected[j][k] { + nodes[i].SetAttribute("attr"+strconv.Itoa(j), "true") + break + } } } - - nodes[i].SetAttribute("attr", "false") } var networkMap netmap.NetMap networkMap.SetNodes(nodes) + var sbRpl, sbSlc, sbFlt strings.Builder + for i := range selected { + sbFlt.WriteString(fmt.Sprintf("FILTER attr%d EQ true AS F%d\n", i, i)) + sbSlc.WriteString(fmt.Sprintf("SELECT %d FROM F%d AS S%d\n", len(selected[i]), i, i)) + sbRpl.WriteString(fmt.Sprintf("REP %d IN S%d\n", len(selected[i]), i)) + } var policy netmap.PlacementPolicy - strPolicy := fmt.Sprintf("REP %d CBF 1 SELECT %d FROM F FILTER attr EQ true AS F", len(selected), len(selected)) - require.NoError(tb, policy.DecodeString(strPolicy)) + strPolicy := fmt.Sprintf("%sCBF 1\n%s%s", &sbRpl, &sbSlc, &sbFlt) + require.NoError(tb, policy.DecodeString(strPolicy), strPolicy) nodeSets, err := networkMap.ContainerNodes(policy, cidtest.ID()) require.NoError(tb, err) - require.Len(tb, nodeSets, 1) - require.Len(tb, nodeSets[0], len(selected)) + require.Len(tb, nodeSets, len(selected)) for i := range selected { - require.Contains(tb, nodeSets[0], nodes[selected[i]], i) + require.Len(tb, nodeSets[i], len(selected[i]), i) + for j := range selected[i] { + require.Contains(tb, nodeSets[i], nodes[selected[i][j]], [2]int{i, j}) + } } var cnr container.Container @@ -500,3 +512,103 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing. 
} }) } + +func TestContainerNodes_GetNodesForObject(t *testing.T) { + anyAddr := oidtest.Address() + t.Run("read current epoch failure", func(t *testing.T) { + epochErr := errors.New("any epoch error") + network := &testNetwork{epochErr: epochErr} + ns, err := newContainerNodes(new(testContainer), network) + require.NoError(t, err) + + for n := 1; n < 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, epochErr) + require.EqualError(t, err, "read current NeoFS epoch: any epoch error") + // such error must not be cached + network.assertEpochCallCount(t, n) + } + }) + t.Run("read container failure", func(t *testing.T) { + cnrErr := errors.New("any container error") + cnrs := &testContainer{id: anyAddr.Container(), err: cnrErr} + ns, err := newContainerNodes(cnrs, &testNetwork{epoch: anyEpoch}) + require.NoError(t, err) + + for n := 1; n < 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, cnrErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error") + // such error must not be cached + cnrs.assertCalledNTimesWith(t, n, anyAddr.Container()) + } + }) + t.Run("read netmap failure", func(t *testing.T) { + curNetmapErr := errors.New("any current netmap error") + network := &testNetwork{epoch: anyEpoch, curNetmapErr: curNetmapErr} + ns, err := newContainerNodes(&testContainer{id: anyAddr.Container()}, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.ErrorIs(t, err, curNetmapErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error") + network.assertEpochCallCount(t, n) + // such error must not be cached + network.assertNetmapCalledNTimes(t, n, network.epoch) + } + }) + t.Run("apply policy failures", func(t *testing.T) { + t.Run("select container nodes", func(t *testing.T) { + _, _, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + failNetmap := new(netmap.NetMap) + _, policyErr := failNetmap.ContainerNodes(cnr.PlacementPolicy(), anyAddr.Container()) + require.Error(t, policyErr) + + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: failNetmap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr)) + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + }) + t.Run("OK", func(t *testing.T) { + nodes, networkMap, cnr := newNetmapWithContainer(t, 10, [][]int{ + {1, 3}, + {2, 4, 6}, + {5}, + {0, 1, 7, 8, 9}, + }...) 
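For reference, with these selector groups the newNetmapWithContainer helper above assembles the following placement policy text from its format strings (derived from the helper's code, shown only to make the expected node sets easier to follow):

    REP 2 IN S0
    REP 3 IN S1
    REP 1 IN S2
    REP 5 IN S3
    CBF 1
    SELECT 2 FROM F0 AS S0
    SELECT 3 FROM F1 AS S1
    SELECT 1 FROM F2 AS S2
    SELECT 5 FROM F3 AS S3
    FILTER attr0 EQ true AS F0
    FILTER attr1 EQ true AS F1
    FILTER attr2 EQ true AS F2
    FILTER attr3 EQ true AS F3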
+ cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + nodeLists, primCounts, err := ns.getNodesForObject(anyAddr) + require.NoError(t, err) + require.Len(t, primCounts, 4) + require.Len(t, nodeLists, 4) + require.EqualValues(t, 2, primCounts[0]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[1], nodes[3]}, nodeLists[0]) + require.EqualValues(t, 3, primCounts[1]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[2], nodes[4], nodes[6]}, nodeLists[1]) + require.EqualValues(t, 1, primCounts[2]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[5]}, nodeLists[2]) + require.EqualValues(t, 5, primCounts[3]) + require.ElementsMatch(t, []netmap.NodeInfo{nodes[0], nodes[1], nodes[7], nodes[8], nodes[9]}, nodeLists[3]) + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) +} From 8b82bbd09bed07fbdf8ba416fda1790d0743130f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 18 Jul 2024 17:16:20 +0400 Subject: [PATCH 3/3] node/policy: Test functions selecting nodes for container and objects They are vendored by NeoFS SDK, but the reaction to their unexpected behavior also needs to be tested. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/policy.go | 39 ++++++++---- cmd/neofs-node/policy_test.go | 110 ++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 13 deletions(-) diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index 603681a1e1..a98d0a43d0 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -39,6 +39,11 @@ const ( cachedObjectNodesNum = 10000 ) +type ( + getContainerNodesFunc = func(netmapsdk.NetMap, netmapsdk.PlacementPolicy, cid.ID) ([][]netmapsdk.NodeInfo, error) + sortContainerNodesFunc = func(netmapsdk.NetMap, [][]netmapsdk.NodeInfo, oid.ID) ([][]netmapsdk.NodeInfo, error) +) + // containerNodes wraps NeoFS network state to apply container storage policies. 
// // Since policy application results are consistent for fixed container and @@ -50,6 +55,10 @@ type containerNodes struct { cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes] + + // for testing + getContainerNodesFunc getContainerNodesFunc + sortContainerNodesFunc sortContainerNodesFunc } func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { @@ -62,10 +71,12 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con return nil, fmt.Errorf("create LRU container node cache for objects: %w", err) } return &containerNodes{ - containers: containers, - network: network, - cache: l, - objCache: lo, + containers: containers, + network: network, + cache: l, + objCache: lo, + getContainerNodesFunc: netmapsdk.NetMap.ContainerNodes, + sortContainerNodesFunc: netmapsdk.NetMap.PlacementVectors, }, nil } @@ -84,7 +95,7 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, return fmt.Errorf("read current NeoFS epoch: %w", err) } - cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network} + cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network, getNodesFunc: x.getContainerNodesFunc} resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache) if err != nil { @@ -151,9 +162,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node return res.nodeSets, res.repCounts, res.err } cnrRes, networkMap, err := (&containerPolicyContext{ - id: addr.Container(), - containers: x.containers, - network: x.network, + id: addr.Container(), + containers: x.containers, + network: x.network, + getNodesFunc: x.getContainerNodesFunc, }).applyToNetmap(curEpoch, x.cache) if err != nil || cnrRes.err != nil { if err == nil { @@ -168,7 +180,7 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node } } res.repCounts = cnrRes.repCounts - res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object()) + res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object()) if res.err != nil { res.err = fmt.Errorf("sort container nodes for object: %w", res.err) } @@ -179,9 +191,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node // preserves context of storage policy processing for the particular container. 
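The getContainerNodesFunc and sortContainerNodesFunc fields introduced above are test seams: production code keeps the SDK method expressions (netmapsdk.NetMap.ContainerNodes, netmapsdk.NetMap.PlacementVectors) as defaults, while tests substitute stubs to provoke behavior the vendored SDK would not normally produce. A minimal runnable sketch of the same pattern, with purely hypothetical names:

    package main

    import "fmt"

    type rect struct{ w, h int }

    func (r rect) size() int { return r.w * r.h }

    type measurer struct {
        // sizeFunc defaults to the real method expression; a test may overwrite it.
        sizeFunc func(rect) int
    }

    func newMeasurer() *measurer { return &measurer{sizeFunc: rect.size} }

    func main() {
        m := newMeasurer()
        fmt.Println(m.sizeFunc(rect{w: 3, h: 4})) // 12
        // in a test: m.sizeFunc = func(rect) int { return -1 } // simulate misbehavior
    }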
type containerPolicyContext struct { // static - id cid.ID - containers container.Source - network netmap.Source + id cid.ID + containers container.Source + network netmap.Source + getNodesFunc getContainerNodesFunc // dynamic cnr *container.Container } @@ -218,7 +231,7 @@ func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[co return result, nil, fmt.Errorf("read network map by epoch: %w", err) } policy := x.cnr.Value.PlacementPolicy() - result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id) + result.nodeSets, result.err = x.getNodesFunc(*networkMap, policy, x.id) if result.err == nil { // ContainerNodes should control following, but still better to double-check if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum { diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go index 150348e6b1..43cb425528 100644 --- a/cmd/neofs-node/policy_test.go +++ b/cmd/neofs-node/policy_test.go @@ -13,6 +13,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) @@ -580,6 +581,115 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) { require.EqualValues(t, network.epoch, network.callsNetmap[0]) } }) + t.Run("diff num of node lists and replica descriptors", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + return make([][]netmap.NodeInfo, 4), nil + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "diff number of storage node sets (4) and required replica descriptors (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + + t.Run("not enough nodes in some list", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + nodeLists, err := nm.ContainerNodes(policy, cnrID) + require.NoError(t, err) + res := make([][]netmap.NodeInfo, len(nodeLists)) + copy(res, nodeLists) + res[1] = res[1][:len(res[1])-1] + return res, nil + } + + for n := 1; n <= 10; n++ { + _, _, err = 
ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "invalid storage node set #1: number of nodes (1) is less than minimum required by the container policy (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + t.Run("diff num of node lists and replica descriptors", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + return make([][]netmap.NodeInfo, 4), nil + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "diff number of storage node sets (4) and required replica descriptors (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + + t.Run("sort nodes failure", func(t *testing.T) { + nodes, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.sortContainerNodesFunc = func(nm netmap.NetMap, ns [][]netmap.NodeInfo, id oid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, anyAddr.Object(), id) + for i := range ns { + for j := range ns[i] { + require.Contains(t, nodes, ns[i][j], [2]int{i, j}) + } + } + return nil, errors.New("any sort error") + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "sort container nodes for object: any sort error") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) }) t.Run("OK", func(t *testing.T) { nodes, networkMap, cnr := newNetmapWithContainer(t, 10, [][]int{