Skip to content

Commit

Permalink
redefine last record
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <[email protected]>

test

Signed-off-by: Yun-Tang Hsu <[email protected]>

test

Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
Yun-Tang Hsu committed Dec 11, 2023
1 parent e93ece2 commit 8d2f11c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
7 changes: 6 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,13 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) {
func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
var expireTime1, expireTime2 time.Duration
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// For inter-node traffic subject to an egress/ingress NetworkPolicy with action drop, we will receive records from both PacketIn and the conntrack table.
// If the record from the conntrack table is exported first, that record will need to go through correlation at the Flow Aggregator (FA).
// For the egress case, there is no issue, as the FA will receive the records from both Nodes.
// But for the ingress case, the record won't be sent to the collector, as the FA keeps waiting to do correlation when both records come from the same Node.

exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter timeout of the two connection stores to schedule the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range exp.expiredConns {
Expand Down
54 changes: 45 additions & 9 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"regexp"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -170,6 +171,7 @@ type testFlow struct {
}

func TestFlowAggregatorSecureConnection(t *testing.T) {
t.Skip()
skipIfNotFlowVisibilityTest(t)
skipIfHasWindowsNodes(t)
testCases := []struct {
Expand Down Expand Up @@ -1042,7 +1044,7 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s
flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason"))
var recBandwidth float64
// flowEndReason == 3 means that the end of the flow was detected
if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 {
if flowEndReason == 3 {
// Check average bandwidth on the last record.
octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount")
recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000
Expand Down Expand Up @@ -1110,7 +1112,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP,
exportTime := record.FlowEndSeconds.Unix()
var recBandwidth float64
// flowEndReason == 3 means that the end of the flow was detected
if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 {
if record.FlowEndReason == 3 {
octetTotalCount := record.OctetTotalCount
recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000
} else {
Expand Down Expand Up @@ -1410,29 +1412,29 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
var rc int
var err error
// `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed
timeNow := time.Now()
rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace))
if err != nil || rc != 0 {
return false, err
}
t.Logf("Time used for retrieving logs from IPFIX collector Pod is: %v", time.Since(timeNow).String())

// Checking that all the data records which correspond to the iperf flow are received
src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6)
recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort)
if checkAllRecords {
for _, record := range recordSlices {
flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds"))
exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds"))
flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason"))
// flowEndReason == 3 means that the end of the flow was detected
if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 {
if flowEndReason == 3 {
return true, nil
}
}
return false, nil
}
return len(recordSlices) != 0, nil
})
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort)
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector, recordSlices ares: %v, output: %v iperf source port: %s", recordSlices, collectorOutput, srcPort)
return collectorOutput, recordSlices
}

Expand Down Expand Up @@ -1485,10 +1487,8 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str

if checkAllRecords {
for _, record := range flowRecords {
flowStartTime := record.FlowStartSeconds.Unix()
exportTime := record.FlowEndSeconds.Unix()
// flowEndReason == 3 means that the end of the flow was detected
if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 {
if record.FlowEndReason == 3 {
return true, nil
}
}
Expand All @@ -1505,8 +1505,24 @@ func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort s
output = re.ReplaceAllString(output, "")
output = strings.TrimSpace(output)
recordSlices := strings.Split(output, "IPFIX-HDR:")
connectionMap := map[string]int{}
t.Logf("Current number of records in IPFIX collector Pod is: %d", len(recordSlices))
records := []string{}
mostRecord := ""
highestNumber := 0
for _, recordSlice := range recordSlices {
split := strings.Split(recordSlice, "sourcePodLabels")
if len(split) < 2 {
continue
}
split2 := strings.Split(split[1], "I1211")
result := strings.ReplaceAll(split2[0], "\\n", " ")
connectionMap["sourcePodLabels"+result] += 1
if connectionMap["sourcePodLabels"+result] >= highestNumber {
highestNumber = connectionMap["sourcePodLabels"+result]
mostRecord = recordSlice
}

// We don't check the last record.
if strings.Contains(recordSlice, "octetDeltaCount: 0") {
continue
Expand All @@ -1519,6 +1535,26 @@ func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort s
records = append(records, recordSlice)
}
}

keys := make([]string, 0, len(connectionMap))

for key := range connectionMap {
keys = append(keys, key)
}

sort.SliceStable(keys, func(i, j int) bool {
return connectionMap[keys[i]] > connectionMap[keys[j]]
})

t.Logf("Top 3 src+pod Labels:")
for i, k := range keys {
fmt.Println(connectionMap[k], k)
if i == 2 {
break
}
}
t.Logf("Highest count record: %s", mostRecord)

return records
}

Expand Down

0 comments on commit 8d2f11c

Please sign in to comment.