From c95c75596f716b9a87aa0d0b1c8781704190126a Mon Sep 17 00:00:00 2001 From: Steve McCanne Date: Thu, 5 Mar 2020 15:27:27 -0800 Subject: [PATCH] move slicer to its own package (#390) This commit cleans up the slicer logic by moving the reader into its own package. We also moved LoadIndex from pcap/slicer.go to pcap/index.go. It also simplifies the slicer interface to take an io.ReadSeeker instead of *os.File (which implements ReadSeeker). This is the first refactor toward adding support for pcap-ng. --- pcap/index.go | 17 +++++++- pcap/slicer.go | 83 ++++----------------------------------- pkg/slicer/reader.go | 56 ++++++++++++++++++++++++++ pkg/slicer/slice.go | 10 +++++ pkg/slicer/slicer_test.go | 27 +++++++++++++ 5 files changed, 116 insertions(+), 77 deletions(-) create mode 100644 pkg/slicer/reader.go create mode 100644 pkg/slicer/slice.go create mode 100644 pkg/slicer/slicer_test.go diff --git a/pcap/index.go b/pcap/index.go index a59d391a2b..0b6f8d9581 100644 --- a/pcap/index.go +++ b/pcap/index.go @@ -1,12 +1,15 @@ package pcap import ( + "encoding/json" "errors" "io" + "io/ioutil" "sync" "github.com/brimsec/zq/pkg/nano" "github.com/brimsec/zq/pkg/ranger" + "github.com/brimsec/zq/pkg/slicer" ) type Index struct { @@ -17,7 +20,7 @@ type Index struct { // there is just one section at the beginning of the file. For nextgen pcaps, // there can be multiple sections. type Section struct { - Blocks []Slice + Blocks []slicer.Slice Index ranger.Envelope } @@ -47,7 +50,7 @@ func CreateIndex(r io.Reader, limit int) (*Index, error) { return nil, errors.New("no packets found") } // legacy pcap file has just the file header at the start of the file - blocks := []Slice{{0, fileHeaderLen}} + blocks := []slicer.Slice{{0, fileHeaderLen}} return &Index{ Sections: []Section{{ Blocks: blocks, @@ -84,3 +87,13 @@ func (w *IndexWriter) Close() (*Index, error) { w.wg.Wait() return w.idx, w.err } + +func LoadIndex(path string) (*Index, error) { + b, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var index *Index + err = json.Unmarshal(b, &index) + return index, err +} diff --git a/pcap/slicer.go b/pcap/slicer.go index 80c7d5c8ef..017a1a8776 100644 --- a/pcap/slicer.go +++ b/pcap/slicer.go @@ -1,77 +1,20 @@ package pcap import ( - "encoding/json" "errors" - "io" - "io/ioutil" "os" "github.com/brimsec/zq/pkg/nano" "github.com/brimsec/zq/pkg/ranger" + "github.com/brimsec/zq/pkg/slicer" ) -// Slicer implements io.Reader reading the sliced regions provided to it from -// the underlying file thus extracting subsets of an underlying file without -// modifying or copying the file. -type Slicer struct { - slices []Slice - slice Slice - file *os.File - eof bool -} - -func NewSlicer(file *os.File, index *Index, span nano.Span) (*Slicer, error) { +func NewSlicer(file *os.File, index *Index, span nano.Span) (*slicer.Reader, error) { slices, err := GenerateSlices(index, span) if err != nil { return nil, err } - s := &Slicer{ - slices: slices, - file: file, - } - return s, s.next() -} - -func (s *Slicer) next() error { - if len(s.slices) == 0 { - s.eof = true - return nil - } - s.slice = s.slices[0] - s.slices = s.slices[1:] - _, err := s.file.Seek(int64(s.slice.Offset), 0) - return err -} - -func (s *Slicer) Read(b []byte) (int, error) { - if s.eof { - return 0, io.EOF - } - p := b - if uint64(len(p)) > s.slice.Length { - p = p[:s.slice.Length] - } - n, err := s.file.Read(p) - if n != 0 { - if err == io.EOF { - err = nil - } - s.slice.Length -= uint64(n) - if s.slice.Length == 0 { - err = s.next() - } - } - return n, err -} - -type Slice struct { - Offset uint64 - Length uint64 -} - -func (s Slice) Overlaps(x Slice) bool { - return x.Offset >= s.Offset && x.Offset < s.Offset+x.Length + return slicer.NewReader(file, slices) } // GenerateSlices takes an index and time span and generates a list of @@ -79,8 +22,8 @@ func (s Slice) Overlaps(x Slice) bool { // underlying pcap file. Extra packets may appear in the resulting stream // but all packets that fall within the time range will be produced, i.e., // another layering of time filtering should be applied to resulting packets. -func GenerateSlices(index *Index, span nano.Span) ([]Slice, error) { - var slices []Slice +func GenerateSlices(index *Index, span nano.Span) ([]slicer.Slice, error) { + var slices []slicer.Slice for _, section := range index.Sections { pslice, err := FindPacketSlice(section.Index, span) if err != nil { @@ -96,21 +39,11 @@ func GenerateSlices(index *Index, span nano.Span) ([]Slice, error) { return slices, nil } -func FindPacketSlice(e ranger.Envelope, span nano.Span) (Slice, error) { +func FindPacketSlice(e ranger.Envelope, span nano.Span) (slicer.Slice, error) { if len(e) == 0 { - return Slice{}, errors.New("no packets") + return slicer.Slice{}, errors.New("no packets") } d := e.FindSmallestDomain(ranger.Range{uint64(span.Ts), uint64(span.End())}) //XXX check for empty domain.. though seems like this will do the right thing - return Slice{d.X0, d.X1 - d.X0}, nil -} - -func LoadIndex(path string) (*Index, error) { - b, err := ioutil.ReadFile(path) - if err != nil { - return nil, err - } - var index *Index - err = json.Unmarshal(b, &index) - return index, err + return slicer.Slice{d.X0, d.X1 - d.X0}, nil } diff --git a/pkg/slicer/reader.go b/pkg/slicer/reader.go new file mode 100644 index 0000000000..ff1a5e2625 --- /dev/null +++ b/pkg/slicer/reader.go @@ -0,0 +1,56 @@ +// Package slicer provides an io.Reader that returns subsets of a file. +package slicer + +import ( + "io" +) + +// Reader implements io.Reader reading the sliced regions provided to it from +// the underlying file thus extracting subsets of an underlying file without +// modifying or copying the file. +type Reader struct { + slices []Slice + slice Slice + seeker io.ReadSeeker + eof bool +} + +func NewReader(seeker io.ReadSeeker, slices []Slice) (*Reader, error) { + r := &Reader{ + slices: slices, + seeker: seeker, + } + return r, r.next() +} + +func (r *Reader) next() error { + if len(r.slices) == 0 { + r.eof = true + return nil + } + r.slice = r.slices[0] + r.slices = r.slices[1:] + _, err := r.seeker.Seek(int64(r.slice.Offset), 0) + return err +} + +func (r *Reader) Read(b []byte) (int, error) { + if r.eof { + return 0, io.EOF + } + p := b + if uint64(len(p)) > r.slice.Length { + p = p[:r.slice.Length] + } + n, err := r.seeker.Read(p) + if n != 0 { + if err == io.EOF { + err = nil + } + r.slice.Length -= uint64(n) + if r.slice.Length == 0 { + err = r.next() + } + } + return n, err +} diff --git a/pkg/slicer/slice.go b/pkg/slicer/slice.go new file mode 100644 index 0000000000..705befd3bd --- /dev/null +++ b/pkg/slicer/slice.go @@ -0,0 +1,10 @@ +package slicer + +type Slice struct { + Offset uint64 + Length uint64 +} + +func (s Slice) Overlaps(x Slice) bool { + return x.Offset >= s.Offset && x.Offset < s.Offset+x.Length +} diff --git a/pkg/slicer/slicer_test.go b/pkg/slicer/slicer_test.go new file mode 100644 index 0000000000..8f496c84ce --- /dev/null +++ b/pkg/slicer/slicer_test.go @@ -0,0 +1,27 @@ +package slicer_test + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/brimsec/zq/pkg/slicer" + "github.com/stretchr/testify/assert" +) + +func TestSlicer(t *testing.T) { + in := []byte("abcdefghijklmnopqrstuvwxyz") + slices := []slicer.Slice{ + {0, 2}, + {0, 26}, + {3, 4}, + {25, 1}, + {25, 2}, + } + expected := []byte("ababcdefghijklmnopqrstuvwxyzdefgzz") + reader, err := slicer.NewReader(bytes.NewReader(in), slices) + assert.NoError(t, err) + out, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + assert.Exactly(t, expected, out) +}