diff --git a/zio/anyio/gzip.go b/zio/anyio/gzip.go index e5b09ece1b..2af21fb9b8 100644 --- a/zio/anyio/gzip.go +++ b/zio/anyio/gzip.go @@ -17,20 +17,18 @@ func GzipReader(r io.Reader) (io.Reader, error) { return rs, nil } } - recorder := NewRecorder(r) - track := NewTrack(recorder) + track := NewTrack(r) // gzip.NewReader blocks until it reads ten bytes. readGzipID only // reads two bytes. if !readGzipID(track) { - return recorder, nil + return track.Reader(), nil } track.Reset() _, err := gzip.NewReader(track) if err == nil { - // create a new reader from recorder (track keeps a copy of read data) - return gzip.NewReader(recorder) + return gzip.NewReader(track.Reader()) } - return recorder, nil + return track.Reader(), nil } // RFC 1952, Section 2.3.1 diff --git a/zio/anyio/reader.go b/zio/anyio/reader.go index dfd948f4bb..c2bc85b839 100644 --- a/zio/anyio/reader.go +++ b/zio/anyio/reader.go @@ -65,26 +65,25 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea vngErr = errors.New("vng: auto-detection requires seekable input") } - recorder := NewRecorder(r) - track := NewTrack(recorder) + track := NewTrack(r) arrowsErr := isArrowStream(track) if arrowsErr == nil { - return arrowio.NewReader(zctx, recorder) + return arrowio.NewReader(zctx, track.Reader()) } arrowsErr = fmt.Errorf("arrows: %w", arrowsErr) track.Reset() zeekErr := match(zeekio.NewReader(zed.NewContext(), track), "zeek", 1) if zeekErr == nil { - return zio.NopReadCloser(zeekio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zeekio.NewReader(zctx, track.Reader())), nil } track.Reset() // ZJSON must come before JSON and ZSON since it is a subset of both. zjsonErr := match(zjsonio.NewReader(zed.NewContext(), track), "zjson", 1) if zjsonErr == nil { - return zio.NopReadCloser(zjsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zjsonio.NewReader(zctx, track.Reader())), nil } track.Reset() @@ -93,13 +92,13 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea // sake of tests. jsonErr := match(jsonio.NewReader(zed.NewContext(), track), "json", 10) if jsonErr == nil { - return zio.NopReadCloser(jsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(jsonio.NewReader(zctx, track.Reader())), nil } track.Reset() zsonErr := match(zsonio.NewReader(zed.NewContext(), track), "zson", 1) if zsonErr == nil { - return zio.NopReadCloser(zsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zsonio.NewReader(zctx, track.Reader())), nil } track.Reset() @@ -113,7 +112,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea // Close zngReader to ensure that it does not continue to call track.Read. zngReader.Close() if zngErr == nil { - return zngio.NewReaderWithOpts(zctx, recorder, opts.ZNG), nil + return zngio.NewReaderWithOpts(zctx, track.Reader(), opts.ZNG), nil } track.Reset() @@ -126,7 +125,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea track.Reset() csvErr = match(csvio.NewReader(zed.NewContext(), track, opts.CSV), "csv", 1) if csvErr == nil { - return zio.NopReadCloser(csvio.NewReader(zctx, recorder, opts.CSV)), nil + return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), opts.CSV)), nil } } track.Reset() diff --git a/zio/anyio/track.go b/zio/anyio/track.go index 547d9d2fa2..aacdd09b1d 100644 --- a/zio/anyio/track.go +++ b/zio/anyio/track.go @@ -1,24 +1,51 @@ package anyio +import "io" + const TrackSize = InitBufferSize type Track struct { + rs io.ReadSeeker + initial int64 + recorder *Recorder off int } -func NewTrack(r *Recorder) *Track { +func NewTrack(r io.Reader) *Track { + if rs, ok := r.(io.ReadSeeker); ok { + if n, err := rs.Seek(0, io.SeekCurrent); err == nil { + return &Track{rs: rs, initial: n} + } + } return &Track{ - recorder: r, + recorder: NewRecorder(r), } } func (t *Track) Reset() { + if t.rs != nil { + // We ignore errors here under the assumption that a subsequent + // call to Read will also fail. + t.rs.Seek(t.initial, io.SeekStart) + return + } t.off = 0 } func (t *Track) Read(b []byte) (int, error) { + if t.rs != nil { + return t.rs.Read(b) + } n, err := t.recorder.ReadAt(t.off, b) t.off += n return n, err } + +func (t *Track) Reader() io.Reader { + if t.rs != nil { + t.Reset() + return t.rs + } + return t.recorder +} diff --git a/zio/anyio/ztests/huge.yaml b/zio/anyio/ztests/huge.yaml new file mode 100644 index 0000000000..de809b288b --- /dev/null +++ b/zio/anyio/ztests/huge.yaml @@ -0,0 +1,23 @@ +script: | + ! yes ' ' | head -c $((11 * 1024 * 1024)) > huge.zson + echo 0 >> huge.zson + zq -z huge.zson + ! cat huge.zson | zq -z - + +outputs: + - name: stdout + data: | + 0 + - name: stderr + data: | + stdio:stdin: format detection error + arrows: schema message length exceeds 1 MiB + csv: line 1: no comma found + json: buffer exceeded max size trying to infer input format + line: auto-detection not supported + parquet: auto-detection requires seekable input + vng: auto-detection requires seekable input + zeek: line 1: bad types/fields definition in zeek header + zjson: line 1: malformed ZJSON: bad type object: "": unpacker error parsing JSON: unexpected end of JSON input + zng: buffer exceeded max size trying to infer input format + zson: buffer exceeded max size trying to infer input format