From 6e963d9112f1882630fbfae02ce6de0c539948a3 Mon Sep 17 00:00:00 2001 From: Roger Peppe Date: Tue, 23 Nov 2021 14:18:50 +0000 Subject: [PATCH] lineprotocol: fix Decoder bug when decoding from an io.Reader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When we needed more input at the end of the buffer, we were sliding the existing data to the front of the buffer, but ignoring the fact that existing tokens could be pointing to that data and hence overwrite it. Avoid that possibility by sliding data only when we reset the decoder after each entry has been entirely decoded. It's likely that this was the cause of https://github.com/influxdata/line-protocol/issues/50, although I haven't managed to recreate the panic. Specifically, in [`Decoder.NextField`](https://github.com/influxdata/line-protocol/blob/e5cb25aaf8da4a0058b516f769b1804713e2de58/lineprotocol/decoder.go#L399-L412), this scenario could have happened: - we decode the field tag - we decode the field value, triggering a read request which corrupts the field tag by overwriting it. - there's an error decoding the value - we calculate the original encoded length of the tag by looking at its data, but the corrupted data now contains a character that needs escaping, which it didn't before, so the calculated length is longer than the original. - this results in us passing a `startIndex` value that's too large to `d.syntaxErrorf` - this results in the index out-of-bounds panic The reason this issue wasn't caught by fuzzing was that we were only fuzzing with `NewDecoderWithBytes`, not with `NewDecoder` itself. Performance seems largely unaffected: ``` name old time/op new time/op delta DecodeEntriesSkipping/long-lines-8 25.4ms ± 1% 25.4ms ± 1% ~ (p=0.421 n=5+5) DecodeEntriesSkipping/long-lines-with-escapes-8 29.6ms ± 0% 29.4ms ± 0% -0.60% (p=0.008 n=5+5) DecodeEntriesSkipping/single-short-line-8 408ns ± 1% 407ns ± 1% ~ (p=0.690 n=5+5) DecodeEntriesSkipping/single-short-line-with-escapes-8 415ns ± 2% 418ns ± 2% ~ (p=0.548 n=5+5) DecodeEntriesSkipping/many-short-lines-8 178ms ± 1% 175ms ± 1% -1.49% (p=0.008 n=5+5) DecodeEntriesSkipping/field-key-escape-not-escapable-8 369ns ± 2% 367ns ± 0% ~ (p=0.690 n=5+5) DecodeEntriesSkipping/tag-value-triple-escape-space-8 447ns ± 2% 442ns ± 0% ~ (p=0.151 n=5+5) DecodeEntriesSkipping/procstat-8 3.43µs ± 2% 3.35µs ± 1% -2.29% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/long-lines-8 25.4ms ± 1% 25.2ms ± 0% -0.68% (p=0.016 n=5+5) DecodeEntriesWithoutSkipping/long-lines-with-escapes-8 101ms ± 1% 101ms ± 0% ~ (p=0.310 n=5+5) DecodeEntriesWithoutSkipping/single-short-line-8 442ns ± 2% 438ns ± 1% ~ (p=0.310 n=5+5) DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8 467ns ± 2% 465ns ± 2% ~ (p=1.000 n=5+5) DecodeEntriesWithoutSkipping/many-short-lines-8 205ms ± 2% 207ms ± 0% ~ (p=0.222 n=5+5) DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8 516ns ± 6% 420ns ± 2% -18.60% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8 586ns ± 1% 510ns ± 1% -13.05% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/procstat-8 5.76µs ± 2% 6.23µs ± 0% +8.11% (p=0.016 n=5+4) name old speed new speed delta DecodeEntriesSkipping/long-lines-8 1.03GB/s ± 1% 1.03GB/s ± 1% ~ (p=0.421 n=5+5) DecodeEntriesSkipping/long-lines-with-escapes-8 886MB/s ± 0% 891MB/s ± 0% +0.61% (p=0.008 n=5+5) DecodeEntriesSkipping/single-short-line-8 71.0MB/s ± 1% 71.2MB/s ± 1% ~ (p=0.690 n=5+5) DecodeEntriesSkipping/single-short-line-with-escapes-8 77.2MB/s ± 2% 76.6MB/s ± 2% ~ (p=0.548 n=5+5) DecodeEntriesSkipping/many-short-lines-8 147MB/s ± 1% 150MB/s ± 1% +1.52% (p=0.008 n=5+5) DecodeEntriesSkipping/field-key-escape-not-escapable-8 89.4MB/s ± 2% 90.0MB/s ± 0% ~ (p=0.690 n=5+5) DecodeEntriesSkipping/tag-value-triple-escape-space-8 112MB/s ± 2% 113MB/s ± 0% ~ (p=0.151 n=5+5) DecodeEntriesSkipping/procstat-8 387MB/s ± 1% 396MB/s ± 1% +2.34% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/long-lines-8 1.03GB/s ± 1% 1.04GB/s ± 0% +0.68% (p=0.016 n=5+5) DecodeEntriesWithoutSkipping/long-lines-with-escapes-8 259MB/s ± 1% 261MB/s ± 0% ~ (p=0.286 n=5+5) DecodeEntriesWithoutSkipping/single-short-line-8 65.6MB/s ± 2% 66.2MB/s ± 1% ~ (p=0.310 n=5+5) DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8 68.5MB/s ± 2% 68.9MB/s ± 2% ~ (p=1.000 n=5+5) DecodeEntriesWithoutSkipping/many-short-lines-8 128MB/s ± 2% 126MB/s ± 0% ~ (p=0.222 n=5+5) DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8 64.1MB/s ± 6% 78.6MB/s ± 2% +22.60% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8 85.3MB/s ± 1% 98.1MB/s ± 1% +15.02% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/procstat-8 230MB/s ± 2% 213MB/s ± 0% -7.51% (p=0.016 n=5+4) name old alloc/op new alloc/op delta DecodeEntriesSkipping/long-lines-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/long-lines-with-escapes-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/single-short-line-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/single-short-line-with-escapes-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/many-short-lines-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/field-key-escape-not-escapable-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/tag-value-triple-escape-space-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesSkipping/procstat-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/long-lines-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/long-lines-with-escapes-8 17.4kB ± 0% 17.4kB ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/single-short-line-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/many-short-lines-8 512B ± 0% 512B ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8 512B ± 0% 514B ± 0% +0.39% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8 512B ± 0% 514B ± 0% +0.39% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/procstat-8 512B ± 0% 784B ± 0% +53.12% (p=0.008 n=5+5) name old allocs/op new allocs/op delta DecodeEntriesSkipping/long-lines-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/long-lines-with-escapes-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/single-short-line-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/single-short-line-with-escapes-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/many-short-lines-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/field-key-escape-not-escapable-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/tag-value-triple-escape-space-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesSkipping/procstat-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/long-lines-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/long-lines-with-escapes-8 7.00 ± 0% 7.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/single-short-line-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/single-short-line-with-escapes-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/many-short-lines-8 1.00 ± 0% 1.00 ± 0% ~ (all equal) DecodeEntriesWithoutSkipping/field-key-escape-not-escapable-8 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/tag-value-triple-escape-space-8 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.008 n=5+5) DecodeEntriesWithoutSkipping/procstat-8 1.00 ± 0% 30.00 ± 0% +2900.00% (p=0.008 n=5+5) ``` --- lineprotocol/decoder.go | 52 ++++++++------- lineprotocol/decoder_test.go | 120 +++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 24 deletions(-) diff --git a/lineprotocol/decoder.go b/lineprotocol/decoder.go index 0ef1bee0..4cee557b 100644 --- a/lineprotocol/decoder.go +++ b/lineprotocol/decoder.go @@ -11,9 +11,16 @@ import ( const ( // When the buffer is grown, it will be grown by a minimum of 8K. minGrow = 8192 + // The buffer will be grown if there's less than minRead space available // to read into. minRead = minGrow / 2 + + // maxSlide is the maximum number of bytes that will + // be copied to the start of the buffer when reset is called. + // This is a trade-off between copy overhead and the likelihood + // that a complete line-protocol entry will fit into this size. + maxSlide = 256 ) var ( @@ -720,11 +727,18 @@ func (d *Decoder) at(i int) byte { // reset discards all the data up to d.r1 and data in d.escBuf func (d *Decoder) reset() { - if d.r1 == len(d.buf) { + if unread := len(d.buf) - d.r1; unread == 0 { // No bytes in the buffer, so we can start from the beginning without // needing to copy anything (and get better cache behaviour too). d.buf = d.buf[:0] d.r1 = 0 + } else if !d.complete && unread <= maxSlide { + // Slide the unread portion of the buffer to the + // start so that when we read more data, + // there's less chance that we'll need to grow the buffer. + copy(d.buf, d.buf[d.r1:]) + d.r1 = 0 + d.buf = d.buf[:unread] } d.r0 = d.r1 d.escBuf = d.escBuf[:0] @@ -771,30 +785,20 @@ func (d *Decoder) readMore() { } n := cap(d.buf) - len(d.buf) if n < minRead { - // There's not enough available space at the end of the buffer to read into. - if d.r0+n >= minRead { - // There's enough space when we take into account already-used - // part of buf, so slide the data to the front. - copy(d.buf, d.buf[d.r0:]) - d.buf = d.buf[:len(d.buf)-d.r0] - d.r1 -= d.r0 - d.r0 = 0 - } else { - // We need to grow the buffer. Note that we don't have to copy - // the unused part of the buffer (d.buf[:d.r0]). - // TODO provide a way to limit the maximum size that - // the buffer can grow to. - used := len(d.buf) - d.r0 - n1 := cap(d.buf) * 2 - if n1-used < minGrow { - n1 = used + minGrow - } - buf1 := make([]byte, used, n1) - copy(buf1, d.buf[d.r0:]) - d.buf = buf1 - d.r1 -= d.r0 - d.r0 = 0 + // We need to grow the buffer. Note that we don't have to copy + // the unused part of the buffer (d.buf[:d.r0]). + // TODO provide a way to limit the maximum size that + // the buffer can grow to. + used := len(d.buf) - d.r0 + n1 := cap(d.buf) * 2 + if n1-used < minGrow { + n1 = used + minGrow } + buf1 := make([]byte, used, n1) + copy(buf1, d.buf[d.r0:]) + d.buf = buf1 + d.r1 -= d.r0 + d.r0 = 0 } n, err := d.rd.Read(d.buf[len(d.buf):cap(d.buf)]) d.buf = d.buf[:len(d.buf)+n] diff --git a/lineprotocol/decoder_test.go b/lineprotocol/decoder_test.go index d4cd77ee..d26f2499 100644 --- a/lineprotocol/decoder_test.go +++ b/lineprotocol/decoder_test.go @@ -1,14 +1,17 @@ package lineprotocol import ( + "bufio" "bytes" "errors" "fmt" "io" + "math/rand" "regexp" "strings" "testing" "testing/iotest" + "time" "unicode/utf8" qt "github.com/frankban/quicktest" @@ -1351,3 +1354,120 @@ func TestErrorPositions(t *testing.T) { }) } } + +func TestDecodeLargeDataWithReader(t *testing.T) { + c := qt.New(t) + r, w := io.Pipe() + const maxTagCount = 9 + const maxFieldCount = 4 + const npoints = 2000 + go func() { + defer w.Close() + bw := bufio.NewWriter(w) + defer bw.Flush() + g := newTokenGenerator() + var enc Encoder + enc.SetLax(true) // Allow out-of-order tag keys. + for i := 0; i < npoints; i++ { + ntags := g.rand.Intn(maxTagCount + 1) + nfields := g.rand.Intn(maxFieldCount) + 1 + timestamp := g.rand.Int63n(0xffff_ffff_ffff) + enc.StartLineRaw(g.token()) + for j := 0; j < ntags; j++ { + enc.AddTagRaw(g.token(), g.token()) + } + for j := 0; j < nfields; j++ { + key, val := g.token(), g.token() + v, err := NewValueFromBytes(String, val) + if err != nil { + panic(err) + } + enc.AddFieldRaw(key, v) + } + enc.EndLine(time.Unix(0, timestamp)) + bw.Write(enc.Bytes()) + enc.Reset() + } + }() + g := newTokenGenerator() + var wc writeCounter + dec := NewDecoder(io.TeeReader(r, &wc)) + n := 0 + for ; dec.Next(); n++ { + if n >= npoints { + c.Fatalf("too many points decoded") + } + wantNtags := g.rand.Intn(maxTagCount + 1) + wantNfields := g.rand.Intn(maxFieldCount) + 1 + wantTimestamp := g.rand.Int63n(0xffff_ffff_ffff) + m, err := dec.Measurement() + c.Assert(err, qt.IsNil) + c.Check(m, qt.DeepEquals, g.token(), qt.Commentf("n %d", n)) + tagi := 0 + for { + key, val, err := dec.NextTag() + c.Assert(err, qt.IsNil) + if key == nil { + break + } + if tagi >= wantNtags { + c.Fatalf("too many tags found on entry %d", n) + } + wantKey, wantVal := g.token(), g.token() + c.Check(key, qt.DeepEquals, wantKey) + c.Check(val, qt.DeepEquals, wantVal) + tagi++ + } + c.Assert(tagi, qt.Equals, wantNtags) + fieldi := 0 + for { + key, val, err := dec.NextField() + c.Assert(err, qt.IsNil) + if key == nil { + break + } + if fieldi >= wantNfields { + c.Fatalf("too many tags found on entry %d", n) + } + wantKey, wantVal := g.token(), g.token() + c.Check(key, qt.DeepEquals, wantKey) + c.Check(val.Interface(), qt.Equals, string(wantVal)) + fieldi++ + } + c.Assert(fieldi, qt.Equals, wantNfields) + t, err := dec.Time(Nanosecond, time.Time{}) + c.Check(err, qt.IsNil) + c.Check(t.UnixNano(), qt.Equals, wantTimestamp) + } + c.Assert(n, qt.Equals, npoints) + c.Logf("total bytes: %v", wc.n) +} + +type writeCounter struct { + n int +} + +func (w *writeCounter) Write(buf []byte) (int, error) { + w.n += len(buf) + return len(buf), nil +} + +func newTokenGenerator() *tokenGenerator { + return &tokenGenerator{ + rand: rand.New(rand.NewSource(0)), + } +} + +type tokenGenerator struct { + rand *rand.Rand +} + +const alphabet = "abcdefghijklmnopqrstuvwxyz =, =, =," + +func (g *tokenGenerator) token() []byte { + data := make([]byte, g.rand.Intn(40)+1) + for i := range data { + data[i] = alphabet[g.rand.Intn(len(alphabet))] + } + return data +}