Skip to content

Commit

Permalink
Improve flow-visibility e2e test
Browse files Browse the repository at this point in the history
In this commit, we do:

1. Changed the order where we append expired records before exporting them from our exporter.
For inter-node traffic with egress/ingress np with action drop, we will receive records from PacketIn and the conntrack table. If the record from the conntrack table is exported first, then the record will need to do correlation at FA. From the egress case, there is no issue as we will receive the records from both nodes. But the record won't be sent to the collector as it keeps waiting to do correlation if both records come from the same node.
Similar approach is added in vmware/go-ipfix#338 as well.

2. Add check to verify if Flow Exporters can successfully resolve the Flow Aggregator Service address before sending traffic.

3. Add check to verify if Flow Aggregator can successfully connect to the ClickHouse before sending traffic.

4. Add labels to External subtest to filter useless logs from the IPFIX collector Pod.

5. Confirm the correct addition of a label to a specific Pod after updating the Pod.

6. Adjust the flow-visibility end-to-end test by disabling the octetDeltaCount check. This modification is necessary because, when the dual-stack cluster is enabled, the time taken to retrieve logs from the IPFIX collector Pod is significantly longer (around 4 seconds). In the e2e test, we regularly checked the logs every 500 milliseconds to ensure that we didn't receive the last record (where octetDeltaCount is 0). However, due to the delay, the PollImmediately() function doesn't execute every 500 milliseconds. Therefore, we have removed the octetDeltaCount check and, instead, filter out all records with octetDeltaCount=0 when retrieving records from the IPFIX collector Pod.

7. Use new image from go-ipfix PR ( vmware/go-ipfix#338).  We improve the IPFIX collector by:
    a.  Disable printing records whenever we receive it. Instead, we store records in a string array.
    b. Add http listener and handler to receive request to return or reset records.
    In this way, we can reduce the retrieving log time from ~4s to ~80ms when we have ~1900 records
    inside it.

Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
Yun-Tang Hsu committed Dec 13, 2023
1 parent 2a38ef5 commit 4d4b87a
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 172 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.1-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \
FLOW_VISIBILITY_IMAGE_LIST=("docker.io/hsuy947/go-ipfix" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
Expand Down
7 changes: 6 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,13 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) {
func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
var expireTime1, expireTime2 time.Duration
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// For inter-node traffic with egress/ingress np with action drop, we will receive records from PacketIn and the conntrack table.
// If the record from the conntrack table is exported first, then the record will need to do correlation at FA.
// From the egress case, there is no issue as we will receive the records from both nodes.
// But the record won't be sent to the collector as it keeps waiting to do correlation if both records come from the same node.

exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter time out among two connection stores to do the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range exp.expiredConns {
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"fmt"
"log"
"net"
Expand All @@ -25,6 +26,8 @@ import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/component-base/featuregate"

"antrea.io/antrea/pkg/agent/config"
Expand Down Expand Up @@ -300,10 +303,45 @@ func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*Te
if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil {
return testData, v4Enabled, v6Enabled, err
}
tb.Logf("Checking Flow Exporters can resolve Flow Aggregator Service address correctly")
err = checkResolveAddress(testData)
if err != nil {
return testData, v4Enabled, v6Enabled, err
}

return testData, v4Enabled, v6Enabled, nil
}

func checkResolveAddress(data *TestData) error {
// This function verifies whether the Flow Aggregator Service address has been successfully resolved in each
// antrea-agent Pod. The purpose of this check is to ensure the immediate sending of records from Flow Exporters,
// preventing potential e2e test failures due to timeout issues resulting from the absence of records in the IPFIX
// Pod and ClickHouse.
pods, err := data.clientset.CoreV1().Pods(antreaNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: "app=antrea,component=antrea-agent",
})
if err != nil {
return fmt.Errorf("failed to list antrea-agent Pods: %v", err)
}
for i := range pods.Items {
pod := pods.Items[i]
err := wait.Poll(defaultInterval, 2*defaultTimeout, func() (bool, error) {
logs, err := data.GetPodLogs(context.TODO(), antreaNamespace, pod.Name, agentContainerName)
if err != nil {
return false, err
}
if strings.Contains(logs, "Resolved FlowAggregator Service address") {
return true, nil
}
return false, nil
})
if err != nil {
return fmt.Errorf("error when waiting Flow Exporters to resolve Flow Aggregator Service address: %v", err)
}
}
return nil
}

func exportLogsForSubtest(tb testing.TB, data *TestData) func() {
substrings := strings.Split(tb.Name(), "/")
subDir := substrings[len(substrings)-1]
Expand Down
Loading

0 comments on commit 4d4b87a

Please sign in to comment.