Skip to content

Commit

Permalink
Add flow IDs to log lines. (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
pboothe authored Dec 2, 2019
1 parent 626397f commit f168cf9
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 16 deletions.
5 changes: 5 additions & 0 deletions demuxer/flowkey.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package demuxer

import (
"fmt"
"net"

"github.com/google/gopacket"
Expand All @@ -16,6 +17,10 @@ type FlowKey struct {
loP, hiP uint16
}

func (f *FlowKey) String() string {
return fmt.Sprintf("%s:%d<->%s:%d", net.IP([]byte(f.lo)).String(), f.loP, net.IP([]byte(f.hi)).String(), f.hiP)
}

// fromPacket converts a packet's TCP 4-tuple into a FlowKey suitable for being
// a map key. Never pass fromPacket a non-TCP/IP packet - it will crash.
func fromPacket(p gopacket.Packet) FlowKey {
Expand Down
8 changes: 8 additions & 0 deletions demuxer/flowkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,39 @@ func TestFlowKeyFrom4Tuple(t *testing.T) {
srcPort uint16
dstIP net.IP
dstPort uint16
str string
}{
{
name: "Different hosts",
srcIP: net.ParseIP("10.1.1.1").To4(),
srcPort: 2000,
dstIP: net.ParseIP("192.168.0.1").To4(),
dstPort: 1000,
str: "10.1.1.1:2000<->192.168.0.1:1000",
},
{
name: "Same host, different ports",
srcIP: net.ParseIP("10.2.3.4").To4(),
srcPort: 2000,
dstIP: net.ParseIP("10.2.3.4").To4(),
dstPort: 1000,
str: "10.2.3.4:1000<->10.2.3.4:2000",
},
{
name: "Different v6 hosts",
srcIP: net.ParseIP("2:3::").To16(),
srcPort: 2000,
dstIP: net.ParseIP("4:5::").To16(),
dstPort: 1000,
str: "2:3:::2000<->4:5:::1000",
},
{
name: "Same v6 host, different ports",
srcIP: net.ParseIP("1::").To16(),
srcPort: 2000,
dstIP: net.ParseIP("1::").To16(),
dstPort: 1000,
str: "1:::1000<->1:::2000",
},
}
for _, tt := range tests {
Expand All @@ -49,6 +54,9 @@ func TestFlowKeyFrom4Tuple(t *testing.T) {
if f1 != f2 {
t.Errorf("%+v != %+v", f1, f2)
}
if f1.String() != tt.str || f2.String() != tt.str {
t.Errorf("Strings should be equal: %q, %q, %q", f1.String(), f2.String(), tt.str)
}
})
}
}
2 changes: 1 addition & 1 deletion demuxer/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
if ok {
delete(d.oldFlows, flow)
} else {
t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration)
t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.String())
}
d.currentFlows[flow] = t
}
Expand Down
14 changes: 9 additions & 5 deletions saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type TCP struct {
state statusSetter
anon anonymize.IPAnonymizer

id string

stopOnce sync.Once
}

Expand Down Expand Up @@ -153,6 +155,7 @@ func (t *TCP) savePackets(ctx context.Context, uuidDelay, duration time.Duration
// Read the first packet to determine the TCP+IP header size (as IPv6 is variable in size)
p, ok := t.readPacket(derivedCtx)
if !ok {
log.Println("PCAP capture cancelled with no packets for flow", t.id)
t.error("nopackets")
return
}
Expand Down Expand Up @@ -200,12 +203,12 @@ func (t *TCP) savePackets(ctx context.Context, uuidDelay, duration time.Duration
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
log.Println("UUID channel closed, PCAP capture cancelled with no UUID")
log.Println("UUID channel closed, PCAP capture cancelled with no UUID for flow", t.id)
t.error("uuidchan")
return
}
default:
log.Println("UUID did not arrive; PCAP capture cancelled with no UUID")
log.Println("UUID did not arrive; PCAP capture cancelled with no UUID for flow", t.id)
t.error("uuid")
return
}
Expand Down Expand Up @@ -294,7 +297,7 @@ func (t *TCP) State() string {

// newTCP makes a new saver.TCP but does not start it. It is here as its own
// function to enable whitebox testing and instrumentation.
func newTCP(dir string, anon anonymize.IPAnonymizer) *TCP {
func newTCP(dir string, anon anonymize.IPAnonymizer, id string) *TCP {
// With a 1500 byte MTU, this is a ~10 millisecond buffer at a line rate of
// 10Gbps:
//
Expand Down Expand Up @@ -324,6 +327,7 @@ func newTCP(dir string, anon anonymize.IPAnonymizer) *TCP {
dir: dir,
state: newStatus("notstarted"),
anon: anon,
id: id,
}
}

Expand All @@ -335,8 +339,8 @@ func newTCP(dir string, anon anonymize.IPAnonymizer) *TCP {
//
// It is the caller's responsibility to close Pchan or cancel the context.
// uuidDelay must be smaller than maxDuration.
func StartNew(ctx context.Context, anon anonymize.IPAnonymizer, dir string, uuidDelay, maxDuration time.Duration) *TCP {
s := newTCP(dir, anon)
func StartNew(ctx context.Context, anon anonymize.IPAnonymizer, dir string, uuidDelay, maxDuration time.Duration, id string) *TCP {
s := newTCP(dir, anon, id)
go s.start(ctx, uuidDelay, maxDuration)
return s
}
20 changes: 10 additions & 10 deletions saver/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestSaverDryRun(t *testing.T) {
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverDryRun")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand Down Expand Up @@ -117,11 +117,11 @@ func TestSaverDryRun(t *testing.T) {
}

func TestSaverWithUUID(t *testing.T) {
dir, err := ioutil.TempDir("", "TestSaverNoUUID")
dir, err := ioutil.TempDir("", "TestSaverWithUUID")
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverWithUUID")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand Down Expand Up @@ -173,7 +173,7 @@ func TestSaverNoUUID(t *testing.T) {
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverNoUUID")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand All @@ -200,11 +200,11 @@ func TestSaverNoUUID(t *testing.T) {
}

func TestSaverNoUUIDClosedUUIDChan(t *testing.T) {
dir, err := ioutil.TempDir("", "TestSaverNoUUID")
dir, err := ioutil.TempDir("", "TestSaverNoUUIDClosedUUIDChan")
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverNoUUIDClosedUUIDChan")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand Down Expand Up @@ -236,7 +236,7 @@ func TestSaverCantMkdir(t *testing.T) {
rtx.Must(os.Chmod(dir, 0111), "Could not chmod dir to unwriteable")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverCantMkdir")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand Down Expand Up @@ -269,7 +269,7 @@ func TestSaverCantCreate(t *testing.T) {
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

s := newTCP(dir, anonymize.New(anonymize.None))
s := newTCP(dir, anonymize.New(anonymize.None), "TestSaverCantCreate")
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

Expand Down Expand Up @@ -311,7 +311,7 @@ func TestSaverWithRealv4Data(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

s := StartNew(ctx, anonymize.New(anonymize.Netblock), dir, 5*time.Second, 10*time.Second)
s := StartNew(ctx, anonymize.New(anonymize.Netblock), dir, 5*time.Second, 10*time.Second, "TestSaverWithRealv4Data")

tstamp := time.Date(2000, 1, 2, 3, 4, 5, 6, time.UTC)
s.UUIDchan <- UUIDEvent{"testUUID", tstamp}
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestSaverWithRealv6Data(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

s := StartNew(ctx, anonymize.New(anonymize.Netblock), dir, 5*time.Second, 10*time.Second)
s := StartNew(ctx, anonymize.New(anonymize.Netblock), dir, 5*time.Second, 10*time.Second, "TestSaverWithRealv6Data")

tstamp := time.Date(2000, 1, 2, 3, 4, 5, 6, time.UTC)
s.UUIDchan <- UUIDEvent{"testUUID", tstamp}
Expand Down

0 comments on commit f168cf9

Please sign in to comment.