Skip to content

Commit

Permalink
Packet Post Endpoint: Stream status updates (#378)
Browse files Browse the repository at this point in the history
Posting large pcap files to a space can often take a long time as zeek slowly chugs away. When a pcap is posted to a space via the POST /space/:space/packet endpoint, the API now provides streaming updates to give clients an idea of how far along the ingest process is:

When the request is initiated, a zeek subprocess is spawned, and the contents of the posted pcap are piped sequentially into zeek's stdin and a pcap index writer. Assuming this all starts with no errors, a 200 OK status message is returned once the first zeek log files in the temp zeek log file directory have been written to disk. A TaskStart response is transmitted over the response stream. Whatever data has been written so far is also transformed to sorted bzng. At this point the space is queryable.
On a hardwired two-second interval, the server streams back piped JSON status update payloads that contain: the start timestamp of ingest, the update timestamp, the total size of the pcap file, and the number of bytes read from the pcap file so far. From this information a client should be able to approximate the percentage completion of the ingest process (this won't be entirely accurate, however, because it doesn't account for the time it will take to transform the zeek logs into time-sorted bzng at the end).
A TaskEnd message is sent at the completion of ingest. If TaskEnd.Error is not null users will know that an error occurred during ingest. If an error occurs during ingest, the process is aborted and the space is reset.
Also:

Add api.Stream/JSONPipe functionality to make it easier to read/write piped json files over the wire.
  • Loading branch information
mattnibs authored Mar 5, 2020
1 parent 0c9d143 commit 66b49b6
Show file tree
Hide file tree
Showing 13 changed files with 514 additions and 110 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f
github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/google/gopacket v1.1.17
github.com/gorilla/mux v1.7.4
github.com/mccanne/charm v0.0.3-0.20191224190439-b05e1b7b1be3
Expand All @@ -15,5 +16,6 @@ require (
github.com/yuin/goldmark v1.1.22
go.uber.org/zap v1.12.0
golang.org/x/text v0.3.0
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY=
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
Expand Down Expand Up @@ -80,6 +81,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71 h1:Xe2gvTZUJpsvOWUnvmL/tmhVBZUmHSvLbMjRj6NUUKo=
Expand Down
30 changes: 30 additions & 0 deletions pcap/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pcap
import (
"errors"
"io"
"sync"

"github.com/brimsec/zq/pkg/nano"
"github.com/brimsec/zq/pkg/ranger"
Expand Down Expand Up @@ -54,3 +55,32 @@ func CreateIndex(r io.Reader, limit int) (*Index, error) {
},
}, nil
}

// IndexWriter builds a pcap Index from data written to it. Bytes written
// through the embedded WriteCloser are fed through a pipe to CreateIndex,
// which runs in a background goroutine started by NewIndexWriter.
//
// Note that the Close method declared on *IndexWriter returns
// (*Index, error) and therefore shadows the embedded WriteCloser's Close.
type IndexWriter struct {
// WriteCloser is the write end of the pipe consumed by CreateIndex.
io.WriteCloser
// err is the error returned by CreateIndex, if any.
err error
// idx is the index returned by CreateIndex.
idx *Index
// wg is released once the background indexing goroutine has finished.
wg sync.WaitGroup
}

// run consumes r with CreateIndex and records the resulting index and error
// on w. If indexing fails, the read side of the pipe is closed with that
// error so that writers on the other end observe it instead of blocking.
// The wait group is released when run returns.
func (w *IndexWriter) run(r *io.PipeReader, limit int) {
	defer w.wg.Done()
	idx, err := CreateIndex(r, limit)
	w.idx, w.err = idx, err
	if err == nil {
		return
	}
	r.CloseWithError(err)
}

// NewIndexWriter returns an IndexWriter whose written bytes are piped to a
// background goroutine that builds a pcap index via CreateIndex using the
// given limit. Call Close to finish writing and collect the result.
func NewIndexWriter(limit int) *IndexWriter {
	reader, writer := io.Pipe()
	iw := &IndexWriter{WriteCloser: writer}
	// Register the goroutine before starting it so Close can wait on it.
	iw.wg.Add(1)
	go iw.run(reader, limit)
	return iw
}

// Close closes the write side of the pipe, waits for the background indexing
// goroutine to finish, and returns the index and error it produced.
func (w *IndexWriter) Close() (*Index, error) {
	// The result of closing the writer is intentionally discarded; the
	// outcome callers care about is the one reported by the indexer.
	_ = w.WriteCloser.Close()
	w.wg.Wait()
	idx, err := w.idx, w.err
	return idx, err
}
20 changes: 20 additions & 0 deletions zqd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ type Error struct {
Info interface{} `json:"info,omitempty"`
}

// Error implements the error interface so that an *Error can be passed
// around as an ordinary error. The returned string is the tab-indented JSON
// encoding of the Error struct; if encoding fails, the Message field is
// returned instead.
func (e *Error) Error() string {
	if encoded, err := json.MarshalIndent(e, "", "\t"); err == nil {
		return string(encoded)
	}
	// Marshalling is not expected to fail; fall back to the bare message.
	return e.Message
}

type TaskStart struct {
Type string `json:"type"`
TaskID int64 `json:"task_id"`
Expand Down Expand Up @@ -94,6 +106,14 @@ type PacketPostRequest struct {
Path string `json:"path"`
}

// PacketPostStatus is the periodic status update streamed back to a client
// while a posted pcap is being ingested. Comparing PacketReadSize with
// PacketSize lets a client approximate how far along the ingest is.
type PacketPostStatus struct {
// Type identifies the payload kind on the wire ("PacketPostStatus").
Type string `json:"type"`
// StartTime is the timestamp at which the ingest started.
StartTime nano.Ts `json:"start_time"`
// UpdateTime is the timestamp of this status update.
UpdateTime nano.Ts `json:"update_time"`
// PacketSize is the total size of the pcap file, in bytes.
PacketSize int64 `json:"packet_total_size" unit:"bytes"`
// PacketReadSize is the number of bytes read from the pcap file so far.
PacketReadSize int64 `json:"packet_read_size" unit:"bytes"`
}

// PacketSearch are the query string args to the packet endpoint when searching
// for packets within a connection 5-tuple.
type PacketSearch struct {
Expand Down
4 changes: 2 additions & 2 deletions zqd/search/pipe.go → zqd/api/jsonpipe.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package search
package api

import (
"encoding/json"
Expand All @@ -23,7 +23,7 @@ func NewJSONPipe(w http.ResponseWriter) *JSONPipe {
p := &JSONPipe{
ResponseWriter: w,
encoder: json.NewEncoder(w),
separator: []byte("\n\n"),
separator: sep,
}
return p
}
Expand Down
31 changes: 31 additions & 0 deletions zqd/api/jsonpipescanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package api

import (
"bufio"
"bytes"
"io"
)

// sep is the separator placed between consecutive JSON payloads in a piped
// JSON stream.
var sep = []byte("\n\n")

// maxJSONPipeTokenSize is the largest single payload, in bytes, that a
// scanner returned by NewJSONPipeScanner will accept.
const maxJSONPipeTokenSize = 25 * 1024 * 1024

// NewJSONPipeScanner returns a bufio.Scanner that splits r into the
// individual payloads of a piped JSON stream, i.e. chunks separated by sep.
// Each call to Scan yields one payload without its trailing separator.
func NewJSONPipeScanner(r io.Reader) *bufio.Scanner {
	s := bufio.NewScanner(r)
	// By default a Scanner gives up with bufio.ErrTooLong on any token larger
	// than bufio.MaxScanTokenSize (64 KiB), which a single JSON payload can
	// easily exceed. Start with a small buffer and allow it to grow.
	s.Buffer(make([]byte, 0, 4096), maxJSONPipeTokenSize)
	s.Split(splitJSONPipe)
	return s
}

// splitJSONPipe is a bufio.SplitFunc that tokenizes a piped JSON stream.
// It returns the bytes preceding the next occurrence of sep and advances past
// the separator. At EOF any remaining unterminated data is returned as the
// final token.
func splitJSONPipe(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	if i := bytes.Index(data, sep); i >= 0 {
		// A complete separator-terminated payload is available; skip the
		// whole separator, whatever its length.
		return i + len(sep), data[:i], nil
	}
	// At EOF with leftover bytes: hand back the final, unterminated payload.
	if atEOF {
		return len(data), data, nil
	}
	// No separator yet; ask the scanner for more data.
	return 0, nil, nil
}
47 changes: 47 additions & 0 deletions zqd/api/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package api

import (
"bufio"
"context"
"io"
)

// Stream reads successive API messages from a piped JSON response using a
// scanner, and invokes an optional cancel function once the stream is done.
type Stream struct {
	scanner *bufio.Scanner
	cancel  context.CancelFunc
}

// NewStream returns a Stream that reads payloads from s. If c is non-nil it
// is called once when the stream ends.
func NewStream(s *bufio.Scanner, c context.CancelFunc) *Stream {
	return &Stream{scanner: s, cancel: c}
}

// end invokes the cancel function, if one is still set, and then clears it
// so that later calls are no-ops.
func (s *Stream) end() {
	if s.cancel == nil {
		return
	}
	s.cancel()
	s.cancel = nil
}

// Next returns the next message from the stream, decoded by unpack into the
// matching API type.
//
// When the underlying scanner has no more payloads the stream is ended and
// Next returns (nil, nil), or (nil, err) if the scanner stopped on an error.
// A payload that cannot be decoded ends the stream and returns the decoding
// error. A TaskEnd message carrying a non-nil Error ends the stream and is
// returned together with that error.
func (s *Stream) Next() (interface{}, error) {
	if !s.scanner.Scan() {
		s.end()
		if err := s.scanner.Err(); err != io.EOF {
			return nil, err
		}
		return nil, nil
	}
	msg, err := unpack(s.scanner.Bytes())
	if err != nil {
		s.end()
		return nil, err
	}
	if taskEnd, isEnd := msg.(*TaskEnd); isEnd && taskEnd.Error != nil {
		// Hand back both the message and its error so the caller can
		// still inspect the TaskEnd payload.
		s.end()
		return msg, taskEnd.Error
	}
	return msg, nil
}
49 changes: 49 additions & 0 deletions zqd/api/unpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package api

import (
"encoding/json"
"fmt"
)

// unpack transforms a piped json stream into the appropriate api response
// and returns it as an empty interface so that the caller can receive
// a stream of objects, check their types, and process them accordingly.
// unpack decodes a single payload of a piped JSON stream into the API
// response struct named by its "type" field. The result is returned as an
// empty interface holding a pointer to that struct, so callers can type
// switch on the messages they receive.
//
// An error is returned if b is not valid JSON, is not a JSON object, has no
// "type" field, names an unknown type, or cannot be decoded into the
// selected struct.
func unpack(b []byte) (interface{}, error) {
	// First pass: decode generically, just to discover the message type.
	var raw interface{}
	if err := json.Unmarshal(b, &raw); err != nil {
		return nil, err
	}
	fields, isObject := raw.(map[string]interface{})
	if !isObject {
		return nil, fmt.Errorf("bad json object: %s", string(b))
	}
	kind, found := fields["type"]
	if !found {
		return nil, fmt.Errorf("no type field in search result: %s", string(b))
	}

	var msg interface{}
	switch kind {
	case "TaskStart":
		msg = new(TaskStart)
	case "TaskEnd":
		msg = new(TaskEnd)
	case "SearchRecords":
		msg = new(SearchRecords)
	case "SearchWarnings":
		msg = new(SearchWarnings)
	case "SearchStats":
		msg = new(SearchStats)
	case "SearchEnd":
		msg = new(SearchEnd)
	case "PacketPostStatus":
		msg = new(PacketPostStatus)
	default:
		return nil, fmt.Errorf("unknown type in results stream: %s", kind)
	}

	// Second pass: decode the same bytes into the concrete type.
	if err := json.Unmarshal(b, msg); err != nil {
		return nil, err
	}
	return msg, nil
}
Loading

0 comments on commit 66b49b6

Please sign in to comment.