diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index bc435e0ac63..3d35951dbb4 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -73,6 +73,9 @@ featureGates: # into account application context. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "L7NetworkPolicy" "default" false) }} +# Enable L7Visibility on Pods and Namespace. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "L7Visibility" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 7db11aebb8e..aaf95e3725a 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -173,7 +173,6 @@ rules: resources: - externalippools - ippools - - trafficcontrols verbs: - get - watch @@ -219,3 +218,15 @@ rules: - get - list - watch + - apiGroups: + - crd.antrea.io + resources: + - trafficcontrols + verbs: + - get + - watch + - list + - update + - patch + - create + - delete diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index 42dff202fa5..de59aec510e 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -306,7 +306,7 @@ controller: flowExporter: # -- Enable the flow exporter feature. - enable: false + enable: true # -- IPFIX collector address as a string with format :[][:]. # If the collector is running in-cluster as a Service, set to # /. diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index e88b9ea321a..e7f4e2774e7 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -44,7 +44,7 @@ flowCollector: # clickHouse contains ClickHouse related configuration options. clickHouse: # -- Determine whether to enable exporting flow records to ClickHouse. - enable: false + enable: true # -- DatabaseURL is the url to the database. TCP protocol is required. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" # -- Debug enables debug logs from ClickHouse sql driver. diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 7546dcb717b..f718b19f12a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3282,7 +3282,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index f0915be56d9..0ee46158985 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3282,7 +3282,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 2e99b1d125b..5d4b9fbcf7e 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3282,7 +3282,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index b16a69409e0..d90b309841c 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3295,7 +3295,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 9d8a9da5a66..55903b27af5 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3282,7 +3282,7 @@ data: # IPFIX flow records from each agent to a configured collector. To enable this # feature, you need to set "enable" to true, and ensure that the FlowExporter # feature gate is also enabled. - enable: false + enable: true # Provide the IPFIX collector address as a string with format :[][:]. # HOST can either be the DNS name, IP, or Service name of the Flow Collector. If # using an IP, it can be either IPv4 or IPv6. However, IPv6 address should be diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 09d1e690ed5..545a3b89786 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -219,7 +219,7 @@ data: # clickHouse contains ClickHouse related configuration options. clickHouse: # Enable is the switch to enable exporting flow records to ClickHouse. - enable: false + enable: true # Database is the name of database where Antrea "flows" table is created. database: "default" diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 7644124e03d..e9472ef1497 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -39,6 +39,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/egress" "antrea.io/antrea/pkg/agent/controller/ipseccertificate" "antrea.io/antrea/pkg/agent/controller/networkpolicy" + "antrea.io/antrea/pkg/agent/controller/networkpolicy/l7engine" "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" @@ -690,6 +691,7 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.TrafficControl) { tcController := trafficcontrol.NewTrafficControlController(ofClient, + crdClient, ifaceStore, ovsBridgeClient, ovsCtlClient, @@ -698,6 +700,9 @@ func run(o *Options) error { namespaceInformer, podUpdateChannel) go tcController.Run(stopCh) + if features.DefaultFeatureGate.Enabled(features.L7Visibility) { + go l7engine.Run(tcController) + } } // Start the localPodInformer diff --git a/go.mod b/go.mod index 1b436ddc0c3..2f585e4ab6f 100644 --- a/go.mod +++ b/go.mod @@ -233,3 +233,5 @@ require ( sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) + +replace github.com/vmware/go-ipfix v0.6.1 => github.com/tushartathgur/go-ipfix v0.0.0-20230714214305-c3c7a13353f8 diff --git a/go.sum b/go.sum index f57d040713a..f9ba14f4b13 100644 --- a/go.sum +++ b/go.sum @@ -1083,6 +1083,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA= +github.com/tushartathgur/go-ipfix v0.0.0-20230714214305-c3c7a13353f8 h1:5f0hm5NzcSY4tKiKInkCGLnps+DDWEVK5qV9wYnE9/g= +github.com/tushartathgur/go-ipfix v0.0.0-20230714214305-c3c7a13353f8/go.mod h1:dGCppoeqknr9o3yz9BD74mP/FPHgefb6v34xdUKxDPI= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go index b2e9b012ee5..f29929e8fb8 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go @@ -228,6 +228,7 @@ func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protoco // Add a Suricata tenant. if err := r.addBindingSuricataTenant(vlanID, rulesPath); err != nil { return fmt.Errorf("failed to add Suricata tenant for L7 rule %s of %s: %w", ruleID, policyName, err) + } return nil @@ -386,7 +387,7 @@ func (r *Reconciler) unregisterSuricataTenantHandler(tenantID, vlanID uint32) (* return r.suricataScFn(scCmd) } -func (r *Reconciler) startSuricata() { +func (r *Reconciler) startSuricata() { // Create the config file /etc/suricata/antrea.yaml for Antrea which will be included in the default Suricata config file // /etc/suricata/suricata.yaml. suricataAntreaConfigData := fmt.Sprintf(`%%YAML 1.1 @@ -405,7 +406,28 @@ outputs: types: - alert: tagged-packets: yes + - eve-log: + enabled: yes + filetype: unix_stream + filename: /var/log/antrea/networkpolicy/suricata_eve.socket + rotate-interval: day + pcap-file: false + community-id: false + community-id-seed: 0 + xff: + enabled: no + types: + - http: + extended: yes af-packet: + - interface: eth0 + threads: auto + cluster-id: 93 + cluster-type: cluster_flow + defrag: no + use-mmap: yes + tpacket-v2: yes + checksum-checks: no - interface: %[1]s threads: auto cluster-id: 80 diff --git a/pkg/agent/controller/trafficcontrol/controller.go b/pkg/agent/controller/trafficcontrol/controller.go index 1c0d8720a77..328cf8fbe2e 100644 --- a/pkg/agent/controller/trafficcontrol/controller.go +++ b/pkg/agent/controller/trafficcontrol/controller.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/apis/crd/v1alpha2" + clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -112,7 +113,8 @@ type portToTCBinding struct { } type Controller struct { - ofClient openflow.Client + ofClient openflow.Client + CrdClient clientsetversioned.Interface portToTCBindings map[string]*portToTCBinding ovsBridgeClient ovsconfig.OVSBridgeClient @@ -142,6 +144,7 @@ type Controller struct { } func NewTrafficControlController(ofClient openflow.Client, + crdClient clientsetversioned.Interface, interfaceStore interfacestore.InterfaceStore, ovsBridgeClient ovsconfig.OVSBridgeClient, ovsCtlClient ovsctl.OVSCtlClient, @@ -151,6 +154,7 @@ func NewTrafficControlController(ofClient openflow.Client, podUpdateSubscriber channel.Subscriber) *Controller { c := &Controller{ ofClient: ofClient, + CrdClient: crdClient, ovsBridgeClient: ovsBridgeClient, ovsCtlClient: ovsCtlClient, interfaceStore: interfaceStore, diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 52e0f89bfba..e4f4b67c8d3 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -32,13 +32,6 @@ import ( // Following map is for converting protocol name (string) to protocol identifier var ( - protocols = map[string]uint8{ - "icmp": 1, - "igmp": 2, - "tcp": 6, - "udp": 17, - "ipv6-icmp": 58, - } // Mapping is defined at https://github.com/torvalds/linux/blob/v5.9/include/uapi/linux/netfilter/nf_conntrack_common.h#L42 conntrackStatusMap = map[string]uint32{ "EXPECTED": uint32(1), @@ -140,7 +133,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter switch { case hasAnyProto(fs): // Proto identifier - proto, err := lookupProtocolMap(fs) + proto, err := flowexporter.LookupProtocolMap(fs) if err != nil { return nil, err } @@ -287,7 +280,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter } func hasAnyProto(text string) bool { - for proto := range protocols { + for proto := range flowexporter.Protocols { if strings.Contains(strings.ToLower(text), proto) { return true } @@ -295,17 +288,6 @@ func hasAnyProto(text string) bool { return false } -// lookupProtocolMap returns protocol identifier given protocol name -func lookupProtocolMap(name string) (uint8, error) { - name = strings.TrimSpace(name) - lowerCaseStr := strings.ToLower(name) - proto, found := protocols[lowerCaseStr] - if !found { - return 0, fmt.Errorf("unknown IP protocol specified: %s", name) - } - return proto, nil -} - func (ct *connTrackOvsCtl) GetMaxConnections() (int, error) { cmdOutput, execErr := ct.ovsctlClient.RunAppctlCmd("dpctl/ct-get-maxconns", false) if execErr != nil { diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 622165fdb7c..00d97ae6188 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -15,10 +15,15 @@ package exporter import ( + "bufio" "context" + "encoding/json" "fmt" "hash/fnv" "net" + "os" + "strconv" + "strings" "time" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" @@ -37,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" + "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ipfix" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/querier" @@ -54,7 +60,11 @@ import ( // can be taking a fraction of the size of connection store to approximate the // number of expired connections, while having a min and a max to handle edge cases, // e.g. min(50 + 0.1 * connectionStore.size(), 200) -const maxConnsToExport = 64 +const ( + maxConnsToExport = 64 + socketPath = "/var/log/antrea/networkpolicy/suricata_eve.socket" + // socketPath = "suricata_eve.socket" +) var ( IANAInfoElementsCommon = []string{ @@ -101,6 +111,8 @@ var ( "flowType", "egressName", "egressIP", + "isL7", + "httpVals", } AntreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) @@ -129,6 +141,39 @@ type FlowExporter struct { denyPriorityQueue *priorityqueue.ExpirePriorityQueue expiredConns []flowexporter.Connection egressQuerier querier.EgressQuerier + l7events map[flowexporter.ConnectionKey]*httpEvent +} + +// Define struct to hold the L7 flow JSON values +type Http struct { + Hostname string `json:"hostname"` + URL string `json:"url"` + UserAgent string `json:"http_user_agent"` + ContentType string `json:"http_content_type"` + Method string `json:"http_method"` + Protocol string `json:"protocol"` + Status int `json:"status"` + ContentLength int `json:"length"` +} + +type JsonToEvent struct { + Timestamp string `json:"timestamp"` + FlowID int64 `json:"flow_id"` + InInterface string `json:"in_iface"` + EventType string `json:"event_type"` + VLAN []int `json:"vlan"` + SrcIP net.IP `json:"src_ip"` + SrcPort int `json:"src_port"` + DestIP net.IP `json:"dest_ip"` + DestPort int `json:"dest_port"` + Proto string `json:"proto"` + TxID int `json:"tx_id"` + HTTP Http `json:"http"` +} + +type httpEvent struct { + http [][]string + httpQueryFlag bool } func genObservationID(nodeName string) uint32 { @@ -189,6 +234,7 @@ func NewFlowExporter(ifaceStore interfacestore.InterfaceStore, proxier proxy.Pro denyPriorityQueue: denyConnStore.GetPriorityQueue(), expiredConns: make([]flowexporter.Connection, 0, maxConnsToExport*2), egressQuerier: egressQuerier, + l7events: make(map[flowexporter.ConnectionKey]*httpEvent), }, nil } @@ -203,6 +249,11 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { // Start the goroutine to poll conntrack flows. go exp.conntrackConnStore.Run(stopCh) + // Start L7 connection flow socket + if features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy) { + go exp.l7Listener() + } + defaultTimeout := exp.conntrackPriorityQueue.ActiveFlowTimeout expireTimer := time.NewTimer(defaultTimeout) for { @@ -418,6 +469,7 @@ func (exp *FlowExporter) sendTemplateSet(isIPv6 bool) (int, error) { func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { exp.ipfixSet.ResetSet() + connkey := flowexporter.NewConnectionKey(conn) eL := exp.elementsListv4 templateID := exp.templateIDv4 @@ -564,6 +616,10 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { ie.SetStringValue(conn.EgressName) case "egressIP": ie.SetStringValue(conn.EgressIP) + case "isL7": + ie.SetStringValue(exp.getL7EventData(connkey, "httpL7")) + case "httpVals": + ie.SetStringValue(exp.getL7EventData(connkey, "httpVals")) } } err := exp.ipfixSet.AddRecord(eL, templateID) @@ -648,3 +704,135 @@ func getMinTime(t1, t2 time.Duration) time.Duration { } return t2 } + +func (exp *FlowExporter) l7Listener() { + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + klog.ErrorS(err, "Error removing Suricata socket") + } + listener, err := net.Listen("unix", socketPath) + if err != nil { + klog.ErrorS(err, "Error listening on Suricata socket") + return + } + defer listener.Close() + + klog.InfoS("L7 Listener Server started. Listening for connections...") + + for { + conn, err := listener.Accept() + if err != nil { + klog.ErrorS(err, "Error accepting Suricata connection") + continue + } + go exp.handleClientConnection(conn) + } +} + +func (exp *FlowExporter) handleClientConnection(conn net.Conn) { + defer conn.Close() + reader := bufio.NewReader(conn) + for { + buffer, err := reader.ReadBytes('\n') + if err != nil { + klog.ErrorS(err, "Error reading data", "buffer", buffer) + break + } + exp.processLog(buffer) + } +} + +func (exp *FlowExporter) processLog(data []byte) { + // Check if the event type is "http" + if eventType := extractEventType(data); eventType != "http" { + return + } + var event JsonToEvent + err := json.Unmarshal(data, &event) + if err != nil { + klog.ErrorS(err, "Error parsing JSON:", "data", data) + return + } + exp.WriteConnection(event) +} + +func extractEventType(data []byte) string { + // Define a temporary struct for extracting the event type + type Temp struct { + EventType string `json:"event_type"` + } + + // Parse the JSON string and extract the event type + var temp Temp + err := json.Unmarshal(data, &temp) + if err != nil { + klog.ErrorS(err, "Error parsing JSON for eventtype:", "data", data) + return "" + } + + return temp.EventType +} + +func (exp *FlowExporter) WriteConnection(event JsonToEvent) { + protocol, err := flowexporter.LookupProtocolMap(event.Proto) + if err != nil { + klog.ErrorS(err, "InValid Protocol type") + return + } + // Get 5-tuple information + tuple := flowexporter.Tuple{ + SourceAddress: event.SrcIP, + DestinationAddress: event.DestIP, + Protocol: protocol, + SourcePort: uint16(event.SrcPort), + DestinationPort: uint16(event.DestPort), + } + conn := flowexporter.Connection{} + conn.FlowKey = tuple + connKey := flowexporter.NewConnectionKey(&conn) + var tempArr []string + tempArr = append(tempArr, + event.HTTP.Hostname, + event.HTTP.URL, + event.HTTP.UserAgent, + event.HTTP.ContentType, + event.HTTP.Method, + event.HTTP.Protocol, + strconv.Itoa(event.HTTP.Status), + strconv.Itoa(event.HTTP.ContentLength)) + if _, found := exp.l7events[connKey]; found { + exp.l7events[connKey].http = append(exp.l7events[connKey].http, tempArr) + exp.l7events[connKey].httpQueryFlag = false + } else { + var temp httpEvent + temp.http = append(temp.http, tempArr) + temp.httpQueryFlag = false + exp.l7events[connKey] = &temp + } +} + +func (exp *FlowExporter) getL7EventData(connkey flowexporter.ConnectionKey, field string) string { + _, exists := exp.l7events[connkey] + if exists { + switch field { + case "httpL7": + return "true" + case "httpVals": + var temp []string + // Using '<' '>' as delimiters + for _, httpVals := range exp.l7events[connkey].http { + temp = append(temp, strings.Join(httpVals, "<>")) + } + allHttpVals := strings.Join(temp, "><") + exp.l7events[connkey].httpQueryFlag = true + exp.verifyAndDeleteEvent(connkey) + return allHttpVals + } + } + return "" +} + +func (exp *FlowExporter) verifyAndDeleteEvent(connkey flowexporter.ConnectionKey) { + if exp.l7events[connkey].httpQueryFlag { + delete(exp.l7events, connkey) + } +} diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 2a1b5951c87..d26156eb4ff 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -17,6 +17,7 @@ package exporter import ( "context" "fmt" + // "encoding/json" "net" "strings" "testing" @@ -849,3 +850,249 @@ func TestFlowExporter_fillEgressInfo(t *testing.T) { }) } } +// func TestFlowExporter_l7Listener(t *testing.T) { +// flowExp := &FlowExporter{ +// l7events: make(map[flowexporter.ConnectionKey]*Event), +// } +// go flowExp.l7Listener() +// <-time.After(100 * time.Millisecond) + +// conn, err := net.Dial("unix", socketPath) +// if err != nil { +// t.Fatalf("Failed to connect to server: %s", err) +// } +// defer conn.Close() + +// testCases := []struct { +// name string +// input Event +// eventPresent bool +// expected Event +// expectedErr error +// }{ +// { +// name: "Valid case", +// input: Event{ +// Timestamp: "2023-06-16T20:31:48.910477+0000", +// FlowID: 1, +// InInterface: "mock_interface", +// EventType: "http", +// VLAN: []int{1}, +// SrcIP: net.ParseIP("10.10.0.1"), +// SrcPort: 59920, +// DestIP: net.ParseIP("10.10.0.2"), +// DestPort: 80, +// Proto: "TCP", +// TxID: 0, +// HTTP: Http{ +// Hostname: "10.10.0.1", +// URL: "/public/", +// UserAgent: "curl/7.74.0", +// ContentType: "text/html", +// Method: "GET", +// Protocol: "HTTP/1.1", +// Status: 200, +// ContentLength: 153, +// }, +// }, +// eventPresent: true, +// expected: Event{ +// Timestamp: "2023-06-16T20:31:48.910477+0000", +// FlowID: 1, +// InInterface: "mock_interface", +// EventType: "http", +// VLAN: []int{1}, +// SrcIP: net.ParseIP("10.10.0.1"), +// SrcPort: 59920, +// DestIP: net.ParseIP("10.10.0.2"), +// DestPort: 80, +// Proto: "TCP", +// TxID: 0, +// HTTP: Http{ +// Hostname: "10.10.0.1", +// URL: "/public/", +// UserAgent: "curl/7.74.0", +// ContentType: "text/html", +// Method: "GET", +// Protocol: "HTTP/1.1", +// Status: 200, +// ContentLength: 153, +// }, +// }, +// }, { +// name: "InValid eventType", +// input: Event{ +// Timestamp: "2023-06-16T20:31:48.910477+0000", +// FlowID: 1, +// InInterface: "mock_interface", +// EventType: "mock_event", +// VLAN: []int{1}, +// SrcIP: net.ParseIP("10.10.0.1"), +// SrcPort: 59920, +// DestIP: net.ParseIP("10.10.0.2"), +// DestPort: 80, +// Proto: "TCP", +// TxID: 0, +// HTTP: Http{ +// Hostname: "10.10.0.1", +// URL: "/public/", +// UserAgent: "curl/7.74.0", +// ContentType: "text/html", +// Method: "GET", +// Protocol: "HTTP/1.1", +// Status: 200, +// ContentLength: 153, +// }, +// }, +// eventPresent: false, +// expected: Event{}, +// }, +// } +// for _, tc := range testCases { +// jsonData, err := json.Marshal(tc.input) +// if err != nil { +// t.Errorf("Error Marshaling data: %v", err) +// } +// _, err = conn.Write(jsonData) +// if err != nil { +// t.Errorf("Error writing event data: %v", err) +// } +// <-time.After(100 * time.Millisecond) +// } + +// for _, tc := range testCases { +// protocol, _ := flowexporter.LookupProtocolMap(tc.expected.Proto) +// // Get 5-tuple information +// tuple := flowexporter.Tuple{ +// SourceAddress: tc.expected.SrcIP, +// DestinationAddress: tc.expected.DestIP, +// Protocol: protocol, +// SourcePort: uint16(tc.expected.SrcPort), +// DestinationPort: uint16(tc.expected.DestPort), +// } +// // Generate deny connection and add to deny connection store +// conn := flowexporter.Connection{} +// conn.FlowKey = tuple +// connKey := flowexporter.NewConnectionKey(&conn) +// // Check if event is present in event map +// existingEvent, exists := flowExp.l7events[connKey] +// assert.Equal(t, tc.eventPresent, exists) +// if exists == true { +// assert.Equal(t, tc.expected, *existingEvent) +// } +// } +// } + +// func TestGetL7EventData(t *testing.T) { +// flowExp := &FlowExporter{ +// l7events: make(map[flowexporter.ConnectionKey]*Event), +// } + +// // 5-tuple information +// tuple := flowexporter.Tuple{ +// SourceAddress: net.IP("10.10.0.1"), +// DestinationAddress: net.IP("10.10.0.2"), +// Protocol: 6, +// SourcePort: uint16(5229), +// DestinationPort: uint16(80), +// } +// conn := flowexporter.Connection{} +// conn.FlowKey = tuple +// connKey := flowexporter.NewConnectionKey(&conn) +// testCases := []struct { +// name string +// input Event +// expected string +// }{ +// { +// name: "httpHostname", +// input: Event{ +// HTTP: Http{ +// Hostname: "10.0.0.1", +// }, +// }, +// expected: "10.0.0.1", +// }, { +// name: "httpURL", +// input: Event{ +// HTTP: Http{ +// URL: "/public/", +// }, +// }, +// expected: "/public/", +// }, { +// name: "httpUserAgent", +// input: Event{ +// HTTP: Http{ +// UserAgent: "curl/7.74.0", +// }, +// }, +// expected: "curl/7.74.0", +// }, { +// name: "httpContentType", +// input: Event{ +// HTTP: Http{ +// ContentType: "text/html", +// }, +// }, +// expected: "text/html", +// }, { +// name: "httpMethod", +// input: Event{ +// HTTP: Http{ +// Method: "GET", +// }, +// }, +// expected: "GET", +// }, +// } +// for _, tc := range testCases { +// flowExp.l7events[connKey] = &tc.input +// result := flowExp.getL7EventData(connKey, tc.name) +// assert.Equal(t, tc.expected, result) +// } + +// //Connkey not present in Event map +// conn.FlowKey.Protocol = 1 +// connKey = flowexporter.NewConnectionKey(&conn) +// result := flowExp.getL7EventData(connKey, "httpHostname") +// assert.Equal(t, "", result) +// } + +// func TestGetL7EventHttpStatus(t *testing.T) { +// flowExp := &FlowExporter{ +// l7events: make(map[flowexporter.ConnectionKey]*Event), +// } + +// // 5-tuple information +// tuple := flowexporter.Tuple{ +// SourceAddress: net.IP("10.10.0.1"), +// DestinationAddress: net.IP("10.10.0.2"), +// Protocol: 6, +// SourcePort: uint16(5229), +// DestinationPort: uint16(80), +// } +// conn := flowexporter.Connection{} +// conn.FlowKey = tuple +// connKey := flowexporter.NewConnectionKey(&conn) +// testCases := []struct { +// name string +// input Event +// expected uint16 +// }{ +// { +// name: "httpStatus", +// input: Event{ +// HTTP: Http{ +// Status: 200, +// }, +// }, +// expected: uint16(200), +// }, +// } +// for _, tc := range testCases { +// flowExp.l7events[connKey] = &tc.input +// result := flowExp.getL7EventHttpStatus(connKey, tc.name) +// assert.Equal(t, tc.expected, result) +// } +// } diff --git a/pkg/agent/flowexporter/utils.go b/pkg/agent/flowexporter/utils.go index 7b007ed1863..8bdc6ecc984 100644 --- a/pkg/agent/flowexporter/utils.go +++ b/pkg/agent/flowexporter/utils.go @@ -15,7 +15,9 @@ package flowexporter import ( + "fmt" "strconv" + "strings" "github.com/vmware/go-ipfix/pkg/registry" @@ -26,6 +28,16 @@ const ( connectionDyingFlag = uint32(1 << 9) ) +var ( + Protocols = map[string]uint8{ + "icmp": 1, + "igmp": 2, + "tcp": 6, + "udp": 17, + "ipv6-icmp": 58, + } +) + // NewConnectionKey creates 5-tuple of flow as connection key func NewConnectionKey(conn *Connection) ConnectionKey { return ConnectionKey{conn.FlowKey.SourceAddress.String(), @@ -91,3 +103,14 @@ func PolicyTypeToUint8(policyType v1beta2.NetworkPolicyType) uint8 { return registry.PolicyTypeK8sNetworkPolicy } } + +// LookupProtocolMap returns protocol identifier given protocol name +func LookupProtocolMap(name string) (uint8, error) { + name = strings.TrimSpace(name) + lowerCaseStr := strings.ToLower(name) + proto, found := Protocols[lowerCaseStr] + if !found { + return 0, fmt.Errorf("unknown IP protocol specified: %s", name) + } + return proto, nil +} \ No newline at end of file diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 898892ff6a5..a8bac8154e8 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -125,6 +125,10 @@ const ( // Enable users to protect their applications by specifying how they are allowed to communicate with others, taking // into account application context. L7NetworkPolicy featuregate.Feature = "L7NetworkPolicy" + + // alpha: v1.13 + // Enable L7 visibility on Pods and Namespaces + L7Visibility featuregate.Feature = "L7Visibility" ) var ( @@ -159,6 +163,7 @@ var ( ExternalNode: {Default: false, PreRelease: featuregate.Alpha}, SupportBundleCollection: {Default: false, PreRelease: featuregate.Alpha}, L7NetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, + L7Visibility: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on @@ -182,6 +187,7 @@ var ( // in the future if it's fully tested on Windows. Multicluster: {}, L7NetworkPolicy: {}, + L7Visibility: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index edd57a27fbd..ce86f544089 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -84,9 +84,12 @@ const ( reverseThroughputFromDestinationNode, clusterUUID, egressName, - egressIP) + egressIP, + isL7, + httpVals) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?)` ) // PrepareClickHouseConnection is used for unit testing @@ -330,6 +333,8 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err ch.clusterUUID, record.EgressName, record.EgressIP, + record.IsL7, + record.HttpVals, ) if err != nil { diff --git a/pkg/flowaggregator/flowlogger/logger.go b/pkg/flowaggregator/flowlogger/logger.go index 798589e7d65..ee2c0e22713 100644 --- a/pkg/flowaggregator/flowlogger/logger.go +++ b/pkg/flowaggregator/flowlogger/logger.go @@ -116,6 +116,8 @@ func (fl *FlowLogger) WriteRecord(r *flowrecord.FlowRecord, prettyPrint bool) er egressNetworkPolicyType, r.EgressName, r.EgressIP, + r.IsL7, + r.HttpVals, } str := strings.Join(fields, ",") diff --git a/pkg/flowaggregator/flowrecord/record.go b/pkg/flowaggregator/flowrecord/record.go index ed14f09f283..33a411a705f 100644 --- a/pkg/flowaggregator/flowrecord/record.go +++ b/pkg/flowaggregator/flowrecord/record.go @@ -70,6 +70,8 @@ type FlowRecord struct { ReverseThroughputFromDestinationNode uint64 EgressName string EgressIP string + IsL7 string + HttpVals string } // GetFlowRecord converts ipfixentities.Record to FlowRecord @@ -227,6 +229,11 @@ func GetFlowRecord(record ipfixentities.Record) *FlowRecord { } if egressIP, _, ok := record.GetInfoElementWithValue("egressIP"); ok { r.EgressIP = egressIP.GetStringValue() + if isL7, _, ok := record.GetInfoElementWithValue("isL7"); ok { + r.IsL7 = isL7.GetStringValue() + } + if httpVals, _, ok := record.GetInfoElementWithValue("httpVals"); ok { + r.HttpVals = httpVals.GetStringValue() } return r } diff --git a/pkg/flowaggregator/infoelements/elements.go b/pkg/flowaggregator/infoelements/elements.go index f93f422398b..f69abba1818 100644 --- a/pkg/flowaggregator/infoelements/elements.go +++ b/pkg/flowaggregator/infoelements/elements.go @@ -59,6 +59,8 @@ var ( "flowType", "egressName", "egressIP", + "isL7", + "httpVals", } AntreaInfoElementsIPv4 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv4"}...) AntreaInfoElementsIPv6 = append(AntreaInfoElementsCommon, []string{"destinationClusterIPv6"}...) diff --git a/pkg/flowaggregator/s3uploader/s3uploader.go b/pkg/flowaggregator/s3uploader/s3uploader.go index 68431b10153..ce92092b2ae 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader.go +++ b/pkg/flowaggregator/s3uploader/s3uploader.go @@ -484,4 +484,8 @@ func writeRecord(w io.Writer, r *flowrecord.FlowRecord, clusterUUID string) { io.WriteString(w, r.EgressName) io.WriteString(w, ",") io.WriteString(w, r.EgressIP) + io.WriteString(w, ",") + io.WriteString(w, r.IsL7) + io.WriteString(w, ",") + io.WriteString(w, r.HttpVals) }