diff --git a/client/common.go b/client/common.go index e5aa86bb..0147771e 100644 --- a/client/common.go +++ b/client/common.go @@ -11,6 +11,13 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/version" ) +// Various field numbers in from NeoFS API definitions. +const ( + fieldNumSigPubKey = 1 + fieldNumSigVal = 2 + fieldNumSigScheme = 3 +) + // groups meta parameters shared between all Client operations. type prmCommonMeta struct { // NeoFS request X-Headers diff --git a/client/object_put.go b/client/object_put.go index 46d9855f..5616bfbe 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -5,17 +5,26 @@ import ( "errors" "fmt" "io" + "math" + "os" "github.com/nspcc-dev/neofs-api-go/v2/acl" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/message" + "github.com/nspcc-dev/neofs-api-go/v2/status" "github.com/nspcc-dev/neofs-sdk-go/bearer" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/protobuf/encoding/protowire" ) var ( @@ -325,3 +334,171 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer us return &w, nil } + +// ReplicateObjectOptions groups [Client.ReplicateObject] options tuning the +// default behavior. +type ReplicateObjectOptions struct{} + +// ReplicateObject sends binary-encoded NeoFS object from the given +// [io.ReadSeeker] to remote server for local storage. The signer must +// authenticate a subject that matches the object's storage policy. Since this +// property can change over NeoFS system time, compliance with the policy is +// checked back to foreseeable moment in the past. The recipient must comply +// with the current object's storage policy. +// +// ReplicateObject is intended for maintaining data storage by NeoFS system +// nodes only, not for regular use. +// +// Object must be encoded in compliance with Protocol Buffers V3 format. +// TODO: more requirements? +// +// Source [io.ReadSeeker] must point to the start. If it provides Size() int64 +// method, it is used as a definition of the object size. +// NOTE: ReplicateObject does not reset src to start after the call. If it is +// needed, do not forget to Seek. +// +// See ReplicateObjectOptions to tune the behavior. +func (c *Client) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer, _ ReplicateObjectOptions) error { + if signer == nil { + // note that we don't stat this error + return ErrMissingSigner + } + + var err error + defer func() { + c.sendStatistic(stat.MethodObjectPut, err)() + }() + + const svcName = "neo.fs.v2.object.ObjectService" + const opName = "Replicate" + stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName), + client.WithContext(ctx), client.AllowBinarySendingOnly()) + if err != nil { + return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err) + } + + var objSize int64 + switch v := src.(type) { + default: + objSize, err = src.Seek(0, io.SeekEnd) + if err != nil { + return fmt.Errorf("seek to end: %w", err) + } else if objSize < 0 { + return fmt.Errorf("seek to end returned negative value %d", objSize) + } + + _, err = src.Seek(-objSize, io.SeekCurrent) + if err != nil { + return fmt.Errorf("seek to back to initial pos: %w", err) + } + case *os.File: + fi, err := v.Stat() + if err != nil { + return fmt.Errorf("get file info: %w", err) + } + + objSize = fi.Size() + if objSize > math.MaxUint32 { + return fmt.Errorf("too big file %d bytes", objSize) + } + case interface{ Size() int64 }: // e.g. *bytes.Reader + objSize = v.Size() + if objSize < 0 { + return fmt.Errorf("negative Size return %d", objSize) + } + } + + // calculate template signature to know its size + sigTmpl, err := signer.Sign(nil) + if err != nil { + return fmt.Errorf("calculate signature template: %w", err) + } + + bPubKey := neofscrypto.PublicKeyBytes(signer.Public()) + sigScheme := uint64(signer.Scheme()) + + const fieldNumObject = 1 + const fieldNumSignature = 2 + + sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) + + protowire.SizeTag(fieldNumSigVal) + +protowire.SizeBytes(len(sigTmpl)) + + protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme) + + // FIXME: invalid num casts + msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeBytes(int(objSize)) + + protowire.SizeTag(fieldNumSignature) + sigSize + + // TODO: support custom allocation (e.g. use sync.Pool) + msg := make([]byte, 0, msgSize) + + msg = protowire.AppendTag(msg, fieldNumObject, protowire.BytesType) + msg = protowire.AppendVarint(msg, uint64(objSize)) + msg = msg[:int64(len(msg))+objSize] + + bufObj := msg[int64(len(msg))-objSize:] + _, err = io.ReadFull(src, bufObj) + if err != nil { + return fmt.Errorf("read full object into message buffer: %w", err) + } + + objSig, err := signer.Sign(bufObj) + if err != nil { + return fmt.Errorf("sign binary object: %w", err) + } + + msg = protowire.AppendTag(msg, fieldNumSignature, protowire.BytesType) + msg = protowire.AppendVarint(msg, uint64(sigSize)) + msg = protowire.AppendTag(msg, fieldNumSigPubKey, protowire.BytesType) + msg = protowire.AppendBytes(msg, bPubKey) + msg = protowire.AppendTag(msg, fieldNumSigVal, protowire.BytesType) + msg = protowire.AppendBytes(msg, objSig) + msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType) + msg = protowire.AppendVarint(msg, sigScheme) + + err = stream.WriteMessage(client.BinaryMessage(msg)) + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("send request: %w", err) + } + + var resp replicateResponse + err = stream.ReadMessage(&resp) + if err != nil { + if errors.Is(err, io.EOF) { + err = io.ErrUnexpectedEOF + } + + return fmt.Errorf("recv response: %w", err) + } + + _ = stream.Close() + + return resp.err +} + +type replicateResponse struct { + err error +} + +func (x replicateResponse) ToGRPCMessage() grpc.Message { + return new(objectgrpc.ReplicationResponse) +} + +func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { + m, ok := gm.(*objectgrpc.ReplicationResponse) + if !ok { + return message.NewUnexpectedMessageType(gm, m) + } + + var st *status.Status + if mst := m.GetStatus(); mst != nil { + st = new(status.Status) + err := st.FromGRPCMessage(mst) + if err != nil { + return fmt.Errorf("decode response status: %w", err) + } + } + + x.err = apistatus.ErrorFromV2(st) + + return nil +} diff --git a/go.mod b/go.mod index 4b13a185..0872a5b5 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,12 @@ require ( github.com/mr-tron/base58 v1.2.0 github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 github.com/nspcc-dev/neo-go v0.102.0 - github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 + github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c github.com/nspcc-dev/tzhash v1.7.1 github.com/stretchr/testify v1.8.4 github.com/testcontainers/testcontainers-go v0.24.1 go.uber.org/zap v1.26.0 + google.golang.org/protobuf v1.31.0 ) require ( @@ -60,12 +61,11 @@ require ( golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.13.0 // indirect - golang.org/x/net v0.16.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect - google.golang.org/grpc v1.57.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a6abce8d..a2323c0b 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 h1:zXkCRGTHqhkBRJ github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43/go.mod h1:BGU4YsuoFXjQddsCfUXpq5uNr2A8W4PrWbiljdD/TpU= github.com/nspcc-dev/neo-go v0.102.0 h1:O2Gt4JPOWmp0c+PnPWwd2wPI74BKSwkaNCEyvyQTWJw= github.com/nspcc-dev/neo-go v0.102.0/go.mod h1:QXxpZxJT2KedwM0Nlj8UO0/fZN2WIe4h/i03uBHKbnc= -github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 h1:jhuN8Ldqz7WApvUJRFY0bjRXE1R3iCkboMX5QVZhHVk= -github.com/nspcc-dev/neofs-api-go/v2 v2.14.0/go.mod h1:DRIr0Ic1s+6QgdqmNFNLIqMqd7lNMJfYwkczlm1hDtM= +github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c h1:c6cdp97RLYk90gfM7zk5Odb5PwWggP3UFkxFPACVQLE= +github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c/go.mod h1:eaffSBIGhXUIMYvRBGXmlgQRLyyCWlzOft9jGYlqwrw= github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4= github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= @@ -91,7 +91,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/shirou/gopsutil/v3 v3.23.7 h1:C+fHO8hfIppoJ1WdsVm1RoI0RwXoNdfTK7yWXV0wVj4= github.com/shirou/gopsutil/v3 v3.23.7/go.mod h1:c4gnmoRC0hQuaLqvxnx1//VXQ0Ms/X9UnJF8pddY5z4= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= @@ -140,8 +140,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= -golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -172,10 +172,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 h1:eSaPbMR4T7WfH9FvABk36NBMacoTUKdWCvV0dx+KfOg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=