
Commit

Cache storage policy application results (Get/Head/GetRange) (#…
roman-khimov authored Aug 7, 2024
2 parents 4824da9 + 8b82bbd commit 504daae
Showing 10 changed files with 466 additions and 153 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ Changelog for NeoFS Node
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)

### Removed

2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
@@ -472,6 +472,8 @@ type cfgObject struct {
cfgLocalStorage cfgLocalStorage

tombstoneLifetime uint64

containerNodes *containerNodes
}

type cfgLocalStorage struct {
28 changes: 19 additions & 9 deletions cmd/neofs-node/object.go
@@ -224,20 +224,12 @@ func initObjectService(c *cfg) {
policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize),
)

traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)

c.workers = append(c.workers, c.shared.policer)

sGet := getsvc.New(
sGet := getsvc.New(c,
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)

@@ -250,6 +242,7 @@ func initObjectService(c *cfg) {

cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource)
fatalOnErr(err)
c.cfgObject.containerNodes = cnrNodes

sSearch := searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey),
searchsvc.WithLogger(c.log),
@@ -719,3 +712,20 @@ func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK.

return sw.ids, nil
}

// IsLocalNodePublicKey checks whether the given binary-encoded public key is
// assigned in the network map to a local storage node.
//
// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork].
func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads the network map at the current epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
//
// GetNodesForObject implements [getsvc.NeoFSNetwork].
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
return c.cfgObject.containerNodes.getNodesForObject(addr)
}
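
An aside for readers of this diff: a minimal sketch (not part of this commit) of how a read path built on [getsvc.NeoFSNetwork] could consume these two methods. Node sets come back in placement order, and the parallel slice of counters says how many nodes at the head of each set are primary holders of the object. The neoFSNetwork interface below merely mirrors the doc comments above; fetchFromPlacement and processNode are hypothetical names.

package example

import (
	"fmt"

	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// neoFSNetwork mirrors the contract implemented by *cfg above.
type neoFSNetwork interface {
	IsLocalNodePublicKey([]byte) bool
	GetNodesForObject(oid.Address) ([][]netmapsdk.NodeInfo, []uint, error)
}

// fetchFromPlacement walks the sorted node sets of the object and queries
// remote nodes in placement order until processNode reports success.
func fetchFromPlacement(n neoFSNetwork, addr oid.Address, processNode func(netmapsdk.NodeInfo) bool) error {
	nodeSets, primaryCounts, err := n.GetNodesForObject(addr)
	if err != nil {
		return fmt.Errorf("select container nodes for object: %w", err)
	}
	for i, set := range nodeSets {
		// primaryCounts[i] is the number of nodes at the head of set that
		// are primary holders under the container storage policy.
		_ = primaryCounts[i]
		for _, node := range set {
			if n.IsLocalNodePublicKey(node.PublicKey()) {
				continue // the local node has already been checked
			}
			if processNode(node) {
				return nil
			}
		}
	}
	return fmt.Errorf("object %s not found on container nodes", addr)
}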
145 changes: 123 additions & 22 deletions cmd/neofs-node/policy.go
@@ -8,23 +8,41 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// storagePolicyRes structures the persistent storage policy application result
// for a particular container and network map, including the error.
type storagePolicyRes struct {
nodeSets [][]netmapsdk.NodeInfo
err error
nodeSets [][]netmapsdk.NodeInfo
repCounts []uint
err error
}

type containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
type (
containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
objectNodesCacheKey struct {
epoch uint64
addr oid.Address
}
)

// max number of container storage policy applications results cached by
// containerNodes.
const cachedContainerNodesNum = 1000
const (
// max number of container storage policy applications results cached by
// containerNodes.
cachedContainerNodesNum = 1000
// max number of object storage policy applications results cached by
// containerNodes.
cachedObjectNodesNum = 10000
)

type (
getContainerNodesFunc = func(netmapsdk.NetMap, netmapsdk.PlacementPolicy, cid.ID) ([][]netmapsdk.NodeInfo, error)
sortContainerNodesFunc = func(netmapsdk.NetMap, [][]netmapsdk.NodeInfo, oid.ID) ([][]netmapsdk.NodeInfo, error)
)

// containerNodes wraps NeoFS network state to apply container storage policies.
//
@@ -35,18 +53,30 @@ type containerNodes struct {
containers container.Source
network netmap.Source

cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes]

// for testing
getContainerNodesFunc getContainerNodesFunc
sortContainerNodesFunc sortContainerNodesFunc
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
l, err := lru.New[containerNodesCacheKey, storagePolicyRes](cachedContainerNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)
}
lo, err := lru.New[objectNodesCacheKey, storagePolicyRes](cachedObjectNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for objects: %w", err)
}
return &containerNodes{
containers: containers,
network: network,
cache: l,
containers: containers,
network: network,
cache: l,
objCache: lo,
getContainerNodesFunc: netmapsdk.NetMap.ContainerNodes,
sortContainerNodesFunc: netmapsdk.NetMap.PlacementVectors,
}, nil
}
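
Both caches are built on the generic github.com/hashicorp/golang-lru/v2 package. A tiny stand-alone illustration (with made-up key/value types, not taken from this commit) of the API surface the diff relies on — New, Get, Add — and of the eviction behavior that bounds the caches at 1000 and 10000 entries:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// Capacity 2: adding a third entry evicts the least recently used one.
	cache, err := lru.New[string, int](2)
	if err != nil {
		panic(err)
	}
	cache.Add("a", 1)
	cache.Add("b", 2)
	cache.Add("c", 3) // "a" is evicted here
	_, okA := cache.Get("a")
	v, okC := cache.Get("c")
	fmt.Println(okA, okC, v) // false true 3
}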

@@ -65,7 +95,7 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}
cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network, getNodesFunc: x.getContainerNodesFunc}

resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache)
if err != nil {
@@ -116,12 +146,55 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
return nil
}

// getNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads the network map at the current epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
curEpoch, err := x.network.Epoch()
if err != nil {
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
cacheKey := objectNodesCacheKey{curEpoch, addr}
res, ok := x.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
// non-persistent error => do not cache
return nil, nil, fmt.Errorf("read network map by epoch: %w", err)
}
}
res.repCounts = cnrRes.repCounts
res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object())
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.objCache.Add(cacheKey, res)
return res.nodeSets, res.repCounts, res.err
}
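
For reference, the uncached pipeline that getNodesForObject wraps boils down to the two SDK calls wired as defaults in newContainerNodes: NetMap.ContainerNodes followed by NetMap.PlacementVectors. A compressed sketch with hypothetical naming, assuming only the signatures visible in the type aliases above:

package example

import (
	"fmt"

	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// objectPlacement applies the container policy to the network map and then
// sorts every resulting node set relative to the object ID. The second step
// is what makes the result object-specific and therefore worth caching per
// (epoch, object address) rather than per container only.
func objectPlacement(nm netmapsdk.NetMap, policy netmapsdk.PlacementPolicy, cnr cid.ID, obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
	nodeSets, err := nm.ContainerNodes(policy, cnr) // getContainerNodesFunc default
	if err != nil {
		return nil, fmt.Errorf("apply container storage policy: %w", err)
	}
	sorted, err := nm.PlacementVectors(nodeSets, obj) // sortContainerNodesFunc default
	if err != nil {
		return nil, fmt.Errorf("sort container nodes for object: %w", err)
	}
	return sorted, nil
}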

// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
id cid.ID
containers container.Source
network netmap.Source
id cid.ID
containers container.Source
network netmap.Source
getNodesFunc getContainerNodesFunc
// dynamic
cnr *container.Container
}
@@ -130,25 +203,53 @@ type containerPolicyContext struct {
// ID to the network map at the specified epoch. applyAtEpoch checks existing
// results in the cache and stores new results in it.
func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, error) {
res, _, err := x.applyToNetmap(epoch, cache)
return res, err
}

// applyToNetmap applies storage policy of container referenced by parameterized
// ID to the network map at the specified epoch. applyToNetmap checks existing
// results in the cache and stores new results in it. The network map is returned
// only if it had to be requested from the underlying storage, i.e. on cache miss.
func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, *netmapsdk.NetMap, error) {
cacheKey := containerNodesCacheKey{epoch, x.id}
if result, ok := cache.Get(cacheKey); ok {
return result, nil
return result, nil, nil
}
var result storagePolicyRes
var err error
if x.cnr == nil {
x.cnr, err = x.containers.Get(x.id)
if err != nil {
// non-persistent error => do not cache
return result, fmt.Errorf("read container by ID: %w", err)
return result, nil, fmt.Errorf("read container by ID: %w", err)
}
}
networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
// non-persistent error => do not cache
return result, fmt.Errorf("read network map by epoch: %w", err)
return result, nil, fmt.Errorf("read network map by epoch: %w", err)
}
policy := x.cnr.Value.PlacementPolicy()
result.nodeSets, result.err = x.getNodesFunc(*networkMap, policy, x.id)
if result.err == nil {
// ContainerNodes should ensure the following, but it's still better to double-check
if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum {
result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+
"diff number of storage node sets (%d) and required replica descriptors (%d)",
len(result.nodeSets), policyNum)
} else {
result.repCounts = make([]uint, len(result.nodeSets))
for i := range result.nodeSets {
if result.repCounts[i] = uint(policy.ReplicaNumberByIndex(i)); result.repCounts[i] > uint(len(result.nodeSets[i])) {
result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+
"invalid storage node set #%d: number of nodes (%d) is less than minimum required by the container policy (%d)",
i, len(result.nodeSets[i]), result.repCounts[i])
break
}
}
}
}
result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id)
cache.Add(cacheKey, result)
return result, nil
return result, networkMap, nil
}
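
To make the double-check above concrete, a small illustrative program (not from this commit) showing how NumberOfReplicas and ReplicaNumberByIndex relate for a simple policy; it assumes the SDK's PlacementPolicy.DecodeString accepts the policy string used here:

package main

import (
	"fmt"

	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

func main() {
	// "REP 1 REP 3" declares two replica descriptors, so applying the policy
	// must yield exactly two node sets: the first with at least 1 node and
	// the second with at least 3 — precisely what applyToNetmap verifies.
	var policy netmapsdk.PlacementPolicy
	if err := policy.DecodeString("REP 1 REP 3"); err != nil {
		panic(err)
	}
	fmt.Println(policy.NumberOfReplicas())      // 2
	fmt.Println(policy.ReplicaNumberByIndex(0)) // 1
	fmt.Println(policy.ReplicaNumberByIndex(1)) // 3
}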