Skip to content

Commit

Permalink
feat(decoders.sflow): Add decoding of drop packets (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jul 18, 2024
1 parent b19557c commit e171a04
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 2 deletions.
14 changes: 14 additions & 0 deletions decoders/sflow/datastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ type ExtendedGateway struct {
LocalPref uint32 `json:"local-pref"`
}

type EgressQueue struct {
Queue uint32 `json:"queue"`
}

type ExtendedACL struct {
Number uint32 `json:"number"`
Name string `json:"name"`
Direction uint32 `json:"direction"` // 0:unknown, 1:ingress, 2:egress
}

type ExtendedFunction struct {
Symbol string `json:"symbol"`
}

type IfCounters struct {
IfIndex uint32 `json:"if-index"`
IfType uint32 `json:"if-type"`
Expand Down
12 changes: 12 additions & 0 deletions decoders/sflow/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ type ExpandedFlowSample struct {
Records []FlowRecord `json:"records"`
}

// DropSample data structure according to https://sflow.org/sflow_drops.txt
type DropSample struct {
Header SampleHeader `json:"header"`

Drops uint32 `json:"drops"`
Input uint32 `json:"input"`
Output uint32 `json:"output"`
Reason uint32 `json:"reason"`
FlowRecordsCount uint32 `json:"flow-records-count"`
Records []FlowRecord `json:"records"`
}

type RecordHeader struct {
DataFormat uint32 `json:"data-format"`
Length uint32 `json:"length"`
Expand Down
52 changes: 50 additions & 2 deletions decoders/sflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
SAMPLE_FORMAT_COUNTER = 2
SAMPLE_FORMAT_EXPANDED_FLOW = 3
SAMPLE_FORMAT_EXPANDED_COUNTER = 4
SAMPLE_FORMAT_DROP = 5
)

// Opaque flow_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
Expand All @@ -33,6 +34,11 @@ const (
FLOW_TYPE_EXT_MPLS_FEC = 1010
FLOW_TYPE_EXT_MPLS_LVP_FEC = 1011
FLOW_TYPE_EXT_VLAN_TUNNEL = 1012

// According to https://sflow.org/sflow_drops.txt
FLOW_TYPE_EGRESS_QUEUE = 1036
FLOW_TYPE_EXT_ACL = 1037
FLOW_TYPE_EXT_FUNCTION = 1038
)

// Opaque counter_data types according to https://sflow.org/SFLOW-STRUCTS5.txt
Expand Down Expand Up @@ -319,6 +325,24 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
extendedGateway.Communities = communities

flowRecord.Data = extendedGateway
case FLOW_TYPE_EGRESS_QUEUE:
var queue EgressQueue
if err := utils.BinaryDecoder(payload, &queue.Queue); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = queue
case FLOW_TYPE_EXT_ACL:
var acl ExtendedACL
if err := utils.BinaryDecoder(payload, &acl.Number, &acl.Name, &acl.Direction); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = acl
case FLOW_TYPE_EXT_FUNCTION:
var function ExtendedFunction
if err := utils.BinaryDecoder(payload, &function.Symbol); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = function
default:
var rawRecord RawRecord
rawRecord.Data = payload.Bytes()
Expand All @@ -344,10 +368,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
if err := utils.BinaryDecoder(payload, &sourceId); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)}
}

header.SourceIdType = sourceId >> 24
header.SourceIdValue = sourceId & 0x00ffffff
case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER:
case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER, SAMPLE_FORMAT_DROP:
// Explicit data-source format
if err := utils.BinaryDecoder(payload,
&header.SourceIdType,
Expand All @@ -363,6 +386,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
var flowSample FlowSample
var counterSample CounterSample
var expandedFlowSample ExpandedFlowSample
var dropSample DropSample

switch format {
case SAMPLE_FORMAT_FLOW:
flowSample.Header = *header
Expand Down Expand Up @@ -410,6 +435,23 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
recordsCount = expandedFlowSample.FlowRecordsCount
expandedFlowSample.Records = make([]FlowRecord, recordsCount)
sample = expandedFlowSample
case SAMPLE_FORMAT_DROP:
dropSample.Header = *header
if err := utils.BinaryDecoder(payload,
&dropSample.Drops,
&dropSample.Input,
&dropSample.Output,
&dropSample.Reason,
&dropSample.FlowRecordsCount,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("raw [%w]", err)}
}
recordsCount = dropSample.FlowRecordsCount
if recordsCount > 1000 { // protection against ddos
return sample, &FlowError{format, seq, fmt.Errorf("too many flow records: %d", recordsCount)}
}
dropSample.Records = make([]FlowRecord, recordsCount) // max size of 1000 for protection
sample = dropSample
}
for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ {
recordHeader := RecordHeader{}
Expand Down Expand Up @@ -442,6 +484,12 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
expandedFlowSample.Records[i] = record
case SAMPLE_FORMAT_DROP:
record, err := DecodeFlowRecord(&recordHeader, recordReader)
if err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)}
}
dropSample.Records[i] = record
}
}
return sample, nil
Expand Down
62 changes: 62 additions & 0 deletions decoders/sflow/sflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,65 @@ func TestExpandedSFlowDecode(t *testing.T) {
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
}

func TestSFlowDecodeDropEgressQueue(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x2a,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, EgressQueue{Queue: 42}, sample.Records[0].Data)
}

func TestSFlowDecodeDropExtendedACL(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x38, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0d, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x2a,
0x00, 0x00, 0x00, 0x04, 0x66, 0x6f, 0x6f, 0x21, 0x00, 0x00, 0x00, 0x02,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, ExtendedACL{Number: 42, Name: "foo!", Direction: 2}, sample.Records[0].Data)
}

func TestSFlowDecodeDropExtendedFunction(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x32, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0e, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x06,
0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72,
}

buf := bytes.NewBuffer(data)
var packet Packet
assert.NoError(t, DecodeMessageVersion(buf, &packet))
assert.Len(t, packet.Samples, 1)
assert.NotNil(t, packet.Samples[0])
sample, ok := packet.Samples[0].(DropSample)
assert.True(t, ok)
assert.Len(t, sample.Records, 1)
assert.Equal(t, ExtendedFunction{Symbol: "foobar"}, sample.Records[0].Data)
}
9 changes: 9 additions & 0 deletions decoders/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error {
*data = int64(order.Uint64(bs))
case *uint64:
*data = order.Uint64(bs)
case *string:
strlen := int(order.Uint32(bs))
buf := payload.Next(strlen)
if len(buf) < strlen {
return io.ErrUnexpectedEOF
}
*data = string(buf)
case []bool:
for i, x := range bs { // Easier to loop over the input for 8-bit values.
data[i] = x != 0
Expand Down Expand Up @@ -121,6 +128,8 @@ func intDataSize(data any) int {
return 2 * len(data)
case int32, uint32, *int32, *uint32:
return 4
case *string: // return the length field
return 4
case []int32:
return 4 * len(data)
case []uint32:
Expand Down

0 comments on commit e171a04

Please sign in to comment.