Skip to content

Commit

Permalink
Addressed new comments
Browse files Browse the repository at this point in the history
Signed-off-by: Tushar Tathgur <[email protected]>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Jan 17, 2024
1 parent fe52bb5 commit 1f4f8d1
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 34 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ func run(o *Options) error {
networkPolicyController,
flowExporterOptions,
egressController,
l7FlowExporterController)
l7FlowExporterController,
l7FlowExporterEnabled)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,4 @@ HTTP fields in the `httpVals` are:

As of now, the only supported layer 7 protocol is `HTTP1.1`. Support for more
protocols may be added in the future. Antrea supports L7FlowExporter feature only
on Linux Nodes. Windows Nodes are not supported yet.
on Linux Nodes.
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
klog.V(2).Infof("Polling conntrack")
// DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events and avoid missing the L7 events for corresponding L4 connection
// DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events
// and avoid missing the L7 events for corresponding L4 connection
l7EventMap := cs.l7EventMapGetter.ConsumeL7EventMap()

var zones []uint16
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/l7_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ func (l *L7Listener) listenAndAcceptConn() {
// Remove stale connections
if err := os.Remove(l.suricataEventSocketPath); err != nil && !os.IsNotExist(err) {
klog.V(2).ErrorS(err, "failed to remove stale socket")
return
}
if err := os.MkdirAll(filepath.Dir(l.suricataEventSocketPath), 0750); err != nil {
klog.ErrorS(err, "Failed to create directory %s", filepath.Dir(l.suricataEventSocketPath))
return
}
listener, err := net.Listen("unix", l.suricataEventSocketPath)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions,
egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter) (*FlowExporter, error) {
egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter, l7FlowExporterEnabled bool) (*FlowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -172,7 +172,10 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie
return nil, err
}
expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName)
l7Listener := connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore)
var l7Listener *connections.L7Listener
if l7FlowExporterEnabled {
l7Listener = connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore)
}

connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, proxyEnabled)
denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o)
Expand Down
65 changes: 36 additions & 29 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,18 @@ func TestFlowAggregator(t *testing.T) {

if v4Enabled {
t.Run("IPv4", func(t *testing.T) { testHelper(t, data, false) })
t.Run("L7FlowExporterController", func(t *testing.T) {
testL7FlowExporterController(t, data, false)
})
}

if v6Enabled {
t.Run("IPv6", func(t *testing.T) { testHelper(t, data, true) })
t.Run("L7FlowExporterController", func(t *testing.T) {
testL7FlowExporterController(t, data, true)
})
}

t.Run("L7FlowExporterController", func(t *testing.T) {
testL7FlowExporterControllerRun(t, data)
})

}

func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) {
Expand Down Expand Up @@ -1877,37 +1879,42 @@ func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error {
return nil
}

func testL7FlowExporterControllerRun(t *testing.T, data *TestData) {
func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) {
skipIfFeatureDisabled(t, features.L7FlowExporter, true, false)

clientPodName := "test-l7-flow-exporter"
clientPodLabels := map[string]string{"test-l7-flow-exporter-e2e": "true"}
clientPodAnnotations := map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"}
cmd := []string{"sleep", "3600"}

// Create a client Pod which will be selected by test L7 NetworkPolices.
require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data))
clientPodIP, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace)
require.NoErrorf(t, err, "Error when waiting for IP for Pod '%s': %v", clientPodName, err)
require.NoError(t, data.podWaitForRunning(defaultTimeout, clientPodName, data.testNamespace))

serverIPs := createToExternalTestServer(t, data)
srcIP := clientPodIP.IPv4.String()
dstIP := serverIPs.IPv4.String()
err := data.UpdatePod(data.testNamespace, "perftest-a", func(pod *corev1.Pod) {
pod.Annotations = map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"}
})
require.NoErrorf(t, err, "error when updated pod annotations")

// checkRecordsForToExternalFlows(t, data, nodeName(0), clientPodName, clientPodIP.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "")
cmd = []string{
"curl",
fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort),
testFlow1 := testFlow{
srcPodName: "perftest-a",
dstPodName: "perftest-b",
}
var cmd []string
if !isIPv6 {
testFlow1.srcIP = podAIPs.IPv4.String()
testFlow1.dstIP = podBIPs.IPv4.String()
cmd = []string{
"curl",
fmt.Sprintf("http://%s:%d", podBIPs.IPv4.String(), serverPodPort),
}
} else {
testFlow1.srcIP = podAIPs.IPv6.String()
testFlow1.dstIP = podBIPs.IPv6.String()
cmd = []string{
"curl",
fmt.Sprintf("http://%s:%d", podBIPs.IPv6.String(), serverPodPort),
}
}
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientPodName, toolboxContainerName, cmd)
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", toolboxContainerName, cmd)
require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr)
_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, false, data, "")
_, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, "")
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
if strings.Contains(record, testFlow1.srcIP) && strings.Contains(record, testFlow1.dstIP) {
// checkPodAndNodeData(t, record, clientPodName, nodeName(0), "", "", data.testNamespace)
assert := assert.New(t)
assert.Contains(record, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName)
assert.Contains(record, "perftest-a", "Record with srcIP does not have Pod name: perftest-a")
assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName(0)), "Record does not have correct sourceNodeName: %s", nodeName(0))
assert.Contains(record, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod")
Expand All @@ -1917,10 +1924,10 @@ func testL7FlowExporterControllerRun(t *testing.T, data *TestData) {
}
}

clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "")
clickHouseRecords := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, "")
for _, record := range clickHouseRecords {
assert := assert.New(t)
assert.Equal(record.SourcePodName, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName)
assert.Equal(record.SourcePodName, "perftest-a", "Record with srcIP does not have Pod name: perftest-a")
assert.Equal(record.SourcePodNamespace, data.testNamespace, "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
assert.Equal(record.SourceNodeName, nodeName(0), "Record does not have correct sourceNodeName: %s", nodeName(0))
assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod")
Expand Down

0 comments on commit 1f4f8d1

Please sign in to comment.