diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index cc40c1331a..b87ca464bf 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -97,6 +97,8 @@ func listenMorphNotifications(c *cfg) { registerBlockHandler(lis, func(block *block.Block) { c.log.Debug("new block", zap.Uint32("index", block.Index)) + c.networkState.block.Store(block.Index) + err = c.persistate.SetUInt32(persistateFSChainLastBlockKey, block.Index) if err != nil { c.log.Warn("can't update persistent state", diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index e03c2ecf5d..3a197ef08b 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -24,7 +24,9 @@ import ( // primary solution of local network state dump. type networkState struct { - epoch atomic.Uint64 + epoch atomic.Uint64 + block atomic.Uint32 + epochDuration atomic.Uint64 controlNetStatus atomic.Value // control.NetmapStatus @@ -46,6 +48,14 @@ func (s *networkState) CurrentEpoch() uint64 { return s.epoch.Load() } +func (s *networkState) CurrentBlock() uint32 { + return s.block.Load() +} + +func (s *networkState) CurrentEpochDuration() uint64 { + return s.epochDuration.Load() +} + func (s *networkState) setCurrentEpoch(v uint64) { s.epoch.Store(v) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index c683074503..de391b3304 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -350,7 +350,7 @@ func initObjectService(c *cfg) { firstSvc := objectService.NewMetricCollector(signSvc, c.metricsCollector) - server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey)) + server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey), c.cfgNetmap.state) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 37e2e9fd84..d2ea3ad09f 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -230,6 +230,8 @@ func initReputationService(c *cfg) { return } + c.networkState.epochDuration.Store(duration) + iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations() if err != nil { log.Debug("could not fetch iteration number", zap.Error(err)) diff --git a/cmd/neofs-node/transport.go b/cmd/neofs-node/transport.go index 156be9a54c..2382e17242 100644 --- a/cmd/neofs-node/transport.go +++ b/cmd/neofs-node/transport.go @@ -5,7 +5,6 @@ import ( "fmt" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/refs" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" @@ -13,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/status" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" ) type transport struct { @@ -22,7 +20,7 @@ type transport struct { // SendReplicationRequestToNode connects to described node and sends prepared // replication request message to it. -func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) { +func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) ([]byte, error) { c, err := x.clients.Get(node) if err != nil { return nil, fmt.Errorf("connect to remote node: %w", err) @@ -40,12 +38,12 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte } return resp.err }) - return resp.sig, err + return resp.sigs, err } type replicateResponse struct { - sig *neofscrypto.Signature - err error + sigs []byte + err error } func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) } @@ -70,22 +68,7 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { return nil } - sig := m.GetObjectSignature() - if sig == nil { - return nil - } - - sigV2 := new(refs.Signature) - err := sigV2.Unmarshal(sig) - if err != nil { - return fmt.Errorf("decoding signature from proto message: %w", err) - } - - x.sig = new(neofscrypto.Signature) - err = x.sig.ReadFromV2(*sigV2) - if err != nil { - return fmt.Errorf("invalid signature: %w", err) - } + x.sigs = m.GetObjectSignature() return nil } diff --git a/pkg/core/netmap/state.go b/pkg/core/netmap/state.go index 9b5b441a10..6482dc332e 100644 --- a/pkg/core/netmap/state.go +++ b/pkg/core/netmap/state.go @@ -5,3 +5,10 @@ type State interface { // CurrentEpoch returns the number of the current NeoFS epoch. CurrentEpoch() uint64 } + +// StateDetailed groups block, epoch and its duration information about FS chain. +type StateDetailed interface { + State + CurrentBlock() uint32 + CurrentEpochDuration() uint64 +} diff --git a/pkg/core/object/replicate.go b/pkg/core/object/replicate.go index 194b99cd0b..05d62b423a 100644 --- a/pkg/core/object/replicate.go +++ b/pkg/core/object/replicate.go @@ -9,8 +9,7 @@ import ( ) const ( - validInterval = 10 // in epochs - currentVersion = 7 // it is also a number of fields + currentVersion = 7 // it is also a number of fields ) const ( @@ -34,11 +33,11 @@ const ( // "size": payload size // "deleted": array of _raw_ object IDs // "locked": array of _raw_ object IDs -// "validuntil": last valid epoch number for meta information +// "validuntil": last valid block number for meta information // // Last valid epoch is object's creation epoch + 10. func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, - deleted, locked []oid.ID, createdAt uint64, magicNumber uint32) []byte { + deleted, locked []oid.ID, vub uint64, magicNumber uint32) []byte { kvs := []stackitem.MapElement{ kv(networkMagicKey, magicNumber), kv(cidKey, cID[:]), @@ -46,7 +45,7 @@ func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, kv(sizeKey, pSize), oidsKV(deletedKey, deleted), oidsKV(lockedKey, locked), - kv(validUntilKey, createdAt+validInterval), + kv(validUntilKey, vub), } result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs)) diff --git a/pkg/core/object/replicate_test.go b/pkg/core/object/replicate_test.go index 3f14669163..22ad851916 100644 --- a/pkg/core/object/replicate_test.go +++ b/pkg/core/object/replicate_test.go @@ -50,7 +50,7 @@ func TestMetaInfo(t *testing.T) { require.Equal(t, locked, stackItemToOIDs(t, mm[5].Value)) require.Equal(t, validUntilKey, string(mm[6].Key.Value().([]byte))) - require.Equal(t, validUntil+validInterval, mm[6].Value.Value().(*big.Int).Uint64()) + require.Equal(t, validUntil, mm[6].Value.Value().(*big.Int).Uint64()) } func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID { diff --git a/pkg/network/transport/object/grpc/replication.go b/pkg/network/transport/object/grpc/replication.go index e73f89c1ba..4c99e5cf89 100644 --- a/pkg/network/transport/object/grpc/replication.go +++ b/pkg/network/transport/object/grpc/replication.go @@ -3,6 +3,7 @@ package object import ( "bytes" "context" + "encoding/binary" "errors" "fmt" @@ -228,17 +229,47 @@ func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) { default: } - metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, - o.CreationEpoch(), s.mNumber) + currentBlock := s.nmState.CurrentBlock() + currentEpochDuration := s.nmState.CurrentEpochDuration() + firstBlock := (uint64(currentBlock)/currentEpochDuration + 1) * currentEpochDuration + secondBlock := firstBlock + currentEpochDuration + thirdBlock := secondBlock + currentEpochDuration - var sig neofscrypto.Signature - err := sig.Calculate(s.signer, metaInfo) + firstMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, firstBlock, s.mNumber) + secondMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, secondBlock, s.mNumber) + thirdMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, thirdBlock, s.mNumber) + + var firstSig neofscrypto.Signature + var secondSig neofscrypto.Signature + var thirdSig neofscrypto.Signature + + err := firstSig.Calculate(s.signer, firstMeta) + if err != nil { + return nil, fmt.Errorf("signature failure: %w", err) + } + err = secondSig.Calculate(s.signer, secondMeta) if err != nil { return nil, fmt.Errorf("signature failure: %w", err) } + err = thirdSig.Calculate(s.signer, thirdMeta) + if err != nil { + return nil, fmt.Errorf("signature failure: %w", err) + } + + firstSigV2 := new(refsv2.Signature) + firstSig.WriteToV2(firstSigV2) + secondSigV2 := new(refsv2.Signature) + secondSig.WriteToV2(secondSigV2) + thirdSigV2 := new(refsv2.Signature) + thirdSig.WriteToV2(thirdSigV2) - sigV2 := new(refsv2.Signature) - sig.WriteToV2(sigV2) + res := make([]byte, 0, 4+firstSigV2.StableSize()+4+secondSigV2.StableSize()+4+thirdSigV2.StableSize()) + res = binary.LittleEndian.AppendUint32(res, uint32(firstSigV2.StableSize())) + res = append(res, firstSigV2.StableMarshal(nil)...) + res = binary.LittleEndian.AppendUint32(res, uint32(secondSigV2.StableSize())) + res = append(res, secondSigV2.StableMarshal(nil)...) + res = binary.LittleEndian.AppendUint32(res, uint32(thirdSigV2.StableSize())) + res = append(res, thirdSigV2.StableMarshal(nil)...) - return sigV2.StableMarshal(nil), nil + return res, nil } diff --git a/pkg/network/transport/object/grpc/replication_test.go b/pkg/network/transport/object/grpc/replication_test.go index ba4c31e2ec..6dbd1dbae7 100644 --- a/pkg/network/transport/object/grpc/replication_test.go +++ b/pkg/network/transport/object/grpc/replication_test.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "encoding/binary" "errors" "testing" @@ -164,7 +165,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID func TestServer_Replicate(t *testing.T) { var noCallNode noCallTestNode var noCallObjSvc noCallObjectService - noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer()) + noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer(), netmapStateDetailed{}) clientSigner := neofscryptotest.Signer() clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public()) @@ -328,7 +329,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("apply storage policy failure", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.cnrErr = errors.New("any error") @@ -340,7 +341,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("client or server mismatches object's storage policy", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.serverOutsideCnr = true node.clientOutsideCnr = true @@ -360,7 +361,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("local storage failure", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.storeErr = errors.New("any error") @@ -375,7 +376,7 @@ func TestServer_Replicate(t *testing.T) { signer := neofscryptotest.Signer() reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID) node := newTestNode(t, serverPubKey, clientPubKey, cnr, reqForSignature.Object) - srv := New(noCallObjSvc, mNumber, node, signer) + srv := New(noCallObjSvc, mNumber, node, signer, netmapStateDetailed{}) t.Run("signature not requested", func(t *testing.T) { resp, err := srv.Replicate(context.Background(), reqForSignature) @@ -394,20 +395,30 @@ func TestServer_Replicate(t *testing.T) { require.Empty(t, resp.GetStatus().GetMessage()) require.NotNil(t, resp.GetObjectSignature()) - var sigV2 refsv2.Signature - require.NoError(t, sigV2.Unmarshal(resp.GetObjectSignature())) + sigsRaw := resp.GetObjectSignature() - var sig neofscrypto.Signature - require.NoError(t, sig.ReadFromV2(sigV2)) + for i := range 1 { + var sigV2 refsv2.Signature + l := binary.LittleEndian.Uint32(sigsRaw) - require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes()) - require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, o.CreationEpoch(), mNumber))) + require.NoError(t, sigV2.Unmarshal(sigsRaw[4:4+l])) + + var sig neofscrypto.Signature + require.NoError(t, sig.ReadFromV2(sigV2)) + + require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes()) + require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo( + o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, + uint64((123+1+i)*240), mNumber))) + + sigsRaw = sigsRaw[:4+l] + } }) }) t.Run("OK", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -416,6 +427,20 @@ func TestServer_Replicate(t *testing.T) { }) } +type netmapStateDetailed struct{} + +func (n netmapStateDetailed) CurrentEpoch() uint64 { + return 123 +} + +func (n netmapStateDetailed) CurrentBlock() uint32 { + return 123 * 240 +} + +func (n netmapStateDetailed) CurrentEpochDuration() uint64 { + return 240 +} + type nopNode struct{} func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { @@ -434,7 +459,7 @@ func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() var node nopNode - srv := New(nil, 0, node, neofscryptotest.Signer()) + srv := New(nil, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) for _, tc := range []struct { name string diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 6ebe90c5c5..7f3d770cc6 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -51,15 +52,17 @@ type Server struct { node Node signer neofscrypto.Signer mNumber uint32 + nmState netmap.StateDetailed } // New creates, initializes and returns Server instance. -func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer) *Server { +func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer, nmState netmap.StateDetailed) *Server { return &Server{ srv: c, node: node, signer: signer, mNumber: magicNumber, + nmState: nmState, } } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index c4582bf4fd..7e07d6bb34 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -2,12 +2,14 @@ package putsvc import ( "bytes" + "encoding/binary" "fmt" "math" "slices" "sync" "sync/atomic" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" chaincontainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" @@ -23,15 +25,17 @@ import ( type preparedObjectTarget interface { WriteObject(*objectSDK.Object, object.ContentMeta, encodedObject) error - Close() (oid.ID, *neofscrypto.Signature, error) + Close() (oid.ID, []byte, error) } type distributedTarget struct { placementIterator placementIterator - obj *objectSDK.Object - objMeta object.ContentMeta - networkMagicNumber uint32 + obj *objectSDK.Object + objMeta object.ContentMeta + networkMagicNumber uint32 + currentBlock uint32 + currentEpochDuration uint64 cnrClient *chaincontainer.Client metainfoConsistencyAttr string @@ -147,8 +151,9 @@ func (t *distributedTarget) Close() (oid.ID, error) { default: } + expectedVUB := (uint64(t.currentBlock)/t.currentEpochDuration + 2) * t.currentEpochDuration t.objSharedMeta = object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), t.obj.PayloadSize(), deletedObjs, - lockedObjs, t.obj.CreationEpoch(), t.networkMagicNumber) + lockedObjs, expectedVUB, t.networkMagicNumber) id := t.obj.GetID() err := t.placementIterator.iterateNodesForObject(id, t.sendObject) if err != nil { @@ -199,7 +204,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return fmt.Errorf("could not write header: %w", err) } - _, sig, err := target.Close() + _, sigsRaw, err := target.Close() if err != nil { return fmt.Errorf("could not close object stream: %w", err) } @@ -207,28 +212,78 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if t.localNodeInContainer && !node.local { // These should technically be errors, but we don't have // a complete implementation now, so errors are substituted with logs. - var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID())) + var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID()), + zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) - if sig == nil { - l.Info("missing object meta signature") + sigs, err := decodeSignatures(sigsRaw) + if err != nil { + l.Info("failed to decode signatures", zap.Error(err)) return nil } - if !bytes.Equal(sig.PublicKeyBytes(), node.info.PublicKey()) { - l.Info("public key differs in object meta signature") + for i, sig := range sigs { + if !bytes.Equal(sig.PublicKeyBytes(), node.info.PublicKey()) { + l.Info("public key differs in object meta signature", zap.Int("signature index", i)) + continue + } + + if !sig.Verify(t.objSharedMeta) { + l.Info("metadata: signature verification failed", zap.Int("signature index", i)) + continue + } + + t.metaMtx.Lock() + t.collectedSignatures = append(t.collectedSignatures, sig.Value()) + t.metaMtx.Unlock() + return nil } - if !sig.Verify(t.objSharedMeta) { - l.Info("meta signature verification failed", zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) + l.Info("metadata: verification failed: no valid signatures received") + } + + return nil +} + +func decodeSignatures(b []byte) ([]neofscrypto.Signature, error) { + res := make([]neofscrypto.Signature, 3) + for i := range res { + var offset int + var err error + + res[i], offset, err = decodeSignature(b) + if err != nil { + return nil, fmt.Errorf("decoding %d signature from proto message: %w", i, err) } - t.metaMtx.Lock() - t.collectedSignatures = append(t.collectedSignatures, sig.Value()) - t.metaMtx.Unlock() + b = b[offset:] } - return nil + return res, nil +} + +func decodeSignature(b []byte) (neofscrypto.Signature, int, error) { + if len(b) < 4 { + return neofscrypto.Signature{}, 0, fmt.Errorf("unexpected signature format: len: %d", len(b)) + } + l := int(binary.LittleEndian.Uint32(b[:4])) + if len(b) < 4+l { + return neofscrypto.Signature{}, 0, fmt.Errorf("unexpected signature format: len: %d, len claimed: %d", len(b), l) + } + + sig := new(refs.Signature) + err := sig.Unmarshal(b[4 : 4+l]) + if err != nil { + return neofscrypto.Signature{}, 0, fmt.Errorf("decoding signature from proto message: %w", err) + } + + var res neofscrypto.Signature + err = res.ReadFromV2(*sig) + if err != nil { + return neofscrypto.Signature{}, 0, fmt.Errorf("invalid signature: %w", err) + } + + return res, 4 + l, nil } type errNotEnoughNodes struct { diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index bf156d715a..77508e69b2 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -8,7 +8,6 @@ import ( objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-sdk-go/checksum" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/tzhash/tz" @@ -48,7 +47,7 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet return nil } -func (t *localTarget) Close() (oid.ID, *neofscrypto.Signature, error) { +func (t *localTarget) Close() (oid.ID, []byte, error) { err := putObjectLocally(t.storage, t.obj, t.meta, &t.enc) if err != nil { return oid.ID{}, nil, err diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index a484822741..256f7931dd 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -10,7 +10,6 @@ import ( objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -54,13 +53,13 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta, return nil } -func (t *remoteTarget) Close() (oid.ID, *neofscrypto.Signature, error) { +func (t *remoteTarget) Close() (oid.ID, []byte, error) { if t.enc.hdrOff > 0 { - sig, err := t.transport.SendReplicationRequestToNode(t.ctx, t.enc.b, t.nodeInfo) + sigs, err := t.transport.SendReplicationRequestToNode(t.ctx, t.enc.b, t.nodeInfo) if err != nil { return oid.ID{}, nil, fmt.Errorf("replicate object to remote node (key=%x): %w", t.nodeInfo.PublicKey(), err) } - return t.obj.GetID(), sig, nil + return t.obj.GetID(), sigs, nil } var sessionInfo *util.SessionInfo diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index faa87e02c0..8b7b6e72c0 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -11,7 +11,6 @@ import ( objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -37,7 +36,7 @@ type Option func(*cfg) type Transport interface { // SendReplicationRequestToNode sends a prepared replication request message to // the specified remote node. - SendReplicationRequestToNode(ctx context.Context, req []byte, node client.NodeInfo) (*neofscrypto.Signature, error) + SendReplicationRequestToNode(ctx context.Context, req []byte, node client.NodeInfo) ([]byte, error) } type ClientConstructor interface { @@ -96,7 +95,7 @@ type cfg struct { fmtValidatorOpts []object.FormatValidatorOption - networkState netmap.State + networkState netmap.StateDetailed clientConstructor ClientConstructor @@ -178,7 +177,7 @@ func WithWorkerPools(remote, local util.WorkerPool) Option { } } -func WithNetworkState(v netmap.State) Option { +func WithNetworkState(v netmap.StateDetailed) Option { return func(c *cfg) { c.networkState = v c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithNetState(v)) diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index fbaa631eaa..cec1c31b32 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -208,7 +208,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) return &distributedTarget{ - networkMagicNumber: p.networkMagic, + currentBlock: p.networkState.CurrentBlock(), + currentEpochDuration: p.networkState.CurrentEpochDuration(), + networkMagicNumber: p.networkMagic, placementIterator: placementIterator{ log: p.log, neoFSNet: p.neoFSNet,