Skip to content

Commit

Permalink
client: Provide method for binary object replication
Browse files Browse the repository at this point in the history
NeoFS storage nodes replicate objects between each other to follow the
objects' storage policies. The nodes store objects in Protocol Buffers
V3 binary format. Previously, to transmit them, there was a need to
almost completely decode the object. NeoFS API V2 protocol was recently
extended with `neo.fs.v2.object.ObjectService.Replicate` RPC that
allows to transmit binary object in one message (unlike complex `Put`
streaming RPC). SDK should provide API for new service RPC clients.

Add `ReplicateObject` method that works with binary objects accessed
through `io.ReadSeeker` stream. The method is optimized to allocate
buffer for the whole object once.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Dec 14, 2023
1 parent d72d76c commit f984d48
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 14 deletions.
7 changes: 7 additions & 0 deletions client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/version"
)

// Various field numbers in from NeoFS API definitions.
const (
fieldNumSigPubKey = 1
fieldNumSigVal = 2
fieldNumSigScheme = 3
)

// groups meta parameters shared between all Client operations.
type prmCommonMeta struct {
// NeoFS request X-Headers
Expand Down
177 changes: 177 additions & 0 deletions client/object_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@ import (
"errors"
"fmt"
"io"
"math"
"os"

"github.com/nspcc-dev/neofs-api-go/v2/acl"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"google.golang.org/protobuf/encoding/protowire"
)

var (
Expand Down Expand Up @@ -325,3 +334,171 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer us

return &w, nil
}

// ReplicateObjectOptions groups [Client.ReplicateObject] options tuning the
// default behavior.
type ReplicateObjectOptions struct{}

// ReplicateObject sends binary-encoded NeoFS object from the given
// [io.ReadSeeker] to remote server for local storage. The signer must
// authenticate a subject that matches the object's storage policy. Since this
// property can change over NeoFS system time, compliance with the policy is
// checked back to foreseeable moment in the past. The recipient must comply
// with the current object's storage policy.
//
// ReplicateObject is intended for maintaining data storage by NeoFS system
// nodes only, not for regular use.
//
// Object must be encoded in compliance with Protocol Buffers V3 format.
// TODO: more requirements?
//
// Source [io.ReadSeeker] must point to the start. If it provides Size() int64
// method, it is used as a definition of the object size.
// NOTE: ReplicateObject does not reset src to start after the call. If it is
// needed, do not forget to Seek.
//
// See ReplicateObjectOptions to tune the behavior.
func (c *Client) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer, _ ReplicateObjectOptions) error {
if signer == nil {
// note that we don't stat this error
return ErrMissingSigner
}

var err error
defer func() {
c.sendStatistic(stat.MethodObjectPut, err)()
}()

const svcName = "neo.fs.v2.object.ObjectService"
const opName = "Replicate"
stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName),
client.WithContext(ctx), client.AllowBinarySendingOnly())
if err != nil {
return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err)
}

var objSize int64
switch v := src.(type) {
default:
objSize, err = src.Seek(0, io.SeekEnd)
if err != nil {
return fmt.Errorf("seek to end: %w", err)
} else if objSize < 0 {
return fmt.Errorf("seek to end returned negative value %d", objSize)
}

_, err = src.Seek(-objSize, io.SeekCurrent)
if err != nil {
return fmt.Errorf("seek to back to initial pos: %w", err)
}
case *os.File:
fi, err := v.Stat()
if err != nil {
return fmt.Errorf("get file info: %w", err)
}

objSize = fi.Size()
if objSize > math.MaxUint32 {
return fmt.Errorf("too big file %d bytes", objSize)
}
case interface{ Size() int64 }: // e.g. *bytes.Reader
objSize = v.Size()
if objSize < 0 {
return fmt.Errorf("negative Size return %d", objSize)
}
}

// calculate template signature to know its size
sigTmpl, err := signer.Sign(nil)
if err != nil {
return fmt.Errorf("calculate signature template: %w", err)
}

bPubKey := neofscrypto.PublicKeyBytes(signer.Public())
sigScheme := uint64(signer.Scheme())

const fieldNumObject = 1
const fieldNumSignature = 2

sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) +
protowire.SizeTag(fieldNumSigVal) + +protowire.SizeBytes(len(sigTmpl)) +
protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme)

// FIXME: invalid num casts
msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeBytes(int(objSize)) +
protowire.SizeTag(fieldNumSignature) + sigSize

// TODO: support custom allocation (e.g. use sync.Pool)
msg := make([]byte, 0, msgSize)

msg = protowire.AppendTag(msg, fieldNumObject, protowire.BytesType)
msg = protowire.AppendVarint(msg, uint64(objSize))
msg = msg[:int64(len(msg))+objSize]

bufObj := msg[int64(len(msg))-objSize:]
_, err = io.ReadFull(src, bufObj)
if err != nil {
return fmt.Errorf("read full object into message buffer: %w", err)
}

objSig, err := signer.Sign(bufObj)
if err != nil {
return fmt.Errorf("sign binary object: %w", err)
}

msg = protowire.AppendTag(msg, fieldNumSignature, protowire.BytesType)
msg = protowire.AppendVarint(msg, uint64(sigSize))
msg = protowire.AppendTag(msg, fieldNumSigPubKey, protowire.BytesType)
msg = protowire.AppendBytes(msg, bPubKey)
msg = protowire.AppendTag(msg, fieldNumSigVal, protowire.BytesType)
msg = protowire.AppendBytes(msg, objSig)
msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType)
msg = protowire.AppendVarint(msg, sigScheme)

err = stream.WriteMessage(client.BinaryMessage(msg))
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("send request: %w", err)
}

var resp replicateResponse
err = stream.ReadMessage(&resp)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.ErrUnexpectedEOF
}

return fmt.Errorf("recv response: %w", err)
}

_ = stream.Close()

return resp.err
}

type replicateResponse struct {
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message {
return new(objectgrpc.ReplicationResponse)
}

func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
m, ok := gm.(*objectgrpc.ReplicationResponse)
if !ok {
return message.NewUnexpectedMessageType(gm, m)
}

var st *status.Status
if mst := m.GetStatus(); mst != nil {
st = new(status.Status)
err := st.FromGRPCMessage(mst)
if err != nil {
return fmt.Errorf("decode response status: %w", err)
}
}

x.err = apistatus.ErrorFromV2(st)

return nil
}
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ require (
github.com/mr-tron/base58 v1.2.0
github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43
github.com/nspcc-dev/neo-go v0.102.0
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c
github.com/nspcc-dev/tzhash v1.7.1
github.com/stretchr/testify v1.8.4
github.com/testcontainers/testcontainers-go v0.24.1
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
)

require (
Expand Down Expand Up @@ -60,12 +61,11 @@ require (
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 9 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 h1:zXkCRGTHqhkBRJ
github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43/go.mod h1:BGU4YsuoFXjQddsCfUXpq5uNr2A8W4PrWbiljdD/TpU=
github.com/nspcc-dev/neo-go v0.102.0 h1:O2Gt4JPOWmp0c+PnPWwd2wPI74BKSwkaNCEyvyQTWJw=
github.com/nspcc-dev/neo-go v0.102.0/go.mod h1:QXxpZxJT2KedwM0Nlj8UO0/fZN2WIe4h/i03uBHKbnc=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 h1:jhuN8Ldqz7WApvUJRFY0bjRXE1R3iCkboMX5QVZhHVk=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0/go.mod h1:DRIr0Ic1s+6QgdqmNFNLIqMqd7lNMJfYwkczlm1hDtM=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c h1:c6cdp97RLYk90gfM7zk5Odb5PwWggP3UFkxFPACVQLE=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c/go.mod h1:eaffSBIGhXUIMYvRBGXmlgQRLyyCWlzOft9jGYlqwrw=
github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4=
github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs=
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
Expand All @@ -91,7 +91,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/shirou/gopsutil/v3 v3.23.7 h1:C+fHO8hfIppoJ1WdsVm1RoI0RwXoNdfTK7yWXV0wVj4=
github.com/shirou/gopsutil/v3 v3.23.7/go.mod h1:c4gnmoRC0hQuaLqvxnx1//VXQ0Ms/X9UnJF8pddY5z4=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
Expand Down Expand Up @@ -140,8 +140,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -172,10 +172,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 h1:eSaPbMR4T7WfH9FvABk36NBMacoTUKdWCvV0dx+KfOg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
Expand Down

0 comments on commit f984d48

Please sign in to comment.