Skip to content

Commit

Permalink
slicer: Allow specifying the known size of the full payload
Browse files Browse the repository at this point in the history
Previously, there was no way to specify the pre-known payload size of
the object to be sliced. The main drawback was the slicer's inability to
determine the optimal buffer size needed to read the payload.
Therefore, the slicer always allocated a buffer of `MaxObjectSize` size.
With this behavior, the smaller the size of the loaded payload (down to
0), the more memory was wasted. For example, this could lead to 64MB
allocations for 1KB objects, which is a 65,000-fold excess.

Now the slicer supports an optional fixed payload size via the
`Options.SetPayloadSize` method. When used, this option tunes behavior
to allocate payload buffer according to the size. The option could be
used with files, in-memory slices and other cases to improve application
performance.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 15, 2024
1 parent eb9cf2d commit 4aebe56
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 15 deletions.
24 changes: 20 additions & 4 deletions object/slicer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Options struct {

sessionToken *session.Object
bearerToken *bearer.Token

payloadSizeKnown bool
payloadSize uint64
}

// SetObjectPayloadLimit specifies data size limit for produced physically
Expand Down Expand Up @@ -55,16 +58,29 @@ func (x *Options) SetCopiesNumber(copiesNumber uint32) {
x.copiesNumber = copiesNumber
}

// SetPayloadBuffer sets a pre-allocated payloadBuffer to be used for object
// uploading. The payloadBuffer should have length at least
// [Options.ObjectPayloadLimit] + 1. If [Options.SetPayloadSize] is called, its
// argument is used as the buffer size requirement instead. Any smaller buffer
// is ignored.
func (x *Options) SetPayloadBuffer(payloadBuffer []byte) {
	x.payloadBuffer = payloadBuffer
}

// SetPayloadSize allows specifying the object's payload size known in
// advance. If the size is fixed, the option is recommended as it improves the
// performance of the application using the [Slicer].
func (x *Options) SetPayloadSize(size uint64) {
x.payloadSizeKnown = true
x.payloadSize = size
}

// ObjectPayloadLimit returns the required max object size. When no explicit
// limit has been set via SetObjectPayloadLimit, the default
// defaultPayloadSizeLimit (1 MiB) is returned.
func (x *Options) ObjectPayloadLimit() uint64 {
	// NOTE(review): the diff-merged text carried a stale unconditional
	// `return x.objectPayloadLimit` here, which made the default fallback
	// below unreachable; it is removed so the documented default applies.
	if x.objectPayloadLimit > 0 {
		return x.objectPayloadLimit
	}
	return defaultPayloadSizeLimit
}

// CurrentNeoFSEpoch returns epoch.
Expand Down
28 changes: 18 additions & 10 deletions object/slicer/slicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,6 @@ func InitPut(ctx context.Context, ow ObjectWriter, header object.Object, signer

// defaultPayloadSizeLimit is the fallback size limit (1 MiB) for a produced
// object's payload, used when Options does not specify an explicit limit.
const defaultPayloadSizeLimit = 1 << 20

// childPayloadSizeLimit returns the configured size limit of the child
// object's payload, falling back to the 1 MiB default when unset.
// (Its logic mirrors Options.ObjectPayloadLimit.)
func childPayloadSizeLimit(opts Options) uint64 {
	if lim := opts.objectPayloadLimit; lim > 0 {
		return lim
	}
	return defaultPayloadSizeLimit
}

func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.Reader, signer user.Signer, opts Options) (oid.ID, error) {
var rootID oid.ID
var n int
Expand Down Expand Up @@ -259,7 +250,15 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec
stubObject: &stubObject,
}

maxObjSize := childPayloadSizeLimit(opts)
maxObjSize := opts.ObjectPayloadLimit()
if opts.payloadSizeKnown {
res.payloadSizeKnown = true
res.payloadSize = opts.payloadSize

if opts.payloadSize < maxObjSize {
maxObjSize = opts.payloadSize
}
}

if uint64(len(opts.payloadBuffer)) > maxObjSize {
res.buf = opts.payloadBuffer[:maxObjSize+1]
Expand Down Expand Up @@ -302,8 +301,13 @@ type PayloadWriter struct {
writtenChildren []oid.ID
prmObjectPutInit client.PrmObjectPutInit
stubObject *object.Object

payloadSizeKnown bool
payloadSize uint64
}

// errPayloadSizeExceeded is returned by PayloadWriter.Write when the total
// written data would exceed the payload size fixed via Options.SetPayloadSize.
var errPayloadSizeExceeded = errors.New("payload size exceeded")

// Write writes next chunk of the object data. Concatenation of all chunks forms
// the payload of the final object. When the data is over, the PayloadWriter
// should be closed.
Expand All @@ -313,6 +317,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
return 0, nil
}

if x.payloadSizeKnown && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize {
return 0, errPayloadSizeExceeded
}

n, err := x.currentWriter.Write(chunk)
if err == nil || !errors.Is(err, errOverflow) {
return n, err
Expand Down
108 changes: 107 additions & 1 deletion object/slicer/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
}

// testSlicer exercises the slicer both with and without a pre-known
// payload size, for the given payload size and per-object size limit.
func testSlicer(t *testing.T, size, sizeLimit uint64) {
	for _, known := range []bool{true, false} {
		testSlicerWithKnownSize(t, size, sizeLimit, known)
	}
}

func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) {
in, opts := randomInput(t, size, sizeLimit)

if known {
opts.SetPayloadSize(uint64(len(in.payload)))
}

checker := &slicedObjectChecker{
opts: opts,
tb: t,
Expand All @@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) {
for i := object.TypeRegular; i <= object.TypeLock; i++ {
in.objectType = i

t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) {
t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) {
testSlicerByHeaderType(t, checker, in, opts)
})
}
Expand Down Expand Up @@ -953,3 +962,100 @@ func BenchmarkReadPayloadBuffer(b *testing.B) {
})
}
}

// TestKnownPayloadSize checks that the slicer rejects payloads exceeding the
// size declared in advance via Options.SetPayloadSize, in both the one-shot
// reader flow (Put) and the incremental writer flow (InitPut/Write).
func TestKnownPayloadSize(t *testing.T) {
	ctx := context.Background()
	t.Run("overflow", func(t *testing.T) {
		t.Run("read", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			hdr := *objecttest.Object(t).CutPayload()

			// Declare 20 bytes but supply 21: Put must fail.
			opts.SetPayloadSize(20)
			src := bytes.NewReader(make([]byte, 21))

			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, src, opts)
			require.ErrorContains(t, err, "payload size exceeded")
		})

		t.Run("write", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			hdr := *objecttest.Object(t).CutPayload()

			opts.SetPayloadSize(20)

			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
			require.NoError(t, err)

			// The first 20 one-byte writes fit the declared size...
			for i := 0; i < 20; i++ {
				_, err = w.Write([]byte{1})
				require.NoError(t, err)
			}
			// ...and the 21st overflows it.
			_, err = w.Write([]byte{1})
			require.ErrorContains(t, err, "payload size exceeded")
		})
	})
}

// BenchmarkKnownPayloadSize measures both upload flows when the payload size
// is declared in advance via Options.SetPayloadSize: "read" drives the
// one-shot Put over an io.Reader, "write" drives InitPut + Write + Close.
func BenchmarkKnownPayloadSize(b *testing.B) {
ctx := context.Background()
for _, tc := range []struct {
sizeLimit uint64
size uint64
}{
// Payload smaller than, equal to, and larger than the per-object limit.
{sizeLimit: 1 << 10, size: 1},
{sizeLimit: 1 << 10, size: 1 << 10},
{sizeLimit: 1 << 10, size: 10 << 10},
} {
b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b *testing.B) {
b.Run("read", func(b *testing.B) {
obj := objecttest.Object(b)
hdr := *obj.CutPayload()
signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
payload := make([]byte, tc.size)
rand.Read(payload)

var opts slicer.Options
opts.SetObjectPayloadLimit(tc.sizeLimit)
opts.SetPayloadSize(tc.size)

// Setup above is excluded from the measured region.
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts)
require.NoError(b, err)
}
})

b.Run("write", func(b *testing.B) {
obj := objecttest.Object(b)
hdr := *obj.CutPayload()
signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
payload := make([]byte, tc.size)
rand.Read(payload)

var opts slicer.Options
opts.SetObjectPayloadLimit(tc.sizeLimit)
opts.SetPayloadSize(tc.size)

// Setup above is excluded from the measured region.
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts)
require.NoError(b, err)

_, err = w.Write(payload)
if err == nil {
err = w.Close()
}
require.NoError(b, err)
}
})
})
}
}

0 comments on commit 4aebe56

Please sign in to comment.