lineprotocol: fix Decoder bug when decoding from an io.Reader
When we needed more input at the end of the buffer, we were sliding
the existing data to the front of the buffer, ignoring the fact that
previously returned tokens could still point into that data and would be
overwritten by the slide.

Avoid that possibility by sliding data only when we reset the decoder
after each entry has been entirely decoded.
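
To make the failure mode concrete, here's a minimal, self-contained Go sketch (illustrative only, not the decoder's actual internals) of how a token that aliases the buffer can be corrupted when the unread tail is slid to the front:

```go
package main

import "fmt"

func main() {
	// buf models the decoder's internal buffer. A token has already been
	// handed back to the caller as a sub-slice of it, and unread input follows.
	buf := []byte("cpu,host=a value=1\nnext-entry...")
	token := buf[:3] // "cpu" — aliases buf, no copy is made

	// Sliding the unread tail to the front of the buffer (what readMore used
	// to do when it ran out of space) overwrites the bytes the token points at.
	unread := buf[19:] // everything after the first entry
	copy(buf, unread)

	fmt.Printf("%q\n", token) // prints "nex", not "cpu"
}
```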

It's likely that this was the cause of #50,
although I haven't managed to reproduce the panic. Specifically, in [`Decoder.NextField`](https://github.com/influxdata/line-protocol/blob/e5cb25aaf8da4a0058b516f769b1804713e2de58/lineprotocol/decoder.go#L399-L412),
the following scenario could have happened:

- we decode the field key
- we decode the field value, triggering a read request that slides the buffer and corrupts the field key by overwriting it
- there's an error decoding the value
- we calculate the original encoded length of the key by looking at its data, but the corrupted data now contains a character that needs escaping where it didn't before, so the calculated length is longer than the original
- this results in us passing a `startIndex` value to `d.syntaxErrorf` that's too large
- which in turn causes the index out-of-range panic

The reason this issue wasn't caught by fuzzing was that we were only fuzzing with
`NewDecoderWithBytes`, not with `NewDecoder` itself.
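
As a rough illustration, a fuzz target along the following lines would exercise the `io.Reader` path as well; this is a sketch of my own (not part of this commit), and the use of `iotest.OneByteReader` to force many small reads is an assumption about how best to hit the buffer-sliding code:

```go
package lineprotocol

import (
	"bytes"
	"testing"
	"testing/iotest"
	"time"
)

// FuzzDecoderReader is an illustrative fuzz target, not part of this commit.
// It runs the same input both through NewDecoderWithBytes and through
// NewDecoder with a one-byte-at-a-time reader, so that the buffer sliding
// and growing paths in readMore/reset are actually exercised.
func FuzzDecoderReader(f *testing.F) {
	f.Add([]byte("cpu,host=a value=1 1600000000000000000\n"))
	f.Fuzz(func(t *testing.T, data []byte) {
		for _, dec := range []*Decoder{
			NewDecoderWithBytes(data),
			NewDecoder(iotest.OneByteReader(bytes.NewReader(data))),
		} {
			for dec.Next() {
				// We only care that walking every token never panics.
				dec.Measurement()
				for {
					key, _, err := dec.NextTag()
					if err != nil || key == nil {
						break
					}
				}
				for {
					key, _, err := dec.NextField()
					if err != nil || key == nil {
						break
					}
				}
				dec.Time(Nanosecond, time.Time{})
			}
		}
	})
}
```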

Performance seems largely unaffected:

```
name                                                           old time/op    new time/op    delta
DecodeEntriesSkipping/long-lines-8                               25.4ms ± 1%    25.4ms ± 1%       ~     (p=0.421 n=5+5)
DecodeEntriesSkipping/long-lines-with-escapes-8                  29.6ms ± 0%    29.4ms ± 0%     -0.60%  (p=0.008 n=5+5)
DecodeEntriesSkipping/single-short-line-8                         408ns ± 1%     407ns ± 1%       ~     (p=0.690 n=5+5)
DecodeEntriesSkipping/single-short-line-with-escapes-8            415ns ± 2%     418ns ± 2%       ~     (p=0.548 n=5+5)
DecodeEntriesSkipping/many-short-lines-8                          178ms ± 1%     175ms ± 1%     -1.49%  (p=0.008 n=5+5)
DecodeEntriesSkipping/field-key-escape-not-escapable-8            369ns ± 2%     367ns ± 0%       ~     (p=0.690 n=5+5)
DecodeEntriesSkipping/tag-value-triple-escape-space-8             447ns ± 2%     442ns ± 0%       ~     (p=0.151 n=5+5)
DecodeEntriesSkipping/procstat-8                                 3.43µs ± 2%    3.35µs ± 1%     -2.29%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/long-lines-8                        25.4ms ± 1%    25.2ms ± 0%     -0.68%  (p=0.016 n=5+5)
DecodeEntriesWithoutSkipping/long-lines-with-escapes-8            101ms ± 1%     101ms ± 0%       ~     (p=0.310 n=5+5)
DecodeEntriesWithoutSkipping/single-short-line-8                  442ns ± 2%     438ns ± 1%       ~     (p=0.310 n=5+5)
DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8     467ns ± 2%     465ns ± 2%       ~     (p=1.000 n=5+5)
DecodeEntriesWithoutSkipping/many-short-lines-8                   205ms ± 2%     207ms ± 0%       ~     (p=0.222 n=5+5)
DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8     516ns ± 6%     420ns ± 2%    -18.60%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8      586ns ± 1%     510ns ± 1%    -13.05%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/procstat-8                          5.76µs ± 2%    6.23µs ± 0%     +8.11%  (p=0.016 n=5+4)

name                                                           old speed      new speed      delta
DecodeEntriesSkipping/long-lines-8                             1.03GB/s ± 1%  1.03GB/s ± 1%       ~     (p=0.421 n=5+5)
DecodeEntriesSkipping/long-lines-with-escapes-8                 886MB/s ± 0%   891MB/s ± 0%     +0.61%  (p=0.008 n=5+5)
DecodeEntriesSkipping/single-short-line-8                      71.0MB/s ± 1%  71.2MB/s ± 1%       ~     (p=0.690 n=5+5)
DecodeEntriesSkipping/single-short-line-with-escapes-8         77.2MB/s ± 2%  76.6MB/s ± 2%       ~     (p=0.548 n=5+5)
DecodeEntriesSkipping/many-short-lines-8                        147MB/s ± 1%   150MB/s ± 1%     +1.52%  (p=0.008 n=5+5)
DecodeEntriesSkipping/field-key-escape-not-escapable-8         89.4MB/s ± 2%  90.0MB/s ± 0%       ~     (p=0.690 n=5+5)
DecodeEntriesSkipping/tag-value-triple-escape-space-8           112MB/s ± 2%   113MB/s ± 0%       ~     (p=0.151 n=5+5)
DecodeEntriesSkipping/procstat-8                                387MB/s ± 1%   396MB/s ± 1%     +2.34%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/long-lines-8                      1.03GB/s ± 1%  1.04GB/s ± 0%     +0.68%  (p=0.016 n=5+5)
DecodeEntriesWithoutSkipping/long-lines-with-escapes-8          259MB/s ± 1%   261MB/s ± 0%       ~     (p=0.286 n=5+5)
DecodeEntriesWithoutSkipping/single-short-line-8               65.6MB/s ± 2%  66.2MB/s ± 1%       ~     (p=0.310 n=5+5)
DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8  68.5MB/s ± 2%  68.9MB/s ± 2%       ~     (p=1.000 n=5+5)
DecodeEntriesWithoutSkipping/many-short-lines-8                 128MB/s ± 2%   126MB/s ± 0%       ~     (p=0.222 n=5+5)
DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8  64.1MB/s ± 6%  78.6MB/s ± 2%    +22.60%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8   85.3MB/s ± 1%  98.1MB/s ± 1%    +15.02%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/procstat-8                         230MB/s ± 2%   213MB/s ± 0%     -7.51%  (p=0.016 n=5+4)

name                                                           old alloc/op   new alloc/op   delta
DecodeEntriesSkipping/long-lines-8                                 512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/long-lines-with-escapes-8                    512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/single-short-line-8                          512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/single-short-line-with-escapes-8             512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/many-short-lines-8                           512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/field-key-escape-not-escapable-8             512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/tag-value-triple-escape-space-8              512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesSkipping/procstat-8                                   512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/long-lines-8                          512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/long-lines-with-escapes-8           17.4kB ± 0%    17.4kB ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/single-short-line-8                   512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8      512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/many-short-lines-8                    512B ± 0%      512B ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8      512B ± 0%      514B ± 0%     +0.39%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8       512B ± 0%      514B ± 0%     +0.39%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/procstat-8                            512B ± 0%      784B ± 0%    +53.12%  (p=0.008 n=5+5)

name                                                           old allocs/op  new allocs/op  delta
DecodeEntriesSkipping/long-lines-8                                 1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/long-lines-with-escapes-8                    1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/single-short-line-8                          1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/single-short-line-with-escapes-8             1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/many-short-lines-8                           1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/field-key-escape-not-escapable-8             1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/tag-value-triple-escape-space-8              1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesSkipping/procstat-8                                   1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/long-lines-8                          1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/long-lines-with-escapes-8             7.00 ± 0%      7.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/single-short-line-8                   1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8      1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/many-short-lines-8                    1.00 ± 0%      1.00 ± 0%       ~     (all equal)
DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8      1.00 ± 0%      2.00 ± 0%   +100.00%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8       1.00 ± 0%      2.00 ± 0%   +100.00%  (p=0.008 n=5+5)
DecodeEntriesWithoutSkipping/procstat-8                            1.00 ± 0%     30.00 ± 0%  +2900.00%  (p=0.008 n=5+5)
```
rogpeppe committed Nov 23, 2021
1 parent e5cb25a commit 6e963d9
Showing 2 changed files with 148 additions and 24 deletions.
52 changes: 28 additions & 24 deletions lineprotocol/decoder.go
@@ -11,9 +11,16 @@ import (
const (
// When the buffer is grown, it will be grown by a minimum of 8K.
minGrow = 8192

// The buffer will be grown if there's less than minRead space available
// to read into.
minRead = minGrow / 2

// maxSlide is the maximum number of bytes that will
// be copied to the start of the buffer when reset is called.
// This is a trade-off between copy overhead and the likelihood
// that a complete line-protocol entry will fit into this size.
maxSlide = 256
)

var (
@@ -720,11 +727,18 @@ func (d *Decoder) at(i int) byte {

// reset discards all the data up to d.r1 and data in d.escBuf
func (d *Decoder) reset() {
if d.r1 == len(d.buf) {
if unread := len(d.buf) - d.r1; unread == 0 {
// No bytes in the buffer, so we can start from the beginning without
// needing to copy anything (and get better cache behaviour too).
d.buf = d.buf[:0]
d.r1 = 0
} else if !d.complete && unread <= maxSlide {
// Slide the unread portion of the buffer to the
// start so that when we read more data,
// there's less chance that we'll need to grow the buffer.
copy(d.buf, d.buf[d.r1:])
d.r1 = 0
d.buf = d.buf[:unread]
}
d.r0 = d.r1
d.escBuf = d.escBuf[:0]
@@ -771,30 +785,20 @@ func (d *Decoder) readMore() {
}
n := cap(d.buf) - len(d.buf)
if n < minRead {
// There's not enough available space at the end of the buffer to read into.
if d.r0+n >= minRead {
// There's enough space when we take into account already-used
// part of buf, so slide the data to the front.
copy(d.buf, d.buf[d.r0:])
d.buf = d.buf[:len(d.buf)-d.r0]
d.r1 -= d.r0
d.r0 = 0
} else {
// We need to grow the buffer. Note that we don't have to copy
// the unused part of the buffer (d.buf[:d.r0]).
// TODO provide a way to limit the maximum size that
// the buffer can grow to.
used := len(d.buf) - d.r0
n1 := cap(d.buf) * 2
if n1-used < minGrow {
n1 = used + minGrow
}
buf1 := make([]byte, used, n1)
copy(buf1, d.buf[d.r0:])
d.buf = buf1
d.r1 -= d.r0
d.r0 = 0
// We need to grow the buffer. Note that we don't have to copy
// the unused part of the buffer (d.buf[:d.r0]).
// TODO provide a way to limit the maximum size that
// the buffer can grow to.
used := len(d.buf) - d.r0
n1 := cap(d.buf) * 2
if n1-used < minGrow {
n1 = used + minGrow
}
buf1 := make([]byte, used, n1)
copy(buf1, d.buf[d.r0:])
d.buf = buf1
d.r1 -= d.r0
d.r0 = 0
}
n, err := d.rd.Read(d.buf[len(d.buf):cap(d.buf)])
d.buf = d.buf[:len(d.buf)+n]
120 changes: 120 additions & 0 deletions lineprotocol/decoder_test.go
@@ -1,14 +1,17 @@
package lineprotocol

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"math/rand"
"regexp"
"strings"
"testing"
"testing/iotest"
"time"
"unicode/utf8"

qt "github.com/frankban/quicktest"
@@ -1351,3 +1354,120 @@ func TestErrorPositions(t *testing.T) {
})
}
}

func TestDecodeLargeDataWithReader(t *testing.T) {
c := qt.New(t)
r, w := io.Pipe()
const maxTagCount = 9
const maxFieldCount = 4
const npoints = 2000
go func() {
defer w.Close()
bw := bufio.NewWriter(w)
defer bw.Flush()
g := newTokenGenerator()
var enc Encoder
enc.SetLax(true) // Allow out-of-order tag keys.
for i := 0; i < npoints; i++ {
ntags := g.rand.Intn(maxTagCount + 1)
nfields := g.rand.Intn(maxFieldCount) + 1
timestamp := g.rand.Int63n(0xffff_ffff_ffff)
enc.StartLineRaw(g.token())
for j := 0; j < ntags; j++ {
enc.AddTagRaw(g.token(), g.token())
}
for j := 0; j < nfields; j++ {
key, val := g.token(), g.token()
v, err := NewValueFromBytes(String, val)
if err != nil {
panic(err)
}
enc.AddFieldRaw(key, v)
}
enc.EndLine(time.Unix(0, timestamp))
bw.Write(enc.Bytes())
enc.Reset()
}
}()
g := newTokenGenerator()
var wc writeCounter
dec := NewDecoder(io.TeeReader(r, &wc))
n := 0
for ; dec.Next(); n++ {
if n >= npoints {
c.Fatalf("too many points decoded")
}
wantNtags := g.rand.Intn(maxTagCount + 1)
wantNfields := g.rand.Intn(maxFieldCount) + 1
wantTimestamp := g.rand.Int63n(0xffff_ffff_ffff)
m, err := dec.Measurement()
c.Assert(err, qt.IsNil)
c.Check(m, qt.DeepEquals, g.token(), qt.Commentf("n %d", n))
tagi := 0
for {
key, val, err := dec.NextTag()
c.Assert(err, qt.IsNil)
if key == nil {
break
}
if tagi >= wantNtags {
c.Fatalf("too many tags found on entry %d", n)
}
wantKey, wantVal := g.token(), g.token()
c.Check(key, qt.DeepEquals, wantKey)
c.Check(val, qt.DeepEquals, wantVal)
tagi++
}
c.Assert(tagi, qt.Equals, wantNtags)
fieldi := 0
for {
key, val, err := dec.NextField()
c.Assert(err, qt.IsNil)
if key == nil {
break
}
if fieldi >= wantNfields {
c.Fatalf("too many tags found on entry %d", n)
}
wantKey, wantVal := g.token(), g.token()
c.Check(key, qt.DeepEquals, wantKey)
c.Check(val.Interface(), qt.Equals, string(wantVal))
fieldi++
}
c.Assert(fieldi, qt.Equals, wantNfields)
t, err := dec.Time(Nanosecond, time.Time{})
c.Check(err, qt.IsNil)
c.Check(t.UnixNano(), qt.Equals, wantTimestamp)
}
c.Assert(n, qt.Equals, npoints)
c.Logf("total bytes: %v", wc.n)
}

type writeCounter struct {
n int
}

func (w *writeCounter) Write(buf []byte) (int, error) {
w.n += len(buf)
return len(buf), nil
}

func newTokenGenerator() *tokenGenerator {
return &tokenGenerator{
rand: rand.New(rand.NewSource(0)),
}
}

type tokenGenerator struct {
rand *rand.Rand
}

const alphabet = "abcdefghijklmnopqrstuvwxyz =, =, =,"

func (g *tokenGenerator) token() []byte {
data := make([]byte, g.rand.Intn(40)+1)
for i := range data {
data[i] = alphabet[g.rand.Intn(len(alphabet))]
}
return data
}
