diff --git a/client/object_replicate.go b/client/object_replicate.go index a7d9b05d..57c1ce73 100644 --- a/client/object_replicate.go +++ b/client/object_replicate.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "sync" objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" @@ -36,6 +37,8 @@ import ( // does not reset src to start after the call. If it is needed, do not forget to // Seek. // +// See also [DemuxReplicatedObject]. +// // Return errors: // - [apistatus.ErrServerInternal]: internal server error described in the text message; // - [apistatus.ErrObjectAccessDenied]: the signer does not authenticate any @@ -77,7 +80,49 @@ func (c *Client) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer return resp.err } +// DemuxReplicatedObject allows to share same argument between multiple +// [Client.ReplicateObject] calls for deduplication of network messages. This +// option should be used with caution and only to achieve traffic demux +// optimization goals. +func DemuxReplicatedObject(src io.ReadSeeker) io.ReadSeeker { + return &demuxReplicationMessage{ + rs: src, + } +} + +type demuxReplicationMessage struct { + rs io.ReadSeeker + + mtx sync.Mutex + msg []byte + err error +} + +func (x *demuxReplicationMessage) Read(p []byte) (n int, err error) { + return x.rs.Read(p) +} + +func (x *demuxReplicationMessage) Seek(offset int64, whence int) (int64, error) { + return x.rs.Seek(offset, whence) +} + func prepareReplicateMessage(src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { + srm, ok := src.(*demuxReplicationMessage) + if !ok { + return newReplicateMessage(src, signer) + } + + srm.mtx.Lock() + defer srm.mtx.Unlock() + + if srm.msg == nil && srm.err == nil { + srm.msg, srm.err = newReplicateMessage(src, signer) + } + + return srm.msg, srm.err +} + +func newReplicateMessage(src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { var objSize uint64 switch v := src.(type) { default: diff --git a/client/object_replicate_test.go b/client/object_replicate_test.go index b9d22e2e..bc0a0424 100644 --- a/client/object_replicate_test.go +++ b/client/object_replicate_test.go @@ -6,8 +6,10 @@ import ( "crypto/rand" "fmt" "net" + "sync" "testing" + "github.com/nspcc-dev/neo-go/pkg/util/slice" objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" @@ -178,4 +180,32 @@ func TestClient_ReplicateObject(t *testing.T) { require.ErrorIs(t, err, tc.expErr, tc.desc) } }) + + t.Run("demux", func(t *testing.T) { + demuxObj := DemuxReplicatedObject(bytes.NewReader(bObj)) + _, cli := serveObjectReplication(t, signer, obj) + + err := cli.ReplicateObject(ctx, demuxObj, signer) + require.NoError(t, err) + + msgCp := slice.Copy(demuxObj.(*demuxReplicationMessage).msg) + initBufPtr := &demuxObj.(*demuxReplicationMessage).msg[0] + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := cli.ReplicateObject(ctx, demuxObj, signer) + fmt.Println(err) + require.NoError(t, err) + }() + } + + wg.Wait() + + require.Equal(t, msgCp, demuxObj.(*demuxReplicationMessage).msg) + require.Equal(t, initBufPtr, &demuxObj.(*demuxReplicationMessage).msg[0]) + }) }