Skip to content

Commit

Permalink
slicer: Allow to specify known size of the full payload
Browse files Browse the repository at this point in the history
Previously, there was no way to specify a payload size known in advance for
the object to be sliced. The main drawback was the slicer's inability to
determine the optimal buffer size needed to read the payload.
Therefore, the slicer always allocated a buffer of `MaxObjectSize` bytes.
With this behavior, the smaller the loaded payload (down to
0), the more memory was wasted. For example, this could lead to 64MB
allocations for 1K objects, which is a roughly 65,000-fold excess.

Now the slicer supports an optional fixed payload size via the
`Options.SetPayloadSize` method. When used, this option makes the slicer
allocate the payload buffer according to that size. The option can be
used with files, in-memory data and other cases to improve application
performance.

Closes #540.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 25, 2024
1 parent 3902eb9 commit 4a8baaa
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 1 deletion.
14 changes: 14 additions & 0 deletions object/slicer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Options struct {

sessionToken *session.Object
bearerToken *bearer.Token

payloadSizeFixed bool
payloadSize uint64
}

// SetObjectPayloadLimit specifies data size limit for produced physically
Expand Down Expand Up @@ -61,6 +64,17 @@ func (x *Options) SetPayloadBuffer(payloadBuffer []byte) {
x.payloadBuffer = payloadBuffer
}

// SetPayloadSize declares the full payload size of the object when it is known
// in advance. Once set, reading functions will read at least size bytes while
// writing functions will expect exactly size bytes.
//
// Setting the size whenever it is known is recommended: it improves the
// performance of the application using the [Slicer].
func (x *Options) SetPayloadSize(size uint64) {
	// The flag is raised together with the value to mark the size as
	// explicitly set.
	x.payloadSize, x.payloadSizeFixed = size, true
}

// ObjectPayloadLimit returns required max object size.
func (x *Options) ObjectPayloadLimit() uint64 {
return x.objectPayloadLimit
Expand Down
22 changes: 22 additions & 0 deletions object/slicer/slicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.R
return rootID, fmt.Errorf("read payload chunk: %w", err)
}

if writer.payloadSizeFixed && writer.rootMeta.length < writer.payloadSize {
return oid.ID{}, io.ErrUnexpectedEOF
}

if err = writer.Close(); err != nil {
return rootID, fmt.Errorf("writer close: %w", err)
}
Expand Down Expand Up @@ -287,10 +291,16 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec
rootMeta: newDynamicObjectMetadata(opts.withHomoChecksum),
childMeta: newDynamicObjectMetadata(opts.withHomoChecksum),
payloadSizeLimit: childPayloadSizeLimit(opts),
payloadSizeFixed: opts.payloadSizeFixed,
payloadSize: opts.payloadSize,
prmObjectPutInit: prm,
stubObject: &stubObject,
}

if res.payloadSizeFixed && res.payloadSize < res.payloadSizeLimit {
res.payloadSizeLimit = res.payloadSize
}

res.payloadBuffer = opts.payloadBuffer
res.rootMeta.reset()
res.metaWriter = &res.rootMeta
Expand Down Expand Up @@ -320,6 +330,8 @@ type PayloadWriter struct {

// max payload size of produced objects in bytes
payloadSizeLimit uint64
payloadSizeFixed bool
payloadSize uint64

metaWriter io.Writer

Expand All @@ -331,6 +343,8 @@ type PayloadWriter struct {
stubObject *object.Object
}

// errPayloadSizeExceeded is returned by PayloadWriter.Write when the payload
// size is fixed and the next chunk would make the written data exceed it.
var errPayloadSizeExceeded error = errors.New("payload size exceeded")

// Write writes next chunk of the object data. Concatenation of all chunks forms
// the payload of the final object. When the data is over, the PayloadWriter
// should be closed.
Expand All @@ -340,6 +354,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
return 0, nil
}

if x.payloadSizeFixed && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize {
return 0, errPayloadSizeExceeded
}

buffered := x.rootMeta.length
if x.withSplit {
buffered = x.childMeta.length
Expand Down Expand Up @@ -432,6 +450,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
// Close finalizes object with written payload data, saves the object and closes
// the stream. Reference to the stored object can be obtained by ID method.
func (x *PayloadWriter) Close() error {
if x.payloadSizeFixed && x.rootMeta.length < x.payloadSize {
return io.ErrUnexpectedEOF
}

buffered := x.rootMeta.length
if x.withSplit {
buffered = x.childMeta.length
Expand Down
139 changes: 138 additions & 1 deletion object/slicer/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
}

// testSlicer runs the slicer checks for the given payload size and size limit
// twice: first with the payload size declared in advance, then without it.
func testSlicer(t *testing.T, size, sizeLimit uint64) {
	for _, known := range []bool{true, false} {
		testSlicerWithKnownSize(t, size, sizeLimit, known)
	}
}

func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) {
in, opts := randomInput(t, size, sizeLimit)

if known {
opts.SetPayloadSize(uint64(len(in.payload)))
}

checker := &slicedObjectChecker{
opts: opts,
tb: t,
Expand All @@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) {
for i := object.TypeRegular; i <= object.TypeLock; i++ {
in.objectType = i

t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) {
t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) {
testSlicerByHeaderType(t, checker, in, opts)
})
}
Expand Down Expand Up @@ -1008,3 +1017,131 @@ func TestOptions_SetPayloadBuffer(t *testing.T) {
})
}
}

// TestKnownPayloadSize checks that a payload size fixed via
// Options.SetPayloadSize is enforced in both directions: supplying more data
// than declared must fail with "payload size exceeded", supplying less must
// fail with io.ErrUnexpectedEOF. Each direction is exercised through the
// reader-based slicer.Put and the writer-based slicer.InitPut.
func TestKnownPayloadSize(t *testing.T) {
	// Payload size declared to the slicer in every case below.
	const declared = 20

	ctx := context.Background()

	t.Run("overflow", func(t *testing.T) {
		t.Run("read", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			obj := objecttest.Object(t)
			hdr := *obj.CutPayload()

			opts.SetPayloadSize(declared)

			// The source holds one byte more than declared.
			src := bytes.NewReader(make([]byte, declared+1))

			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, src, opts)
			require.ErrorContains(t, err, "payload size exceeded")
		})

		t.Run("write", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			obj := objecttest.Object(t)
			hdr := *obj.CutPayload()

			opts.SetPayloadSize(declared)

			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
			require.NoError(t, err)

			// Exactly the declared number of single-byte writes must succeed...
			for i := 0; i < declared; i++ {
				_, err = w.Write([]byte{1})
				require.NoError(t, err)
			}

			// ...and the very next byte must be rejected.
			_, err = w.Write([]byte{1})
			require.ErrorContains(t, err, "payload size exceeded")
		})
	})

	t.Run("flaw", func(t *testing.T) {
		t.Run("read", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			obj := objecttest.Object(t)
			hdr := *obj.CutPayload()

			opts.SetPayloadSize(declared)

			// The source holds one byte less than declared.
			src := bytes.NewReader(make([]byte, declared-1))

			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, src, opts)
			require.ErrorIs(t, err, io.ErrUnexpectedEOF)
		})

		t.Run("write", func(t *testing.T) {
			in, opts := randomInput(t, 1, 1)
			obj := objecttest.Object(t)
			hdr := *obj.CutPayload()

			opts.SetPayloadSize(declared)

			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
			require.NoError(t, err)

			// A short write is accepted; the shortage is reported on Close.
			_, err = w.Write(make([]byte, declared-1))
			require.NoError(t, err)

			require.ErrorIs(t, w.Close(), io.ErrUnexpectedEOF)
		})
	})
}

func BenchmarkKnownPayloadSize(b *testing.B) {
ctx := context.Background()
for _, tc := range []struct {
sizeLimit uint64
size uint64
}{
{sizeLimit: 1 << 10, size: 1},
{sizeLimit: 1 << 10, size: 1 << 10},
{sizeLimit: 1 << 10, size: 10 << 10},
} {
b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b *testing.B) {
b.Run("read", func(b *testing.B) {
obj := objecttest.Object(b)
hdr := *obj.CutPayload()
signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
payload := make([]byte, tc.size)
rand.Read(payload)

var opts slicer.Options
opts.SetObjectPayloadLimit(tc.sizeLimit)
opts.SetPayloadSize(tc.size)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts)
require.NoError(b, err)
}
})

b.Run("write", func(b *testing.B) {
obj := objecttest.Object(b)
hdr := *obj.CutPayload()
signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
payload := make([]byte, tc.size)
rand.Read(payload)

var opts slicer.Options
opts.SetObjectPayloadLimit(tc.sizeLimit)
opts.SetPayloadSize(tc.size)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts)
require.NoError(b, err)

_, err = w.Write(payload)
if err == nil {
err = w.Close()
}
require.NoError(b, err)
}
})
})
}
}

0 comments on commit 4a8baaa

Please sign in to comment.