From f5abd8b38357d734f00c1de7a9bdac158db54a4f Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Mon, 4 Dec 2023 16:38:27 -0800 Subject: [PATCH] Improve flow-visibility e2e test In this commit, we do: 1. Changed the order where we append expired records before exporting them from our exporter. For inter-node traffic with egress/ingress np with action drop, we will receive records from PacketIn and the conntrack table. If the record from the conntrack table is exported first, then the record will need to do correlation at FA. From the egress case, there is no issue as we will receive the records from both nodes. But the record won't be sent to the collector as it keeps waiting to do correlation if both records come from the same node. Similar approach is added in https://github.com/vmware/go-ipfix/pull/338 as well. 2. Add check to verify if Flow Exporters can successfully resolve the Flow Aggregator Service address before sending traffic. 3. Add check to verify if Flow Aggregator can successfully connect to the ClickHouse before sending traffic. 4. Add labels to External subtest to filter useless logs from the IPFIX collector Pod. 5. Confirm the correct addition of a label to a specific Pod after updating the Pod. 6. Adjust the flow-visibility end-to-end test by disabling the octetDeltaCount check. This modification is necessary because, when the dual-stack cluster is enabled, the time taken to retrieve logs from the IPFIX collector Pod is significantly longer (around 4 seconds). In the e2e test, we regularly checked the logs every 500 milliseconds to ensure that we didn't receive the last record (where octetDeltaCount is 0). However, due to the delay, the PollImmediately() function doesn't execute every 500 milliseconds. Therefore, we have removed the octetDeltaCount check and, instead, filter out all records with octetDeltaCount=0 when retrieving records from the IPFIX collector Pod. 7. Use new image from go-ipfix PR ( https://github.com/vmware/go-ipfix/pull/338). We improve the IPFIX collector by: a. Disable printing records whenever we receive it. Instead, we store records in a string array. b. Add http listener and handler to receive request to return or reset records. In this way, we can reduce the retrieving log time from ~4s to ~80ms when we have ~1900 records inside it. Signed-off-by: Yun-Tang Hsu --- ci/kind/test-e2e-kind.sh | 2 +- pkg/agent/flowexporter/exporter/exporter.go | 7 +- test/e2e/fixtures.go | 38 +++ test/e2e/flowaggregator_test.go | 288 +++++++++++--------- test/e2e/framework.go | 95 ++++--- 5 files changed, 258 insertions(+), 172 deletions(-) diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 7bf19354a0f..de178d77182 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \ "projects.registry.vmware.com/antrea/toolbox:1.1-0") -FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \ +FLOW_VISIBILITY_IMAGE_LIST=("docker.io/hsuy947/go-ipfix" \ "projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \ "projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \ "projects.registry.vmware.com/antrea/clickhouse-server:23.4") diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index f5a64d7118a..cf62fbd8fea 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -260,8 +260,13 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) { currTime := time.Now() var expireTime1, expireTime2 time.Duration - exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + // For inter-node traffic with egress/ingress np with action drop, we will receive records from PacketIn and the conntrack table. + // If the record from the conntrack table is exported first, then the record will need to do correlation at FA. + // From the egress case, there is no issue as we will receive the records from both nodes. + // But the record won't be sent to the collector as it keeps waiting to do correlation if both records come from the same node. + exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) // Select the shorter time out among two connection stores to do the next round of export. nextExpireTime := getMinTime(expireTime1, expireTime2) for i := range exp.expiredConns { diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index 45af883bccd..60afc8041e5 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "log" "net" @@ -25,6 +26,8 @@ import ( "testing" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/component-base/featuregate" "antrea.io/antrea/pkg/agent/config" @@ -300,10 +303,45 @@ func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*Te if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil { return testData, v4Enabled, v6Enabled, err } + tb.Logf("Checking Flow Exporters can resolve Flow Aggregator Service address correctly") + err = checkResolveAddress(testData) + if err != nil { + return testData, v4Enabled, v6Enabled, err + } return testData, v4Enabled, v6Enabled, nil } +func checkResolveAddress(data *TestData) error { + // This function verifies whether the Flow Aggregator Service address has been successfully resolved in each + // antrea-agent Pod. The purpose of this check is to ensure the immediate sending of records from Flow Exporters, + // preventing potential e2e test failures due to timeout issues resulting from the absence of records in the IPFIX + // Pod and ClickHouse. + pods, err := data.clientset.CoreV1().Pods(antreaNamespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app=antrea,component=antrea-agent", + }) + if err != nil { + return fmt.Errorf("failed to list antrea-agent Pods: %v", err) + } + for i := range pods.Items { + pod := pods.Items[i] + err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) { + logs, err := data.GetPodLogs(context.TODO(), antreaNamespace, pod.Name, agentContainerName) + if err != nil { + return false, err + } + if strings.Contains(logs, "Resolved FlowAggregator Service address") { + return true, nil + } + return false, nil + }) + if err != nil { + return fmt.Errorf("error when waiting Flow Exporters to resolve Flow Aggregator Service address: %v", err) + } + } + return nil +} + func exportLogsForSubtest(tb testing.TB, data *TestData) func() { substrings := strings.Split(tb.Name(), "/") subDir := substrings[len(substrings)-1] diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 85abc98d7df..e899b8adc6e 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "net" - "regexp" "strconv" "strings" "testing" @@ -30,6 +29,7 @@ import ( ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/strings/slices" @@ -168,6 +168,10 @@ type testFlow struct { checkDstSvc bool } +type Response struct { + FlowRecords []string `json:"flowRecords"` +} + func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) @@ -306,7 +310,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest // before initiating traffic. This label is then employed as a filter when collecting records from either the // ClickHouse or the IPFIX collector Pod. - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label) }) @@ -316,7 +320,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -353,7 +357,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -390,7 +394,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -428,7 +432,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -470,7 +474,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -511,7 +515,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeFlows" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -534,7 +538,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -571,7 +575,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -608,7 +612,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -646,7 +650,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -693,7 +697,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -744,6 +748,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnSourceNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node running the client Pods var egressNodeIP string @@ -759,14 +765,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -784,6 +789,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnOtherNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node not running the client Pods var egressNodeIP string @@ -799,14 +806,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -817,14 +823,15 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() - + label := "ToExternalFlows" + addLabelToTestPods(t, data, label, []string{clientName}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "", label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "", label) } } }) @@ -833,7 +840,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "LocalServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -849,7 +856,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "RemoteServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -987,79 +994,71 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) + // Checking only data records as data records cannot be decoded without template + // record. + assert.GreaterOrEqualf(t, len(recordSlices), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) // Iterate over recordSlices and build some results to test with expected results - dataRecordsCount := 0 - src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { - // Check the source port along with source and destination IPs as there - // are flow records for control flows during the iperf with same IPs - // and destination port. - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - dataRecordsCount = dataRecordsCount + 1 - // Check if record has both Pod name of source and destination Pod. + // Check if record has both Pod name of source and destination Pod. + if isIntraNode { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if checkService { if isIntraNode { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") } else { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) - } - assert := assert.New(t) - if checkService { - if isIntraNode { - assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") - } else { - assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") - } - } - if checkK8sNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - } - if checkAntreaNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") } + } + if checkK8sNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + } + if checkAntreaNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + } - // Skip the bandwidth check for the iperf control flow records which have 0 throughput. - if !strings.Contains(record, "throughput: 0") { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) - flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - var recBandwidth float64 - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - // Check average bandwidth on the last record. - octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") - recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 - } else { - // Check bandwidth with the field "throughput" except for the last record, - // as their throughput may be significantly lower than the average Iperf throughput. - throughput := getUint64FieldFromRecord(t, record, "throughput") - recBandwidth = float64(throughput) / 1000000 - } - t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) - assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if !strings.Contains(record, "throughput: 0") { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) + flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) + var recBandwidth float64 + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + // Check average bandwidth on the last record. + octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") + recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 + } else { + // Check bandwidth with the field "throughput" except for the last record, + // as their throughput may be significantly lower than the average Iperf throughput. + throughput := getUint64FieldFromRecord(t, record, "throughput") + recBandwidth = float64(throughput) / 1000000 } + t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) } } - // Checking only data records as data records cannot be decoded without template - // record. - assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { @@ -1114,7 +1113,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, exportTime := record.FlowEndSeconds.Unix() var recBandwidth float64 // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + if record.FlowEndReason == 3 { octetTotalCount := record.OctetTotalCount recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000 } else { @@ -1132,7 +1131,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %s", clickHouseRecords) } -func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP string) { +func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, labelFilter string) { var cmd string if !isIPv6 { cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort) @@ -1141,24 +1140,19 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "") + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter) for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { - checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) - assert.NotContains(t, record, "octetDeltaCount: 0", "octetDeltaCount should be non-zero") - if egressName != "" { - checkEgressInfo(t, record, egressName, egressIP) - } + checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) + if egressName != "" { + checkEgressInfo(t, record, egressName, egressIP) } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, labelFilter) for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) - assert.Greater(t, record.OctetDeltaCount, uint64(0), "octetDeltaCount should be non-zero") if egressName != "" { checkEgressInfoClickHouse(t, record, egressName, egressIP) } @@ -1418,31 +1412,40 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService err := wait.PollImmediate(500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, func() (bool, error) { var rc int var err error + var cmd string + ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testData.testNamespace) + if err != nil || len(ipfixCollectorIP.IPStrings) == 0 { + require.NoErrorf(t, err, "Should be able to get IP from IPFIX collector Pod") + } + if !isIPv6 { + cmd = fmt.Sprintf("curl http://%s:8080/records", ipfixCollectorIP.IPv4.String()) + } else { + cmd = fmt.Sprintf("curl http://[%s]:8080/records", ipfixCollectorIP.IPv6.String()) + } // `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed - rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) + timeNow := time.Now() + rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), cmd) if err != nil || rc != 0 { return false, err } + t.Logf("Time used for retrieving logs from IPFIX collector Pod is: %v", time.Since(timeNow).String()) + // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) + recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort) if checkAllRecords { for _, record := range recordSlices { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - return true, nil - } + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + return true, nil } } return false, nil } - return strings.Contains(collectorOutput, src) && strings.Contains(collectorOutput, dst) && strings.Contains(collectorOutput, srcPort), nil + return len(recordSlices) != 0, nil }) - require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort) + require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector, recordSlices ares: %v, output: %v iperf source port: %s", recordSlices, collectorOutput, srcPort) return collectorOutput, recordSlices } @@ -1454,9 +1457,9 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) @@ -1473,10 +1476,12 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str // ClickHouse output expected to be checked after IPFIX collector. // Waiting additional 4x commit interval to be adequate for 3 commit attempts. err := wait.PollImmediate(500*time.Millisecond, aggregatorClickHouseCommitInterval*4, func() (bool, error) { + timeNow := time.Now() queryOutput, _, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", cmd) if err != nil { return false, err } + t.Logf("Time used for retrieving logs from ClickHouse is: %v", time.Since(timeNow).String()) rows := strings.Split(queryOutput, "\n") flowRecords = make([]*ClickHouseFullRow, 0, len(rows)) @@ -1495,10 +1500,8 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str if checkAllRecords { for _, record := range flowRecords { - flowStartTime := record.FlowStartSeconds.Unix() - exportTime := record.FlowEndSeconds.Unix() // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + if record.FlowEndReason == 3 { return true, nil } } @@ -1510,20 +1513,29 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str return flowRecords } -func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string { - re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") - output = re.ReplaceAllString(output, "") - output = strings.TrimSpace(output) - recordSlices := strings.Split(output, "IPFIX-HDR:") - if labelFilter == "" { - return recordSlices +func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort string) []string { + var response Response + err := json.Unmarshal([]byte(output), &response) + if err != nil { + require.NoErrorf(t, err, "error when unmarshall output from IPFIX collector Pod") } + recordSlices := response.FlowRecords + t.Logf("Current number of records in IPFIX collector Pod is: %d", len(recordSlices)) records := []string{} for _, recordSlice := range recordSlices { - if strings.Contains(recordSlice, labelFilter) { + // We don't check the last record. + if strings.Contains(recordSlice, "octetDeltaCount: 0") { + continue + } + // We don't check the record that can't match the srcIP, dstIP and srcPort. + if !strings.Contains(recordSlice, src) || !strings.Contains(recordSlice, dst) || !strings.Contains(recordSlice, srcPort) { + continue + } + if labelFilter == "" || strings.Contains(recordSlice, labelFilter) { records = append(records, recordSlice) } } + t.Logf("Number of matched records in IPFIX collector Pod is: %d, src: %s, dst: %s, srcPort: %s", len(records), src, dst, srcPort) return records } @@ -1753,14 +1765,24 @@ func deletePerftestServices(t *testing.T, data *TestData) { } } -func addLabelToPerftestPods(t *testing.T, data *TestData, label string) { - perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"}) - require.NoError(t, err, "Error when getting perftest Pods") - for i := range perftestPods.Items { - pod := &perftestPods.Items[i] - pod.Labels["targetLabel"] = label - _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) - require.NoErrorf(t, err, "Error when adding label to %s", pod.Name) +func addLabelToTestPods(t *testing.T, data *TestData, label string, podNames []string) { + for _, podName := range podNames { + testPod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + require.NoErrorf(t, err, "Error when getting Pod %s in %s", testPod, data.testNamespace) + testPod.Labels["targetLabel"] = label + _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), testPod, metav1.UpdateOptions{}) + require.NoErrorf(t, err, "Error when adding label to %s", testPod.Name) + err = wait.Poll(defaultInterval, timeout, func() (bool, error) { + pod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), testPod.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error when getting Pod '%s': %v", pod.Name, err) + } + return pod.Labels["targetLabel"] == label, nil + }) + require.NoErrorf(t, err, "Error when verifying the label on %s", testPod.Name) } } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 7973fb7f9d2..1d30042cbe8 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -75,42 +75,44 @@ const ( defaultInterval = 1 * time.Second // antreaNamespace is the K8s Namespace in which all Antrea resources are running. - antreaNamespace = "kube-system" - kubeNamespace = "kube-system" - flowAggregatorNamespace = "flow-aggregator" - antreaConfigVolume = "antrea-config" - antreaWindowsConfigVolume = "antrea-windows-config" - flowAggregatorConfigVolume = "flow-aggregator-config" - antreaDaemonSet = "antrea-agent" - antreaWindowsDaemonSet = "antrea-agent-windows" - antreaDeployment = "antrea-controller" - flowAggregatorDeployment = "flow-aggregator" - flowAggregatorCHSecret = "clickhouse-ca" - antreaDefaultGW = "antrea-gw0" - testAntreaIPAMNamespace = "antrea-ipam-test" - testAntreaIPAMNamespace11 = "antrea-ipam-test-11" - testAntreaIPAMNamespace12 = "antrea-ipam-test-12" - busyboxContainerName = "busybox" - mcjoinContainerName = "mcjoin" - agnhostContainerName = "agnhost" - toolboxContainerName = "toolbox" - nginxContainerName = "nginx" - controllerContainerName = "antrea-controller" - ovsContainerName = "antrea-ovs" - agentContainerName = "antrea-agent" - antreaYML = "antrea.yml" - antreaIPSecYML = "antrea-ipsec.yml" - antreaCovYML = "antrea-coverage.yml" - antreaIPSecCovYML = "antrea-ipsec-coverage.yml" - flowAggregatorYML = "flow-aggregator.yml" - flowAggregatorCovYML = "flow-aggregator-coverage.yml" - flowVisibilityYML = "flow-visibility.yml" - flowVisibilityTLSYML = "flow-visibility-tls.yml" - chOperatorYML = "clickhouse-operator-install-bundle.yml" - flowVisibilityCHPodName = "chi-clickhouse-clickhouse-0-0-0" - flowVisibilityNamespace = "flow-visibility" - defaultBridgeName = "br-int" - monitoringNamespace = "monitoring" + antreaNamespace = "kube-system" + kubeNamespace = "kube-system" + flowAggregatorNamespace = "flow-aggregator" + antreaConfigVolume = "antrea-config" + antreaWindowsConfigVolume = "antrea-windows-config" + flowAggregatorConfigVolume = "flow-aggregator-config" + antreaDaemonSet = "antrea-agent" + antreaWindowsDaemonSet = "antrea-agent-windows" + antreaDeployment = "antrea-controller" + flowAggregatorDeployment = "flow-aggregator" + flowAggregatorCHSecret = "clickhouse-ca" + antreaDefaultGW = "antrea-gw0" + testAntreaIPAMNamespace = "antrea-ipam-test" + testAntreaIPAMNamespace11 = "antrea-ipam-test-11" + testAntreaIPAMNamespace12 = "antrea-ipam-test-12" + busyboxContainerName = "busybox" + mcjoinContainerName = "mcjoin" + agnhostContainerName = "agnhost" + toolboxContainerName = "toolbox" + nginxContainerName = "nginx" + controllerContainerName = "antrea-controller" + ovsContainerName = "antrea-ovs" + agentContainerName = "antrea-agent" + flowAggregatorContainerName = "flow-aggregator" + + antreaYML = "antrea.yml" + antreaIPSecYML = "antrea-ipsec.yml" + antreaCovYML = "antrea-coverage.yml" + antreaIPSecCovYML = "antrea-ipsec-coverage.yml" + flowAggregatorYML = "flow-aggregator.yml" + flowAggregatorCovYML = "flow-aggregator-coverage.yml" + flowVisibilityYML = "flow-visibility.yml" + flowVisibilityTLSYML = "flow-visibility-tls.yml" + chOperatorYML = "clickhouse-operator-install-bundle.yml" + flowVisibilityCHPodName = "chi-clickhouse-clickhouse-0-0-0" + flowVisibilityNamespace = "flow-visibility" + defaultBridgeName = "br-int" + monitoringNamespace = "monitoring" antreaControllerCovBinary = "antrea-controller-coverage" antreaAgentCovBinary = "antrea-agent-coverage" @@ -132,7 +134,7 @@ const ( nginxImage = "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" toolboxImage = "projects.registry.vmware.com/antrea/toolbox:1.2-1" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" + ipfixCollectorImage = "docker.io/hsuy947/go-ipfix" ipfixCollectorPort = "4739" clickHouseHTTPPort = "8123" @@ -938,6 +940,25 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string, o flowVisibili if err = data.podWaitForReady(2*defaultTimeout, flowAggPod.Name, flowAggregatorNamespace); err != nil { return err } + var errLog error + // Check log of flow-aggregator pod to make sure ClickHouse is enabled and connected to flow-aggregator + if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) { + logString, err := data.GetPodLogs(context.TODO(), flowAggregatorNamespace, flowAggPod.Name, flowAggregatorContainerName) + if err != nil { + errLog = fmt.Errorf("error when getting Flow Aggregaotr logs: %v", err) + return false, nil + } + if strings.Contains(logString, "error when creating ClickHouse export process") { + errLog = fmt.Errorf("error when creating ClickHouse export process") + return false, nil + } + if strings.Contains(logString, "Starting ClickHouse exporting process") { + return true, nil + } + return false, nil + }); err != nil { + return fmt.Errorf("error when checking log for Flow Aggregator: %v", errLog) + } return nil }