Skip to content

Commit

Permalink
Feat/push object meta to contracts (#3044)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Dec 23, 2024
2 parents 373a6e0 + ae6e37d commit 3a10bc1
Show file tree
Hide file tree
Showing 37 changed files with 676 additions and 142 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog for NeoFS Node
## [Unreleased]

### Added
- Initial support for meta-on-chain for objects (#2877)

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/morph.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func listenMorphNotifications(c *cfg) {
registerBlockHandler(lis, func(block *block.Block) {
c.log.Debug("new block", zap.Uint32("index", block.Index))

c.networkState.block.Store(block.Index)

err = c.persistate.SetUInt32(persistateFSChainLastBlockKey, block.Index)
if err != nil {
c.log.Warn("can't update persistent state",
Expand Down
12 changes: 11 additions & 1 deletion cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

// primary solution of local network state dump.
type networkState struct {
epoch atomic.Uint64
epoch atomic.Uint64
block atomic.Uint32
epochDuration atomic.Uint64

controlNetStatus atomic.Value // control.NetmapStatus

Expand All @@ -46,6 +48,14 @@ func (s *networkState) CurrentEpoch() uint64 {
return s.epoch.Load()
}

func (s *networkState) CurrentBlock() uint32 {
return s.block.Load()
}

func (s *networkState) CurrentEpochDuration() uint64 {
return s.epochDuration.Load()
}

func (s *networkState) setCurrentEpoch(v uint64) {
s.epoch.Store(v)

Expand Down
3 changes: 2 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func initObjectService(c *cfg) {
putsvc.WithNetworkMagic(mNumber),
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithContainerClient(c.cCli),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
putsvc.WithObjectStorage(storageEngine{engine: ls}),
putsvc.WithContainerSource(c.cfgObject.cnrSource),
Expand Down Expand Up @@ -349,7 +350,7 @@ func initObjectService(c *cfg) {

firstSvc := objectService.NewMetricCollector(signSvc, c.metricsCollector)

server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))
server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey), c.cfgNetmap.state)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/reputation.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func initReputationService(c *cfg) {
return
}

c.networkState.epochDuration.Store(duration)

iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations()
if err != nil {
log.Debug("could not fetch iteration number", zap.Error(err))
Expand Down
27 changes: 5 additions & 22 deletions cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"fmt"

objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
)

type transport struct {
Expand All @@ -22,7 +20,7 @@ type transport struct {

// SendReplicationRequestToNode connects to described node and sends prepared
// replication request message to it.
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) {
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) ([]byte, error) {
c, err := x.clients.Get(node)
if err != nil {
return nil, fmt.Errorf("connect to remote node: %w", err)
Expand All @@ -40,12 +38,12 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte
}
return resp.err
})
return resp.sig, err
return resp.sigs, err
}

type replicateResponse struct {
sig *neofscrypto.Signature
err error
sigs []byte
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) }
Expand All @@ -70,22 +68,7 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
return nil
}

sig := m.GetObjectSignature()
if sig == nil {
return nil
}

sigV2 := new(refs.Signature)
err := sigV2.Unmarshal(sig)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)
}

x.sig = new(neofscrypto.Signature)
err = x.sig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)
}
x.sigs = m.GetObjectSignature()

return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/nspcc-dev/locode-db v0.6.0
github.com/nspcc-dev/neo-go v0.107.1
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea
github.com/nspcc-dev/neofs-contract v0.20.0
github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b
github.com/nspcc-dev/tzhash v1.8.0
github.com/olekukonko/tablewriter v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240729160116-d8e3e57f88f2 h1:tv
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240729160116-d8e3e57f88f2/go.mod h1:/vrbWSHc7YS1KSYhVOyyeucXW/e+1DkVBOgnBEXUCeY=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK0EMGLvunXcFyq7fBURS/CsN4MH+4nlYiqn6pTwWAU=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y=
github.com/nspcc-dev/neofs-contract v0.20.0 h1:ARE/3mSN+P9qi/10NBsf7QyPiYrvnxeEgYUN13vHRlo=
github.com/nspcc-dev/neofs-contract v0.20.0/go.mod h1:YxtKYE/5cMNiqwWcQWzeizbB9jizauLni+p8wXxfhsQ=
github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65 h1:SruyrmzfmaIK+rx3EyLsG3hb9Ooh+cTkObucl0yTVxY=
github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65/go.mod h1:fwM6QoYPnsIuUQ4/GOwgzfQ9qoDKknqYgf4XWOqEdJw=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b h1:/7jXQP5pf+M0kRFC1gg5GEdTPkvotpMHxjSXIbMZaGQ=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b/go.mod h1:ewV84r1NACvoBfbKQKzRLUun+Xn5+z9JVqsuCVgv9xI=
github.com/nspcc-dev/rfc6979 v0.2.3 h1:QNVykGZ3XjFwM/88rGfV3oj4rKNBy+nYI6jM7q19hDI=
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/netmap/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ type State interface {
// CurrentEpoch returns the number of the current NeoFS epoch.
CurrentEpoch() uint64
}

// StateDetailed groups block, epoch and its duration information about FS chain.
type StateDetailed interface {
State
CurrentBlock() uint32
CurrentEpochDuration() uint64
}
9 changes: 4 additions & 5 deletions pkg/core/object/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
)

const (
validInterval = 10 // in epochs
currentVersion = 7 // it is also a number of fields
currentVersion = 7 // it is also a number of fields
)

const (
Expand All @@ -34,19 +33,19 @@ const (
// "size": payload size
// "deleted": array of _raw_ object IDs
// "locked": array of _raw_ object IDs
// "validuntil": last valid epoch number for meta information
// "validuntil": last valid block number for meta information
//
// Last valid epoch is object's creation epoch + 10.
func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64,
deleted, locked []oid.ID, createdAt uint64, magicNumber uint32) []byte {
deleted, locked []oid.ID, vub uint64, magicNumber uint32) []byte {
kvs := []stackitem.MapElement{
kv(networkMagicKey, magicNumber),
kv(cidKey, cID[:]),
kv(oidKey, oID[:]),
kv(sizeKey, pSize),
oidsKV(deletedKey, deleted),
oidsKV(lockedKey, locked),
kv(validUntilKey, createdAt+validInterval),
kv(validUntilKey, vub),
}

result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs))
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/object/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMetaInfo(t *testing.T) {
require.Equal(t, locked, stackItemToOIDs(t, mm[5].Value))

require.Equal(t, validUntilKey, string(mm[6].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[6].Value.Value().(*big.Int).Uint64())
require.Equal(t, validUntil, mm[6].Value.Value().(*big.Int).Uint64())
}

func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID {
Expand Down
2 changes: 1 addition & 1 deletion pkg/innerring/processors/alphabet/process_emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (ap *Processor) processEmit() {
}

// there is no signature collecting, so we don't need extra fee
err := ap.morphClient.Invoke(contract, 0, emitMethod)
err := ap.morphClient.Invoke(contract, false, 0, emitMethod)
if err != nil {
ap.log.Warn("can't invoke alphabet emit method", zap.Error(err))

Expand Down
53 changes: 36 additions & 17 deletions pkg/innerring/processors/container/process_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ type putEvent interface {
type putContainerContext struct {
e putEvent

d containerSDK.Domain
// must be filled when verifying raw data from e
cID cid.ID
cnr containerSDK.Container
d containerSDK.Domain
}

// Process a new container from the user by checking the container sanity
Expand Down Expand Up @@ -55,15 +58,15 @@ func (cp *Processor) processContainerPut(put putEvent) {

func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
binCnr := ctx.e.Container()
var cnr containerSDK.Container
ctx.cID = cid.NewFromMarshalledContainer(binCnr)

err := cnr.Unmarshal(binCnr)
err := ctx.cnr.Unmarshal(binCnr)
if err != nil {
return fmt.Errorf("invalid binary container: %w", err)
}

err = cp.verifySignature(signatureVerificationData{
ownerContainer: cnr.Owner(),
ownerContainer: ctx.cnr.Owner(),
verb: session.VerbContainerPut,
binTokenSession: ctx.e.SessionToken(),
binPublicKey: ctx.e.PublicKey(),
Expand All @@ -75,13 +78,13 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
}

// check homomorphic hashing setting
err = checkHomomorphicHashing(cp.netState, cnr)
err = checkHomomorphicHashing(cp.netState, ctx.cnr)
if err != nil {
return fmt.Errorf("incorrect homomorphic hashing setting: %w", err)
}

// check native name and zone
err = checkNNS(ctx, cnr)
err = checkNNS(ctx, ctx.cnr)
if err != nil {
return fmt.Errorf("NNS: %w", err)
}
Expand All @@ -94,22 +97,38 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) {

var err error

prm := cntClient.PutPrm{}

prm.SetContainer(e.Container())
prm.SetKey(e.PublicKey())
prm.SetSignature(e.Signature())
prm.SetToken(e.SessionToken())
prm.SetName(ctx.d.Name())
prm.SetZone(ctx.d.Zone())

nr := e.NotaryRequest()
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, true)

if err != nil {
cp.log.Error("could not approve put container",
zap.Error(err),
)
return
}

nm, err := cp.netState.NetMap()
if err != nil {
cp.log.Error("could not get netmap for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}

policy := ctx.cnr.PlacementPolicy()
vectors, err := nm.ContainerNodes(policy, ctx.cID)
if err != nil {
cp.log.Error("could not build placement for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}

replicas := make([]uint32, 0, policy.NumberOfReplicas())
for i := range vectors {
replicas = append(replicas, policy.ReplicaNumberByIndex(i))
}

err = cp.cnrClient.UpdateContainerPlacement(ctx.cID, vectors, replicas)
if err != nil {
cp.log.Error("could not update Container contract", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}
}

Expand Down Expand Up @@ -175,7 +194,7 @@ func (cp *Processor) approveDeleteContainer(e *containerEvent.Delete) {
prm.SetToken(e.SessionToken())

nr := e.NotaryRequest()
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false)

if err != nil {
cp.log.Error("could not approve delete container",
Expand Down
2 changes: 1 addition & 1 deletion pkg/innerring/processors/container/process_eacl.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cp *Processor) approveSetEACL(e container.SetEACL) {
prm.SetToken(e.SessionToken())

nr := e.NotaryRequest()
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false)

if err != nil {
cp.log.Error("could not approve set EACL",
Expand Down
4 changes: 4 additions & 0 deletions pkg/innerring/processors/container/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,6 +56,9 @@ type NetworkState interface {
//
// which did not allow reading the value.
HomomorphicHashDisabled() (bool, error)

// NetMap must return actual network map.
NetMap() (*netmap.NetMap, error)
}

// New creates a container contract processor instance.
Expand Down
16 changes: 14 additions & 2 deletions pkg/innerring/processors/netmap/cleanup_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netmap

import (
"bytes"
"slices"
"sync"

"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand All @@ -13,6 +14,8 @@ type (
enabled bool
threshold uint64
lastAccess map[string]epochStampWithNodeInfo

prev netmap.NetMap
}

epochStamp struct {
Expand All @@ -36,8 +39,9 @@ func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
}
}

// Update cleanup table based on on-chain information about netmap.
func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
// Update cleanup table based on on-chain information about netmap. Returned
// value indicates if the composition of network map memebers has changed.
func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) bool {
c.Lock()
defer c.Unlock()

Expand All @@ -64,6 +68,14 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
}

c.lastAccess = newMap

// order is expected to be the same from epoch to epoch
mapChanged := !slices.EqualFunc(c.prev.Nodes(), nmNodes, func(i1 netmap.NodeInfo, i2 netmap.NodeInfo) bool {
return bytes.Equal(i1.PublicKey(), i2.PublicKey())
})
c.prev = snapshot

return mapChanged
}

// updates last access time of the netmap node by string public key.
Expand Down
Loading

0 comments on commit 3a10bc1

Please sign in to comment.