Skip to content

Commit

Permalink
slicer: Use payload buffer in write flow and reduce buffer allocations
Browse files Browse the repository at this point in the history
Previously, `slicer` package functions unconditionally allocated buffer
of M size, where M is a size limit of produced objects. In practice, M
is a NeoFS network setting, usually = 64MB. This led to a huge waste of
resources, the costs of which grew with a decrease of real user data.

The solution is smarter buffering based on actual incoming data. One of
the buffers is a user selectable option. It can act as a swap and reduce
internal costs. The secondary disadvantage was that this buffer did not
help in any way with user-write functions, while it is an option for
them as well.

Now:
 * the external buffer is always used as a swap;
 * maximum of M data is buffered
 * if the external buffer is of sufficient size, the entire process is
   utilized in it without additional allocations

Benchmark results:
```
goos: linux
goarch: amd64
pkg: github.com/nspcc-dev/neofs-sdk-go/object/slicer
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
                                                                     │     old.txt     │                new.txt                 │
                                                                     │     sec/op      │     sec/op      vs base                │
SliceDataIntoObjects/slice_1-128/reader-8                                46.75µ ±   8%    57.38µ ±  90%        ~ (p=0.063 n=10)
SliceDataIntoObjects/slice_1-128/writer-8                                54.51µ ±  14%    52.50µ ± 124%        ~ (p=0.912 n=10)
SliceDataIntoObjects/slice_4-128/reader-8                                46.71µ ±  16%    45.40µ ±   5%        ~ (p=0.481 n=10)
SliceDataIntoObjects/slice_4-128/writer-8                                45.39µ ±   3%    47.78µ ±  16%   +5.28% (p=0.001 n=10)
SliceDataIntoObjects/slice_16-128/reader-8                               49.58µ ±  15%    74.13µ ±  58%  +49.54% (p=0.011 n=10)
SliceDataIntoObjects/slice_16-128/writer-8                               46.16µ ±  13%    49.42µ ±  13%   +7.07% (p=0.019 n=10)
SliceDataIntoObjects/slice_64-128/reader-8                               46.36µ ±   2%    42.61µ ±   8%   -8.10% (p=0.002 n=10)
SliceDataIntoObjects/slice_64-128/writer-8                               55.61µ ±  28%    46.24µ ±  32%  -16.86% (p=0.009 n=10)
SliceDataIntoObjects/slice_256-128/reader-8                              53.38µ ±  11%    45.47µ ±   8%  -14.81% (p=0.000 n=10)
SliceDataIntoObjects/slice_256-128/writer-8                              203.2µ ±  75%    193.2µ ±  11%        ~ (p=0.218 n=10)
SliceDataIntoObjects/slice_1024-128/reader-8                             51.81µ ±  56%    46.07µ ±   6%        ~ (p=0.436 n=10)
SliceDataIntoObjects/slice_1024-128/writer-8                             415.9µ ± 168%    357.3µ ±   1%  -14.08% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/reader-8                             54.70µ ±   2%    45.83µ ±   2%  -16.21% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/writer-8                             1.484m ±   2%    1.141m ±  17%  -23.12% (p=0.000 n=10)
SliceDataIntoObjects/slice_16384-128/reader-8                            54.37µ ± 105%    43.75µ ±  16%  -19.53% (p=0.001 n=10)
SliceDataIntoObjects/slice_16384-128/writer-8                            5.243m ±  12%    4.156m ±   2%  -20.73% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/reader-8                            62.57µ ±  27%    46.63µ ±   2%  -25.48% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/writer-8                            27.31m ±  28%    18.46m ±   4%  -32.38% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/reader-8                           52.19µ ±  70%    46.50µ ±   3%  -10.91% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/writer-8                           85.62m ±   3%    67.92m ±   3%  -20.67% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/reader-8                          73.47µ ± 330%    57.18µ ±   3%  -22.17% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/writer-8                          337.2m ±   6%    273.3m ±   3%  -18.93% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1/with_payload_buffer-8               46.32µ ±   1%    45.41µ ±  11%        ~ (p=0.218 n=10)
WritePayloadBuffer/limit=1024,size=1/without_payload_buffer-8            46.58µ ±   3%    44.93µ ±   4%   -3.53% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/with_payload_buffer-8           110.99µ ±  45%    56.69µ ±   6%  -48.93% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/without_payload_buffer-8         66.34µ ±  58%    54.58µ ±   2%  -17.73% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/with_payload_buffer-8           851.3µ ±   9%    678.4µ ±   3%  -20.31% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/without_payload_buffer-8        989.7µ ±  22%    673.9µ ±   2%  -31.91% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/with_payload_buffer-8         15.805m ±  33%    6.964m ±   7%  -55.94% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/without_payload_buffer-8      17.788m ±  53%    7.122m ±   6%  -59.96% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8      8488.84µ ±  22%    57.68µ ±   6%  -99.32% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8   9891.51µ ±  93%    56.74µ ±   6%  -99.43% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/with_payload_buffer-8                47.42µ ±   1%    41.46µ ±   3%  -12.56% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/without_payload_buffer-8             47.62µ ±   3%    46.37µ ±   6%        ~ (p=0.075 n=10)
ReadPayloadBuffer/limit=1024,size=1024/with_payload_buffer-8             66.20µ ±   2%    41.14µ ±   5%  -37.86% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1024/without_payload_buffer-8          66.26µ ±   2%    42.24µ ±   6%  -36.24% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/with_payload_buffer-8            883.7µ ± 168%    431.6µ ±   5%  -51.16% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/without_payload_buffer-8         855.8µ ±   1%    450.6µ ±   6%  -47.35% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=204800/with_payload_buffer-8           11.22m ± 129%    13.46m ±   8%        ~ (p=0.579 n=10)
ReadPayloadBuffer/limit=1024,size=204800/without_payload_buffer-8        9.052m ± 154%   13.502m ±   3%        ~ (p=0.481 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8       8148.08µ ±  66%    38.62µ ±   8%  -99.53% (p=0.000 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8     16.850m ±  85%    5.632m ±  15%  -66.58% (p=0.000 n=10)
geomean                                                                  440.2µ           247.4µ         -43.80%

                                                                     │      old.txt      │               new.txt                │
                                                                     │       B/op        │     B/op      vs base                │
SliceDataIntoObjects/slice_1-128/reader-8                                  8.817Ki ±  0%   8.638Ki ± 0%   -2.04% (p=0.000 n=10)
SliceDataIntoObjects/slice_1-128/writer-8                                  8.692Ki ±  0%   8.521Ki ± 0%   -1.98% (p=0.000 n=10)
SliceDataIntoObjects/slice_4-128/reader-8                                  7.066Ki ±  0%   7.700Ki ± 0%   +8.97% (p=0.000 n=10)
SliceDataIntoObjects/slice_4-128/writer-8                                  6.941Ki ±  0%   7.582Ki ± 0%   +9.23% (p=0.000 n=10)
SliceDataIntoObjects/slice_16-128/reader-8                                 7.535Ki ±  0%   8.981Ki ± 0%  +19.19% (p=0.000 n=10)
SliceDataIntoObjects/slice_16-128/writer-8                                 7.410Ki ±  0%   8.872Ki ± 0%  +19.73% (p=0.000 n=10)
SliceDataIntoObjects/slice_64-128/reader-8                                 9.161Ki ±  0%   6.887Ki ± 0%  -24.83% (p=0.000 n=10)
SliceDataIntoObjects/slice_64-128/writer-8                                 9.036Ki ±  0%   6.824Ki ± 0%  -24.48% (p=0.000 n=10)
SliceDataIntoObjects/slice_256-128/reader-8                                7.942Ki ±  0%   7.763Ki ± 0%   -2.26% (p=0.000 n=10)
SliceDataIntoObjects/slice_256-128/writer-8                                23.71Ki ±  0%   23.64Ki ± 0%   -0.29% (p=0.000 n=10)
SliceDataIntoObjects/slice_1024-128/reader-8                               7.067Ki ±  0%   8.702Ki ± 0%  +23.13% (p=0.000 n=10)
SliceDataIntoObjects/slice_1024-128/writer-8                               45.36Ki ±  0%   51.61Ki ± 0%  +13.78% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/reader-8                               8.824Ki ±  0%   8.706Ki ± 0%   -1.34% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/writer-8                               156.8Ki ±  0%   150.6Ki ± 0%   -3.95% (p=0.000 n=10)
SliceDataIntoObjects/slice_16384-128/reader-8                              7.966Ki ±  0%   6.906Ki ± 0%  -13.30% (p=0.000 n=10)
SliceDataIntoObjects/slice_16384-128/writer-8                              543.5Ki ±  0%   540.6Ki ± 0%   -0.54% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/reader-8                              7.520Ki ±  0%   9.076Ki ± 0%  +20.69% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/writer-8                              2.185Mi ±  0%   2.191Mi ± 0%   +0.27% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/reader-8                             7.812Ki ±  1%   7.688Ki ± 0%   -1.59% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/writer-8                             8.810Mi ±  0%   8.373Mi ± 0%   -4.95% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/reader-8                           11.718Ki ± 44%   8.762Ki ± 2%  -25.23% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/writer-8                            35.20Mi ±  0%   33.44Mi ± 0%   -5.00% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1/with_payload_buffer-8                 8.907Ki ±  0%   7.853Ki ± 0%  -11.84% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1/without_payload_buffer-8              8.907Ki ±  0%   7.856Ki ± 0%  -11.80% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/with_payload_buffer-8              7.422Ki ±  0%   6.367Ki ± 0%  -14.21% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/without_payload_buffer-8           7.422Ki ±  0%   7.367Ki ± 0%   -0.74% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/with_payload_buffer-8             60.79Ki ±  0%   52.64Ki ± 0%  -13.41% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/without_payload_buffer-8          60.79Ki ±  0%   53.64Ki ± 0%  -11.76% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/with_payload_buffer-8            889.5Ki ±  0%   748.3Ki ± 0%  -15.88% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/without_payload_buffer-8         889.5Ki ±  0%   749.2Ki ± 0%  -15.77% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8      65543.045Ki ±  0%   6.363Ki ± 0%  -99.99% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8   65543.528Ki ±  0%   7.367Ki ± 0%  -99.99% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/with_payload_buffer-8                  7.469Ki ±  0%   8.181Ki ± 0%   +9.53% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/without_payload_buffer-8               8.469Ki ±  0%   9.181Ki ± 0%   +8.41% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1024/with_payload_buffer-8               7.469Ki ±  0%   6.132Ki ± 0%  -17.90% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1024/without_payload_buffer-8            8.469Ki ±  0%   7.137Ki ± 0%  -15.73% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/with_payload_buffer-8              60.83Ki ±  0%   49.73Ki ± 0%  -18.25% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/without_payload_buffer-8           61.83Ki ±  0%   50.74Ki ± 0%  -17.94% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=204800/with_payload_buffer-8             845.4Ki ±  0%   792.6Ki ± 0%   -6.24% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=204800/without_payload_buffer-8          846.4Ki ±  0%   793.8Ki ± 0%   -6.21% (p=0.000 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8       65542.802Ki ±  0%   6.129Ki ± 0%  -99.99% (p=0.000 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8       128.01Mi ±  0%   64.01Mi ± 0%  -50.00% (p=0.000 n=10)
geomean                                                                    76.88Ki         37.42Ki       -51.32%

                                                                     │   old.txt    │               new.txt               │
                                                                     │  allocs/op   │  allocs/op   vs base                │
SliceDataIntoObjects/slice_1-128/reader-8                               134.0 ±  0%    131.0 ± 0%   -2.24% (p=0.000 n=10)
SliceDataIntoObjects/slice_1-128/writer-8                               133.0 ±  0%    131.0 ± 0%   -1.50% (p=0.000 n=10)
SliceDataIntoObjects/slice_4-128/reader-8                               110.0 ±  0%    119.0 ± 0%   +8.18% (p=0.000 n=10)
SliceDataIntoObjects/slice_4-128/writer-8                               109.0 ±  0%    119.0 ± 0%   +9.17% (p=0.000 n=10)
SliceDataIntoObjects/slice_16-128/reader-8                              117.0 ±  0%    137.0 ± 0%  +17.09% (p=0.000 n=10)
SliceDataIntoObjects/slice_16-128/writer-8                              116.0 ±  0%    137.0 ± 0%  +18.10% (p=0.000 n=10)
SliceDataIntoObjects/slice_64-128/reader-8                              140.0 ±  0%    107.0 ± 0%  -23.57% (p=0.000 n=10)
SliceDataIntoObjects/slice_64-128/writer-8                              139.0 ±  0%    107.0 ± 0%  -23.02% (p=0.000 n=10)
SliceDataIntoObjects/slice_256-128/reader-8                             123.0 ±  0%    120.0 ± 0%   -2.44% (p=0.000 n=10)
SliceDataIntoObjects/slice_256-128/writer-8                             348.0 ±  0%    346.0 ± 0%   -0.57% (p=0.000 n=10)
SliceDataIntoObjects/slice_1024-128/reader-8                            110.0 ±  0%    132.0 ± 0%  +20.00% (p=0.000 n=10)
SliceDataIntoObjects/slice_1024-128/writer-8                            642.0 ±  0%    745.0 ± 0%  +16.04% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/reader-8                            134.0 ±  0%    132.0 ± 0%   -1.49% (p=0.000 n=10)
SliceDataIntoObjects/slice_4096-128/writer-8                           2.207k ±  0%   2.127k ± 0%   -3.62% (p=0.000 n=10)
SliceDataIntoObjects/slice_16384-128/reader-8                           123.0 ±  0%    107.0 ± 0%  -13.01% (p=0.000 n=10)
SliceDataIntoObjects/slice_16384-128/writer-8                          7.601k ±  0%   7.546k ± 0%   -0.72% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/reader-8                           117.0 ±  1%    138.0 ± 0%  +17.95% (p=0.000 n=10)
SliceDataIntoObjects/slice_65536-128/writer-8                          31.18k ±  0%   31.29k ± 0%   +0.33% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/reader-8                          121.0 ±  1%    119.0 ± 0%   -1.65% (p=0.000 n=10)
SliceDataIntoObjects/slice_262144-128/writer-8                         124.1k ±  0%   118.0k ± 0%   -4.93% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/reader-8                         175.5 ± 41%    133.0 ± 2%  -24.22% (p=0.000 n=10)
SliceDataIntoObjects/slice_1048576-128/writer-8                        496.1k ±  0%   471.4k ± 0%   -4.98% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1/with_payload_buffer-8              117.0 ±  0%    114.0 ± 0%   -2.56% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1/without_payload_buffer-8           117.0 ±  0%    115.0 ± 0%   -1.71% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/with_payload_buffer-8           85.00 ±  0%    82.00 ± 0%   -3.53% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=1024/without_payload_buffer-8        85.00 ±  0%    83.00 ± 0%   -2.35% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/with_payload_buffer-8          822.0 ±  0%    727.0 ± 0%  -11.56% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=10240/without_payload_buffer-8       822.0 ±  0%    728.0 ± 0%  -11.44% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/with_payload_buffer-8        12.32k ±  0%   10.67k ± 0%  -13.41% (p=0.000 n=10)
WritePayloadBuffer/limit=1024,size=204800/without_payload_buffer-8     12.32k ±  0%   10.67k ± 0%  -13.40% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8       88.00 ±  0%    82.00 ± 0%   -6.82% (p=0.000 n=10)
WritePayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8    90.00 ±  1%    83.00 ± 0%   -7.78% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/with_payload_buffer-8               86.00 ±  0%   120.00 ± 0%  +39.53% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1/without_payload_buffer-8            87.00 ±  0%   121.00 ± 0%  +39.08% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1024/with_payload_buffer-8            86.00 ±  0%    78.00 ± 0%   -9.30% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=1024/without_payload_buffer-8         87.00 ±  0%    80.00 ± 0%   -8.05% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/with_payload_buffer-8           823.0 ±  0%    690.0 ± 0%  -16.16% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=10240/without_payload_buffer-8        824.0 ±  0%    701.0 ± 0%  -14.93% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=204800/with_payload_buffer-8         11.72k ±  0%   11.28k ± 0%   -3.73% (p=0.000 n=10)
ReadPayloadBuffer/limit=1024,size=204800/without_payload_buffer-8      11.72k ±  0%   11.48k ± 0%   -2.02% (p=0.000 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/with_payload_buffer-8        84.00 ±  0%    78.00 ± 0%   -7.14% (p=0.000 n=10)
ReadPayloadBuffer/limit=67108864,size=1024/without_payload_buffer-8     87.00 ±  0%    85.00 ± 1%   -2.30% (p=0.000 n=10)
geomean                                                                 435.5          425.3        -2.36%
```

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 25, 2024
1 parent e2666d8 commit 3902eb9
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 34 deletions.
159 changes: 125 additions & 34 deletions object/slicer/slicer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package slicer

import (
"bytes"
"context"
"crypto/sha256"
"errors"
Expand Down Expand Up @@ -164,20 +163,44 @@ func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.R
objectPayloadLimit := childPayloadSizeLimit(opts)

var n int
bChunk := opts.payloadBuffer
if bChunk == nil {
bChunk = make([]byte, objectPayloadLimit)
}

writer, err := initPayloadStream(ctx, ow, header, signer, opts)
if err != nil {
return rootID, fmt.Errorf("init writter: %w", err)
}

var buf []byte
var buffered uint64

for {
n, err = data.Read(bChunk)
buffered = writer.rootMeta.length
if writer.withSplit {
buffered = writer.childMeta.length
}

if buffered == objectPayloadLimit && uint64(len(writer.payloadBuffer)) <= objectPayloadLimit {
// in this case, the read buffers are exhausted, and it is unclear whether there
// will be more data. We need to know this right away, because the format of the
// final objects depends on it. So read to temp buffer
buf = []byte{0}
} else {
payloadBufferLen := uint64(len(writer.payloadBuffer))
if payloadBufferLen > buffered {
buf = writer.payloadBuffer[buffered:]
} else {
if writer.extraPayloadBuffer == nil {
// TODO(#544): support external buffer pools
writer.extraPayloadBuffer = make([]byte, writer.payloadSizeLimit-payloadBufferLen)
buf = writer.extraPayloadBuffer
} else {
buf = writer.extraPayloadBuffer[buffered-payloadBufferLen:]
}
}
}

n, err = data.Read(buf)
if n > 0 {
if _, err = writer.Write(bChunk[:n]); err != nil {
if _, err = writer.Write(buf[:n]); err != nil {
return oid.ID{}, err
}
}
Expand Down Expand Up @@ -268,9 +291,9 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec
stubObject: &stubObject,
}

res.buf.Grow(int(res.payloadSizeLimit))
res.payloadBuffer = opts.payloadBuffer
res.rootMeta.reset()
res.currentWriter = io.MultiWriter(&res.buf, &res.rootMeta)
res.metaWriter = &res.rootMeta

return res, nil
}
Expand All @@ -289,15 +312,16 @@ type PayloadWriter struct {
currentEpoch uint64
sessionToken *session.Object

buf bytes.Buffer
payloadBuffer []byte
extraPayloadBuffer []byte

rootMeta dynamicObjectMetadata
childMeta dynamicObjectMetadata

// max payload size of produced objects in bytes
payloadSizeLimit uint64

currentWriter io.Writer
metaWriter io.Writer

withSplit bool
splitID *object.SplitID
Expand All @@ -316,18 +340,67 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
return 0, nil
}

alreadyWritten := x.rootMeta.length
buffered := x.rootMeta.length
if x.withSplit {
alreadyWritten = x.childMeta.length
buffered = x.childMeta.length
}

if alreadyWritten+uint64(len(chunk)) <= x.payloadSizeLimit {
return x.currentWriter.Write(chunk)
if buffered+uint64(len(chunk)) <= x.payloadSizeLimit {
// buffer data to produce as few objects as possible for better storage efficiency
_, err := x.metaWriter.Write(chunk)
if err != nil {
return 0, err
}

var n int
payloadBufferLen := uint64(len(x.payloadBuffer))
if payloadBufferLen > buffered {
n = copy(x.payloadBuffer[buffered:], chunk)
if n == len(chunk) {
return n, nil
}

chunk = chunk[n:]
buffered += uint64(n)
}

if x.extraPayloadBuffer == nil {
// if here for the first time, then allocate the minimum buffer sufficient for
// writing: user may do one Write followed by Close. In such cases there is no
// point in allocating a buffer of payloadSizeLimit size.
// TODO(#544): support external buffer pools
x.extraPayloadBuffer = make([]byte, len(chunk))
} else if payloadBufferLen+uint64(len(x.extraPayloadBuffer)) < x.payloadSizeLimit {
b := make([]byte, uint64(len(x.extraPayloadBuffer))+x.payloadSizeLimit-buffered)
copy(b, x.extraPayloadBuffer)
x.extraPayloadBuffer = b
}

return n + copy(x.extraPayloadBuffer[buffered-payloadBufferLen:], chunk), nil
}

n, err := x.currentWriter.Write(chunk[:x.payloadSizeLimit-alreadyWritten])
// at this point there is enough data to flush the buffer by sending the next
n := int(x.payloadSizeLimit - buffered)
_, err := x.metaWriter.Write(chunk[:n])
if err != nil {
return n, err
return 0, err
}

payloadBuffers := make([][]byte, 0, 3)
if buffered > 0 {
if len(x.payloadBuffer) > 0 {
if uint64(len(x.payloadBuffer)) >= buffered {
payloadBuffers = append(payloadBuffers, x.payloadBuffer[:buffered])
} else {
payloadBuffers = append(payloadBuffers, x.payloadBuffer, x.extraPayloadBuffer[:buffered-uint64(len(x.payloadBuffer))])
}
} else {
payloadBuffers = append(payloadBuffers, x.extraPayloadBuffer[:buffered])
}
}

if n > 0 {
payloadBuffers = append(payloadBuffers, chunk[:n])
}

if !x.withSplit {
Expand All @@ -336,20 +409,19 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
// to fill splitInfo in all child objects.
x.withSplit = true

err = x.writeIntermediateChild(x.ctx, x.rootMeta)
err := x.writeIntermediateChild(x.ctx, x.rootMeta, payloadBuffers)
if err != nil {
return n, fmt.Errorf("write 1st child: %w", err)
}

x.currentWriter = io.MultiWriter(&x.buf, &x.rootMeta, &x.childMeta)
x.metaWriter = io.MultiWriter(&x.rootMeta, &x.childMeta)
} else {
err = x.writeIntermediateChild(x.ctx, x.childMeta)
err := x.writeIntermediateChild(x.ctx, x.childMeta, payloadBuffers)
if err != nil {
return n, fmt.Errorf("write next child: %w", err)
}
}

x.buf.Reset()
x.childMeta.reset()

n2, err := x.Write(chunk[n:]) // here n > 0 so infinite recursion shouldn't occur
Expand All @@ -360,10 +432,28 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
// Close finalizes object with written payload data, saves the object and closes
// the stream. Reference to the stored object can be obtained by ID method.
func (x *PayloadWriter) Close() error {
buffered := x.rootMeta.length
if x.withSplit {
buffered = x.childMeta.length
}

var payloadBuffers [][]byte
if buffered > 0 {
if len(x.payloadBuffer) > 0 {
if uint64(len(x.payloadBuffer)) >= buffered {
payloadBuffers = [][]byte{x.payloadBuffer[:buffered]}
} else {
payloadBuffers = [][]byte{x.payloadBuffer, x.extraPayloadBuffer[:buffered-uint64(len(x.payloadBuffer))]}
}
} else {
payloadBuffers = [][]byte{x.extraPayloadBuffer[:buffered]}
}
}

if x.withSplit {
return x.writeLastChild(x.ctx, x.childMeta, x.setID)
return x.writeLastChild(x.ctx, x.childMeta, payloadBuffers, x.setID)
}
return x.writeLastChild(x.ctx, x.rootMeta, x.setID)
return x.writeLastChild(x.ctx, x.rootMeta, payloadBuffers, x.setID)
}

func (x *PayloadWriter) setID(id oid.ID) {
Expand All @@ -380,18 +470,18 @@ func (x *PayloadWriter) ID() oid.ID {

// writeIntermediateChild writes intermediate split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter.
func (x *PayloadWriter) writeIntermediateChild(ctx context.Context, meta dynamicObjectMetadata) error {
return x._writeChild(ctx, meta, false, nil)
func (x *PayloadWriter) writeIntermediateChild(ctx context.Context, meta dynamicObjectMetadata, payloadBuffers [][]byte) error {
return x._writeChild(ctx, meta, payloadBuffers, false, nil)
}

// writeIntermediateChild writes last split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter. If rootIDHandler is
// specified, ID of the resulting root object is passed into it.
func (x *PayloadWriter) writeLastChild(ctx context.Context, meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error {
return x._writeChild(ctx, meta, true, rootIDHandler)
func (x *PayloadWriter) writeLastChild(ctx context.Context, meta dynamicObjectMetadata, payloadBuffers [][]byte, rootIDHandler func(id oid.ID)) error {
return x._writeChild(ctx, meta, payloadBuffers, true, rootIDHandler)
}

func (x *PayloadWriter) _writeChild(ctx context.Context, meta dynamicObjectMetadata, last bool, rootIDHandler func(id oid.ID)) error {
func (x *PayloadWriter) _writeChild(ctx context.Context, meta dynamicObjectMetadata, payloadBuffers [][]byte, last bool, rootIDHandler func(id oid.ID)) error {
obj := *x.stubObject
obj.SetSplitID(nil)
obj.ResetPreviousID()
Expand Down Expand Up @@ -427,8 +517,7 @@ func (x *PayloadWriter) _writeChild(ctx context.Context, meta dynamicObjectMetad
var id oid.ID
var err error

id, err = writeInMemObject(ctx, x.signer, x.stream, obj, x.buf.Bytes(), meta, x.prmObjectPutInit)

id, err = writeInMemObject(ctx, x.signer, x.stream, obj, payloadBuffers, meta, x.prmObjectPutInit)
if err != nil {
return fmt.Errorf("write formed object: %w", err)
}
Expand Down Expand Up @@ -495,7 +584,7 @@ func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata,
return id, nil
}

func writeInMemObject(ctx context.Context, signer user.Signer, w ObjectWriter, header object.Object, payload []byte, meta dynamicObjectMetadata, prm client.PrmObjectPutInit) (oid.ID, error) {
func writeInMemObject(ctx context.Context, signer user.Signer, w ObjectWriter, header object.Object, payloadBuffers [][]byte, meta dynamicObjectMetadata, prm client.PrmObjectPutInit) (oid.ID, error) {
var (
id oid.ID
err error
Expand All @@ -516,9 +605,11 @@ func writeInMemObject(ctx context.Context, signer user.Signer, w ObjectWriter, h
return id, fmt.Errorf("init data stream for next object: %w", err)
}

_, err = stream.Write(payload)
if err != nil {
return id, fmt.Errorf("write object payload: %w", err)
for i := range payloadBuffers {
_, err = stream.Write(payloadBuffers[i])
if err != nil {
return id, fmt.Errorf("write object payload: %w", err)
}
}

if c, ok := stream.(io.Closer); ok {
Expand Down
55 changes: 55 additions & 0 deletions object/slicer/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,3 +953,58 @@ func BenchmarkReadPayloadBuffer(b *testing.B) {
})
}
}

func TestOptions_SetPayloadBuffer(t *testing.T) {
for _, tc := range []struct {
dataSize uint64
payloadLimit uint64
bufSize int
}{
// buffer smaller than limit
{dataSize: 0, payloadLimit: 10, bufSize: 5},
{dataSize: 1, payloadLimit: 10, bufSize: 5},
{dataSize: 5, payloadLimit: 10, bufSize: 5},
{dataSize: 6, payloadLimit: 10, bufSize: 5},
{dataSize: 10, payloadLimit: 10, bufSize: 5},
{dataSize: 12, payloadLimit: 10, bufSize: 5},
{dataSize: 15, payloadLimit: 10, bufSize: 5},
{dataSize: 20, payloadLimit: 10, bufSize: 5},
{dataSize: 21, payloadLimit: 10, bufSize: 5},
// buffer of limit size
{dataSize: 0, payloadLimit: 10, bufSize: 10},
{dataSize: 1, payloadLimit: 10, bufSize: 10},
{dataSize: 5, payloadLimit: 10, bufSize: 10},
{dataSize: 6, payloadLimit: 10, bufSize: 10},
{dataSize: 10, payloadLimit: 10, bufSize: 10},
{dataSize: 12, payloadLimit: 10, bufSize: 10},
{dataSize: 15, payloadLimit: 10, bufSize: 10},
{dataSize: 20, payloadLimit: 10, bufSize: 10},
{dataSize: 21, payloadLimit: 10, bufSize: 10},
// buffer bigger than limit
{dataSize: 0, payloadLimit: 10, bufSize: 11},
{dataSize: 1, payloadLimit: 10, bufSize: 11},
{dataSize: 5, payloadLimit: 10, bufSize: 11},
{dataSize: 6, payloadLimit: 10, bufSize: 11},
{dataSize: 10, payloadLimit: 10, bufSize: 11},
{dataSize: 12, payloadLimit: 10, bufSize: 11},
{dataSize: 15, payloadLimit: 10, bufSize: 11},
{dataSize: 20, payloadLimit: 10, bufSize: 11},
{dataSize: 21, payloadLimit: 10, bufSize: 11},
} {
t.Run(fmt.Sprintf("with_buffer=%d_data=%d_limit=%d", tc.bufSize, tc.dataSize, tc.payloadLimit), func(t *testing.T) {
in, opts := randomInput(t, tc.dataSize, tc.payloadLimit)
if tc.bufSize > 0 {
opts.SetPayloadBuffer(make([]byte, tc.bufSize))
}

checker := &slicedObjectChecker{
opts: opts,
tb: t,
input: in,
chainCollector: newChainCollector(t),
}

testSlicerByHeaderType(t, checker, in, opts)
})
}
}

0 comments on commit 3902eb9

Please sign in to comment.