Merge pull request #51 from influxdata/rog-037-fix-read-bug
lineprotocol: fix Decoder bug when decoding from an io.Reader
rogpeppe authored Nov 24, 2021
2 parents e5cb25a + 6e963d9 commit 8d23ab8
Showing 2 changed files with 148 additions and 24 deletions.
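For context: the bug this commit fixes only arises when the Decoder pulls its input incrementally from an io.Reader, rather than decoding an in-memory slice, because only that path exercises readMore and the buffer handling in reset below. A minimal sketch of such a decoding loop, assuming the v2 module path and the same public API that the new test exercises (error handling abbreviated):

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/influxdata/line-protocol/v2/lineprotocol"
)

func main() {
	// Decoding from an io.Reader forces the Decoder through the
	// readMore/reset code paths changed in this commit.
	r := strings.NewReader("weather,location=us-midwest temperature=82 1465839830100400200\n")
	dec := lineprotocol.NewDecoder(r)
	for dec.Next() {
		m, _ := dec.Measurement()
		fmt.Printf("measurement: %s\n", m)
		for {
			key, val, err := dec.NextTag()
			if err != nil || key == nil {
				break
			}
			fmt.Printf("tag: %s=%s\n", key, val)
		}
		for {
			key, val, err := dec.NextField()
			if err != nil || key == nil {
				break
			}
			fmt.Printf("field: %s=%v\n", key, val.Interface())
		}
		t, _ := dec.Time(lineprotocol.Nanosecond, time.Time{})
		fmt.Printf("time: %v\n", t)
	}
}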
52 changes: 28 additions & 24 deletions lineprotocol/decoder.go
@@ -11,9 +11,16 @@ import (
 const (
 	// When the buffer is grown, it will be grown by a minimum of 8K.
 	minGrow = 8192
+
 	// The buffer will be grown if there's less than minRead space available
 	// to read into.
 	minRead = minGrow / 2
+
+	// maxSlide is the maximum number of bytes that will
+	// be copied to the start of the buffer when reset is called.
+	// This is a trade-off between copy overhead and the likelihood
+	// that a complete line-protocol entry will fit into this size.
+	maxSlide = 256
 )
 
 var (
@@ -720,11 +727,18 @@ func (d *Decoder) at(i int) byte {
 
 // reset discards all the data up to d.r1 and data in d.escBuf
 func (d *Decoder) reset() {
-	if d.r1 == len(d.buf) {
+	if unread := len(d.buf) - d.r1; unread == 0 {
 		// No bytes in the buffer, so we can start from the beginning without
 		// needing to copy anything (and get better cache behaviour too).
 		d.buf = d.buf[:0]
 		d.r1 = 0
+	} else if !d.complete && unread <= maxSlide {
+		// Slide the unread portion of the buffer to the
+		// start so that when we read more data,
+		// there's less chance that we'll need to grow the buffer.
+		copy(d.buf, d.buf[d.r1:])
+		d.r1 = 0
+		d.buf = d.buf[:unread]
 	}
 	d.r0 = d.r1
 	d.escBuf = d.escBuf[:0]
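Two guards keep the new slide cheap: the unread tail is moved only while more input may still arrive (!d.complete), and only when it is at most maxSlide bytes, which is the trade-off the new constant's comment describes. A standalone sketch of the same strategy, under hypothetical names (buffer, r1), not the Decoder's actual code:

package main

import "fmt"

const maxSlide = 256

type buffer struct {
	buf      []byte
	r1       int  // start of the unread portion
	complete bool // true once the underlying reader is exhausted
}

func (b *buffer) reset() {
	if unread := len(b.buf) - b.r1; unread == 0 {
		// Nothing unread: restart at the beginning for free.
		b.buf = b.buf[:0]
		b.r1 = 0
	} else if !b.complete && unread <= maxSlide {
		// A small partial entry remains: move it to the front so the
		// next read has almost the whole capacity free behind it.
		copy(b.buf, b.buf[b.r1:])
		b.r1 = 0
		b.buf = b.buf[:unread]
	}
}

func main() {
	b := &buffer{buf: make([]byte, 8192)}
	b.r1 = 8092 // 100 unread bytes at the end of a full 8K buffer
	b.reset()
	fmt.Println(len(b.buf), cap(b.buf)-len(b.buf)) // 100 8092: room to read again
}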
@@ -771,30 +785,20 @@ func (d *Decoder) readMore() {
 	}
 	n := cap(d.buf) - len(d.buf)
 	if n < minRead {
-		// There's not enough available space at the end of the buffer to read into.
-		if d.r0+n >= minRead {
-			// There's enough space when we take into account already-used
-			// part of buf, so slide the data to the front.
-			copy(d.buf, d.buf[d.r0:])
-			d.buf = d.buf[:len(d.buf)-d.r0]
-			d.r1 -= d.r0
-			d.r0 = 0
-		} else {
-			// We need to grow the buffer. Note that we don't have to copy
-			// the unused part of the buffer (d.buf[:d.r0]).
-			// TODO provide a way to limit the maximum size that
-			// the buffer can grow to.
-			used := len(d.buf) - d.r0
-			n1 := cap(d.buf) * 2
-			if n1-used < minGrow {
-				n1 = used + minGrow
-			}
-			buf1 := make([]byte, used, n1)
-			copy(buf1, d.buf[d.r0:])
-			d.buf = buf1
-			d.r1 -= d.r0
-			d.r0 = 0
-		}
+		// We need to grow the buffer. Note that we don't have to copy
+		// the unused part of the buffer (d.buf[:d.r0]).
+		// TODO provide a way to limit the maximum size that
+		// the buffer can grow to.
+		used := len(d.buf) - d.r0
+		n1 := cap(d.buf) * 2
+		if n1-used < minGrow {
+			n1 = used + minGrow
+		}
+		buf1 := make([]byte, used, n1)
+		copy(buf1, d.buf[d.r0:])
+		d.buf = buf1
+		d.r1 -= d.r0
+		d.r0 = 0
 	}
 	n, err := d.rd.Read(d.buf[len(d.buf):cap(d.buf)])
 	d.buf = d.buf[:len(d.buf)+n]
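After this change readMore never slides, it only grows: when spare capacity drops below minRead (4K), capacity doubles but always ends at least minGrow (8K) above the bytes still in use, so reallocations stay logarithmic in input size and every read has at least 4K to fill. A sketch of just that capacity rule, with grownCap as a hypothetical helper using the same constants:

package main

import "fmt"

const (
	minGrow = 8192
	minRead = minGrow / 2
)

// grownCap sketches the growth rule in readMore: double the current
// capacity, but never leave less than minGrow of headroom above the
// bytes that are still in use.
func grownCap(curCap, used int) int {
	n1 := curCap * 2
	if n1-used < minGrow {
		n1 = used + minGrow
	}
	return n1
}

func main() {
	fmt.Println(grownCap(8192, 6000)) // 16384: doubling leaves ample headroom
	fmt.Println(grownCap(4096, 4000)) // 12192: doubling wouldn't leave minGrow, so used+minGrow
}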
120 changes: 120 additions & 0 deletions lineprotocol/decoder_test.go
@@ -1,14 +1,17 @@
 package lineprotocol
 
 import (
+	"bufio"
 	"bytes"
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"regexp"
 	"strings"
 	"testing"
 	"testing/iotest"
+	"time"
 	"unicode/utf8"
 
 	qt "github.com/frankban/quicktest"
@@ -1351,3 +1354,120 @@ func TestErrorPositions(t *testing.T) {
 		})
 	}
 }

+func TestDecodeLargeDataWithReader(t *testing.T) {
+	c := qt.New(t)
+	r, w := io.Pipe()
+	const maxTagCount = 9
+	const maxFieldCount = 4
+	const npoints = 2000
+	go func() {
+		defer w.Close()
+		bw := bufio.NewWriter(w)
+		defer bw.Flush()
+		g := newTokenGenerator()
+		var enc Encoder
+		enc.SetLax(true) // Allow out-of-order tag keys.
+		for i := 0; i < npoints; i++ {
+			ntags := g.rand.Intn(maxTagCount + 1)
+			nfields := g.rand.Intn(maxFieldCount) + 1
+			timestamp := g.rand.Int63n(0xffff_ffff_ffff)
+			enc.StartLineRaw(g.token())
+			for j := 0; j < ntags; j++ {
+				enc.AddTagRaw(g.token(), g.token())
+			}
+			for j := 0; j < nfields; j++ {
+				key, val := g.token(), g.token()
+				v, err := NewValueFromBytes(String, val)
+				if err != nil {
+					panic(err)
+				}
+				enc.AddFieldRaw(key, v)
+			}
+			enc.EndLine(time.Unix(0, timestamp))
+			bw.Write(enc.Bytes())
+			enc.Reset()
+		}
+	}()
+	g := newTokenGenerator()
+	var wc writeCounter
+	dec := NewDecoder(io.TeeReader(r, &wc))
+	n := 0
+	for ; dec.Next(); n++ {
+		if n >= npoints {
+			c.Fatalf("too many points decoded")
+		}
+		wantNtags := g.rand.Intn(maxTagCount + 1)
+		wantNfields := g.rand.Intn(maxFieldCount) + 1
+		wantTimestamp := g.rand.Int63n(0xffff_ffff_ffff)
+		m, err := dec.Measurement()
+		c.Assert(err, qt.IsNil)
+		c.Check(m, qt.DeepEquals, g.token(), qt.Commentf("n %d", n))
+		tagi := 0
+		for {
+			key, val, err := dec.NextTag()
+			c.Assert(err, qt.IsNil)
+			if key == nil {
+				break
+			}
+			if tagi >= wantNtags {
+				c.Fatalf("too many tags found on entry %d", n)
+			}
+			wantKey, wantVal := g.token(), g.token()
+			c.Check(key, qt.DeepEquals, wantKey)
+			c.Check(val, qt.DeepEquals, wantVal)
+			tagi++
+		}
+		c.Assert(tagi, qt.Equals, wantNtags)
+		fieldi := 0
+		for {
+			key, val, err := dec.NextField()
+			c.Assert(err, qt.IsNil)
+			if key == nil {
+				break
+			}
+			if fieldi >= wantNfields {
+				c.Fatalf("too many fields found on entry %d", n)
+			}
+			wantKey, wantVal := g.token(), g.token()
+			c.Check(key, qt.DeepEquals, wantKey)
+			c.Check(val.Interface(), qt.Equals, string(wantVal))
+			fieldi++
+		}
+		c.Assert(fieldi, qt.Equals, wantNfields)
+		t, err := dec.Time(Nanosecond, time.Time{})
+		c.Check(err, qt.IsNil)
+		c.Check(t.UnixNano(), qt.Equals, wantTimestamp)
+	}
+	c.Assert(n, qt.Equals, npoints)
+	c.Logf("total bytes: %v", wc.n)
+}
+
+type writeCounter struct {
+	n int
+}
+
+func (w *writeCounter) Write(buf []byte) (int, error) {
+	w.n += len(buf)
+	return len(buf), nil
+}
+
+func newTokenGenerator() *tokenGenerator {
+	return &tokenGenerator{
+		rand: rand.New(rand.NewSource(0)),
+	}
+}
+
+type tokenGenerator struct {
+	rand *rand.Rand
+}
+
+const alphabet = "abcdefghijklmnopqrstuvwxyz =, =, =,"
+
+func (g *tokenGenerator) token() []byte {
+	data := make([]byte, g.rand.Intn(40)+1)
+	for i := range data {
+		data[i] = alphabet[g.rand.Intn(len(alphabet))]
+	}
+	return data
+}
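A note on the test's structure: the producer goroutine and the verifying loop each build a tokenGenerator from the same fixed seed, so both sides independently regenerate the identical pseudo-random stream of token lengths and bytes, and the reader can assert exact equality without ever storing the expected data. The pattern in isolation (a hypothetical example, not part of the test):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Two generators seeded identically produce the same sequence, so a
	// producer and a verifier can each derive the "expected" data on
	// their own side of a pipe.
	producer := rand.New(rand.NewSource(0))
	verifier := rand.New(rand.NewSource(0))
	for i := 0; i < 3; i++ {
		got, want := producer.Intn(100), verifier.Intn(100)
		fmt.Println(got == want, got) // always true
	}
}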
