Skip to content

Commit

Permalink
node/object/put: use block as validUntil, not epoches
Browse files Browse the repository at this point in the history
It is more natural for on-chain operations. Also, it requires handling sync
problems when different blocks are used for the same objects on different nodes:
two signatures are always attached with epoch duration difference.
Refs nspcc-dev/neofs-contract#451.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Dec 23, 2024
1 parent 70bd9ea commit 8679044
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 79 deletions.
2 changes: 2 additions & 0 deletions cmd/neofs-node/morph.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func listenMorphNotifications(c *cfg) {
registerBlockHandler(lis, func(block *block.Block) {
c.log.Debug("new block", zap.Uint32("index", block.Index))

c.networkState.block.Store(block.Index)

Check warning on line 101 in cmd/neofs-node/morph.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/morph.go#L100-L101

Added lines #L100 - L101 were not covered by tests
err = c.persistate.SetUInt32(persistateFSChainLastBlockKey, block.Index)
if err != nil {
c.log.Warn("can't update persistent state",
Expand Down
12 changes: 11 additions & 1 deletion cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

// primary solution of local network state dump.
type networkState struct {
epoch atomic.Uint64
epoch atomic.Uint64
block atomic.Uint32
epochDuration atomic.Uint64

controlNetStatus atomic.Value // control.NetmapStatus

Expand All @@ -46,6 +48,14 @@ func (s *networkState) CurrentEpoch() uint64 {
return s.epoch.Load()
}

func (s *networkState) CurrentBlock() uint32 {
return s.block.Load()

Check warning on line 52 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (s *networkState) CurrentEpochDuration() uint64 {
return s.epochDuration.Load()

Check warning on line 56 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

func (s *networkState) setCurrentEpoch(v uint64) {
s.epoch.Store(v)

Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func initObjectService(c *cfg) {

firstSvc := objectService.NewMetricCollector(signSvc, c.metricsCollector)

server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))
server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey), c.cfgNetmap.state)

Check warning on line 353 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L353

Added line #L353 was not covered by tests

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/reputation.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func initReputationService(c *cfg) {
return
}

c.networkState.epochDuration.Store(duration)

Check warning on line 234 in cmd/neofs-node/reputation.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/reputation.go#L233-L234

Added lines #L233 - L234 were not covered by tests
iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations()
if err != nil {
log.Debug("could not fetch iteration number", zap.Error(err))
Expand Down
27 changes: 5 additions & 22 deletions cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"fmt"

objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
)

type transport struct {
Expand All @@ -22,7 +20,7 @@ type transport struct {

// SendReplicationRequestToNode connects to described node and sends prepared
// replication request message to it.
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) {
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) ([]byte, error) {

Check warning on line 23 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L23

Added line #L23 was not covered by tests
c, err := x.clients.Get(node)
if err != nil {
return nil, fmt.Errorf("connect to remote node: %w", err)
Expand All @@ -40,12 +38,12 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte
}
return resp.err
})
return resp.sig, err
return resp.sigs, err

Check warning on line 41 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L41

Added line #L41 was not covered by tests
}

type replicateResponse struct {
sig *neofscrypto.Signature
err error
sigs []byte
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) }
Expand All @@ -70,22 +68,7 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
return nil
}

sig := m.GetObjectSignature()
if sig == nil {
return nil
}

sigV2 := new(refs.Signature)
err := sigV2.Unmarshal(sig)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)
}

x.sig = new(neofscrypto.Signature)
err = x.sig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)
}
x.sigs = m.GetObjectSignature()

Check warning on line 71 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L71

Added line #L71 was not covered by tests

return nil
}
7 changes: 7 additions & 0 deletions pkg/core/netmap/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ type State interface {
// CurrentEpoch returns the number of the current NeoFS epoch.
CurrentEpoch() uint64
}

// StateDetailed groups block, epoch and its duration information about FS chain.
type StateDetailed interface {
State
CurrentBlock() uint32
CurrentEpochDuration() uint64
}
9 changes: 4 additions & 5 deletions pkg/core/object/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
)

const (
validInterval = 10 // in epochs
currentVersion = 7 // it is also a number of fields
currentVersion = 7 // it is also a number of fields
)

const (
Expand All @@ -34,19 +33,19 @@ const (
// "size": payload size
// "deleted": array of _raw_ object IDs
// "locked": array of _raw_ object IDs
// "validuntil": last valid epoch number for meta information
// "validuntil": last valid block number for meta information
//
// Last valid epoch is object's creation epoch + 10.
func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64,
deleted, locked []oid.ID, createdAt uint64, magicNumber uint32) []byte {
deleted, locked []oid.ID, vub uint64, magicNumber uint32) []byte {
kvs := []stackitem.MapElement{
kv(networkMagicKey, magicNumber),
kv(cidKey, cID[:]),
kv(oidKey, oID[:]),
kv(sizeKey, pSize),
oidsKV(deletedKey, deleted),
oidsKV(lockedKey, locked),
kv(validUntilKey, createdAt+validInterval),
kv(validUntilKey, vub),
}

result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs))
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/object/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMetaInfo(t *testing.T) {
require.Equal(t, locked, stackItemToOIDs(t, mm[5].Value))

require.Equal(t, validUntilKey, string(mm[6].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[6].Value.Value().(*big.Int).Uint64())
require.Equal(t, validUntil, mm[6].Value.Value().(*big.Int).Uint64())
}

func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID {
Expand Down
45 changes: 38 additions & 7 deletions pkg/network/transport/object/grpc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"

Expand Down Expand Up @@ -228,17 +229,47 @@ func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) {
default:
}

metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked,
o.CreationEpoch(), s.mNumber)
currentBlock := s.nmState.CurrentBlock()
currentEpochDuration := s.nmState.CurrentEpochDuration()
firstBlock := (uint64(currentBlock)/currentEpochDuration + 1) * currentEpochDuration
secondBlock := firstBlock + currentEpochDuration
thirdBlock := secondBlock + currentEpochDuration

var sig neofscrypto.Signature
err := sig.Calculate(s.signer, metaInfo)
firstMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, firstBlock, s.mNumber)
secondMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, secondBlock, s.mNumber)
thirdMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, thirdBlock, s.mNumber)

var firstSig neofscrypto.Signature
var secondSig neofscrypto.Signature
var thirdSig neofscrypto.Signature

err := firstSig.Calculate(s.signer, firstMeta)
if err != nil {
return nil, fmt.Errorf("signature failure: %w", err)
}

Check warning on line 249 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L248-L249

Added lines #L248 - L249 were not covered by tests
err = secondSig.Calculate(s.signer, secondMeta)
if err != nil {
return nil, fmt.Errorf("signature failure: %w", err)
}
err = thirdSig.Calculate(s.signer, thirdMeta)
if err != nil {
return nil, fmt.Errorf("signature failure: %w", err)
}

Check warning on line 257 in pkg/network/transport/object/grpc/replication.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/transport/object/grpc/replication.go#L256-L257

Added lines #L256 - L257 were not covered by tests

firstSigV2 := new(refsv2.Signature)
firstSig.WriteToV2(firstSigV2)
secondSigV2 := new(refsv2.Signature)
secondSig.WriteToV2(secondSigV2)
thirdSigV2 := new(refsv2.Signature)
thirdSig.WriteToV2(thirdSigV2)

sigV2 := new(refsv2.Signature)
sig.WriteToV2(sigV2)
res := make([]byte, 0, 4+firstSigV2.StableSize()+4+secondSigV2.StableSize()+4+thirdSigV2.StableSize())
res = binary.LittleEndian.AppendUint32(res, uint32(firstSigV2.StableSize()))
res = append(res, firstSigV2.StableMarshal(nil)...)
res = binary.LittleEndian.AppendUint32(res, uint32(secondSigV2.StableSize()))
res = append(res, secondSigV2.StableMarshal(nil)...)
res = binary.LittleEndian.AppendUint32(res, uint32(thirdSigV2.StableSize()))
res = append(res, thirdSigV2.StableMarshal(nil)...)

return sigV2.StableMarshal(nil), nil
return res, nil
}
51 changes: 38 additions & 13 deletions pkg/network/transport/object/grpc/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"encoding/binary"
"errors"
"testing"

Expand Down Expand Up @@ -164,7 +165,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID
func TestServer_Replicate(t *testing.T) {
var noCallNode noCallTestNode
var noCallObjSvc noCallObjectService
noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer())
noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer(), netmapStateDetailed{})
clientSigner := neofscryptotest.Signer()
clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public())
serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public())
Expand Down Expand Up @@ -328,7 +329,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("apply storage policy failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{})

node.cnrErr = errors.New("any error")

Expand All @@ -340,7 +341,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("client or server mismatches object's storage policy", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{})

node.serverOutsideCnr = true
node.clientOutsideCnr = true
Expand All @@ -360,7 +361,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("local storage failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{})

node.storeErr = errors.New("any error")

Expand All @@ -375,7 +376,7 @@ func TestServer_Replicate(t *testing.T) {
signer := neofscryptotest.Signer()
reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID)
node := newTestNode(t, serverPubKey, clientPubKey, cnr, reqForSignature.Object)
srv := New(noCallObjSvc, mNumber, node, signer)
srv := New(noCallObjSvc, mNumber, node, signer, netmapStateDetailed{})

t.Run("signature not requested", func(t *testing.T) {
resp, err := srv.Replicate(context.Background(), reqForSignature)
Expand All @@ -394,20 +395,30 @@ func TestServer_Replicate(t *testing.T) {
require.Empty(t, resp.GetStatus().GetMessage())
require.NotNil(t, resp.GetObjectSignature())

var sigV2 refsv2.Signature
require.NoError(t, sigV2.Unmarshal(resp.GetObjectSignature()))
sigsRaw := resp.GetObjectSignature()

var sig neofscrypto.Signature
require.NoError(t, sig.ReadFromV2(sigV2))
for i := range 1 {
var sigV2 refsv2.Signature
l := binary.LittleEndian.Uint32(sigsRaw)

require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes())
require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, o.CreationEpoch(), mNumber)))
require.NoError(t, sigV2.Unmarshal(sigsRaw[4:4+l]))

var sig neofscrypto.Signature
require.NoError(t, sig.ReadFromV2(sigV2))

require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes())
require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(
o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil,
uint64((123+1+i)*240), mNumber)))

sigsRaw = sigsRaw[:4+l]
}
})
})

t.Run("OK", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{})

resp, err := srv.Replicate(context.Background(), req)
require.NoError(t, err)
Expand All @@ -416,6 +427,20 @@ func TestServer_Replicate(t *testing.T) {
})
}

type netmapStateDetailed struct{}

func (n netmapStateDetailed) CurrentEpoch() uint64 {
return 123
}

func (n netmapStateDetailed) CurrentBlock() uint32 {
return 123 * 240
}

func (n netmapStateDetailed) CurrentEpochDuration() uint64 {
return 240
}

type nopNode struct{}

func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error {
Expand All @@ -434,7 +459,7 @@ func BenchmarkServer_Replicate(b *testing.B) {
ctx := context.Background()
var node nopNode

srv := New(nil, 0, node, neofscryptotest.Signer())
srv := New(nil, 0, node, neofscryptotest.Signer(), netmapStateDetailed{})

for _, tc := range []struct {
name string
Expand Down
5 changes: 4 additions & 1 deletion pkg/network/transport/object/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand Down Expand Up @@ -51,15 +52,17 @@ type Server struct {
node Node
signer neofscrypto.Signer
mNumber uint32
nmState netmap.StateDetailed
}

// New creates, initializes and returns Server instance.
func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer) *Server {
func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer, nmState netmap.StateDetailed) *Server {
return &Server{
srv: c,
node: node,
signer: signer,
mNumber: magicNumber,
nmState: nmState,
}
}

Expand Down
Loading

0 comments on commit 8679044

Please sign in to comment.