Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
feat: track egress network traffic (#11)
Browse files Browse the repository at this point in the history
Signed-off-by: Grant Linville <[email protected]>
  • Loading branch information
g-linville authored Feb 6, 2024
1 parent 48ed971 commit 6b4c0bc
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 62 deletions.
13 changes: 7 additions & 6 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,13 @@ func (a *Aggregator) processThroughputEvent(e throughput.ThroughputEvent) {
}

pkt := datastore.Packet{
Time: e.Timestamp,
Size: e.Size,
FromIP: e.SAddr,
FromPort: e.SPort,
ToIP: e.DAddr,
ToPort: e.DPort,
Time: e.Timestamp,
Size: e.Size,
FromIP: e.SAddr,
FromPort: e.SPort,
ToIP: e.DAddr,
ToPort: e.DPort,
IsIngress: e.IsIngress,
}

// determine source information
Expand Down
3 changes: 2 additions & 1 deletion aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (a *Aggregator) processPod(d k8s.K8sResourceMessage) {
OwnerID: ownerID,
OwnerName: ownerName,

Labels: pod.GetLabels(),
Labels: pod.GetLabels(),
Annotations: pod.GetAnnotations(),
}

switch d.EventType {
Expand Down
23 changes: 14 additions & 9 deletions datastore/dto.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package datastore

type Pod struct {
UID string // Pod UID
Name string // Pod Name
Namespace string // Namespace
Image string // Main container image
IP string // Pod IP
OwnerType string // ReplicaSet or nil
OwnerID string // ReplicaSet UID
OwnerName string // ReplicaSet Name
Labels map[string]string
UID string // Pod UID
Name string // Pod Name
Namespace string // Namespace
Image string // Main container image
IP string // Pod IP
OwnerType string // ReplicaSet or nil
OwnerID string // ReplicaSet UID
OwnerName string // ReplicaSet Name
Labels map[string]string
Annotations map[string]string
}

type Service struct {
Expand Down Expand Up @@ -152,4 +153,8 @@ type Packet struct {
ToType Dest
ToUID string
ToPort uint16
// IsIngress indicates whether the packet was detected on the ingress or egress bpf filter.
// The egress bpf filter is used for the throughput metric, while the ingress bpf filter
// is used for the egress metric. (Seems backwards, but that is how it actually works.)
IsIngress bool
}
38 changes: 20 additions & 18 deletions datastore/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ type Event interface {
}

type PodEvent struct {
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
IP string `json:"ip"`
OwnerType string `json:"owner_type"`
OwnerName string `json:"owner_name"`
OwnerID string `json:"owner_id"`
Labels map[string]string `json:"labels"`
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
IP string `json:"ip"`
OwnerType string `json:"owner_type"`
OwnerName string `json:"owner_name"`
OwnerID string `json:"owner_id"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}

func (p PodEvent) GetUID() string { return p.UID }
Expand Down Expand Up @@ -136,15 +137,16 @@ type RequestsPayload struct {

func convertPodToPodEvent(pod Pod, eventType string) PodEvent {
return PodEvent{
UID: pod.UID,
EventType: eventType,
Name: pod.Name,
Namespace: pod.Namespace,
IP: pod.IP,
OwnerType: pod.OwnerType,
OwnerName: pod.OwnerName,
OwnerID: pod.OwnerID,
Labels: pod.Labels,
UID: pod.UID,
EventType: eventType,
Name: pod.Name,
Namespace: pod.Namespace,
IP: pod.IP,
OwnerType: pod.OwnerType,
OwnerName: pod.OwnerName,
OwnerID: pod.OwnerID,
Labels: pod.Labels,
Annotations: pod.Annotations,
}
}

Expand Down
77 changes: 62 additions & 15 deletions datastore/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package datastore

import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"

"github.com/ddosify/alaz/log"
"github.com/prometheus/client_golang/prometheus"
)

const (
accountIDLabel = "acorn.io/account-id"
appLabel = "acorn.io/app-public-name"
appNamespaceLabel = "acorn.io/app-namespace"
containerLabel = "acorn.io/container-name"
projectLabel = "acorn.io/project-name"

resolvedOfferingsAnnotation = "acorn.io/container-resolved-offerings"
)

var (
latencyHistLabels = []string{"toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace"}
statusCounterLabels = []string{"toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace", "status"}
throughputCounterLabels = []string{"fromPod", "fromAcornApp", "fromAcornContainer", "fromAcornAppNamespace", "fromHostname", "toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace", "toPort", "toHostname"}
egressCounterLabels = []string{"fromPod", "fromAcornApp", "fromAcornContainer", "fromAcornProject", "fromAcornAccountID", "fromAcornComputeClass"}
)

type PrometheusExporter struct {
Expand All @@ -28,6 +35,7 @@ type PrometheusExporter struct {
latencyHistogram *prometheus.HistogramVec
statusCounter *prometheus.CounterVec
throughputCounter *prometheus.CounterVec
egressCounter *prometheus.CounterVec

podCache *eventCache
podIPCache *eventCache
Expand Down Expand Up @@ -111,6 +119,15 @@ func NewPrometheusExporter(ctx context.Context) *PrometheusExporter {
)
exporter.reg.MustRegister(exporter.throughputCounter)

exporter.egressCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "alaz",
Name: "egress",
},
egressCounterLabels,
)
exporter.reg.MustRegister(exporter.egressCounter)

go exporter.handleReqs()
go exporter.handlePackets()

Expand Down Expand Up @@ -174,12 +191,9 @@ func (p *PrometheusExporter) handlePackets() {
}

func (p *PrometheusExporter) handlePacket(pkt Packet) {
labels := prometheus.Labels{
"toPort": strconv.Itoa(int(pkt.ToPort)),
}

// We only want to keep metrics between pods in the same project (app namespace)
if pkt.FromType == PodSource && pkt.ToType == PodDest {
// Check for packets between pods in the same project.
// (Reminder: !pkt.IsIngress means that the packet was detected by the egress eBPF filter.)
if !pkt.IsIngress && pkt.FromType == PodSource && pkt.ToType == PodDest {
fromPod, found := p.podCache.get(pkt.FromUID)
toPod, found2 := p.podCache.get(pkt.ToUID)

Expand All @@ -188,19 +202,52 @@ func (p *PrometheusExporter) handlePacket(pkt Packet) {
fromPod.(PodEvent).Labels[appNamespaceLabel] == toPod.(PodEvent).Labels[appNamespaceLabel] &&
fromPod.(PodEvent).Labels[appLabel] == toPod.(PodEvent).Labels[appLabel] {

labels["fromPod"] = fromPod.(PodEvent).Name
labels["fromAcornApp"] = fromPod.(PodEvent).Labels[appLabel]
labels["fromAcornAppNamespace"] = fromPod.(PodEvent).Labels[appNamespaceLabel]
labels["fromAcornContainer"] = fromPod.(PodEvent).Labels[containerLabel]

labels["toPod"] = toPod.(PodEvent).Name
labels["toAcornApp"] = toPod.(PodEvent).Labels[appLabel]
labels["toAcornAppNamespace"] = toPod.(PodEvent).Labels[appNamespaceLabel]
labels["toAcornContainer"] = toPod.(PodEvent).Labels[containerLabel]
labels := prometheus.Labels{
"toPort": strconv.Itoa(int(pkt.ToPort)),
"fromPod": fromPod.(PodEvent).Name,
"fromAcornApp": fromPod.(PodEvent).Labels[appLabel],
"fromAcornAppNamespace": fromPod.(PodEvent).Labels[appNamespaceLabel],
"fromAcornContainer": fromPod.(PodEvent).Labels[containerLabel],
"toPod": toPod.(PodEvent).Name,
"toAcornApp": toPod.(PodEvent).Labels[appLabel],
"toAcornAppNamespace": toPod.(PodEvent).Labels[appNamespaceLabel],
"toAcornContainer": toPod.(PodEvent).Labels[containerLabel],
}

p.throughputCounter.With(setEmptyPrometheusLabels(labels, throughputCounterLabels)).Add(float64(pkt.Size))
}
}

// Check for packets from pods to outside the cluster.
// (Reminder: pkt.IsIngress just means that the packet was detected by the ingress eBPF filter, which is actually detecting egress traffic.)
// OutsideDest indicates that the destination IP address is not a known pod or service IP address.
// We also filter out the 10. prefix because that is the internal IP address range used by the cluster.
if pkt.IsIngress && pkt.FromType == PodSource && pkt.ToType == OutsideDest && !strings.HasPrefix(pkt.ToIP, "10.") {
fromPod, found := p.podCache.get(pkt.FromUID)

if found && fromPod.(PodEvent).Labels[accountIDLabel] != "" {
labels := prometheus.Labels{
"fromPod": fromPod.(PodEvent).Name,
"fromAcornApp": fromPod.(PodEvent).Labels[appLabel],
"fromAcornProject": fromPod.(PodEvent).Labels[projectLabel],
"fromAcornContainer": fromPod.(PodEvent).Labels[containerLabel],
"fromAcornAccountID": fromPod.(PodEvent).Labels[accountIDLabel],
}

if resolvedOfferingsJson, ok := fromPod.(PodEvent).Annotations[resolvedOfferingsAnnotation]; ok {
offerings := map[string]any{}
if err := json.Unmarshal([]byte(resolvedOfferingsJson), &offerings); err == nil {
if class, ok := offerings["class"]; ok {
labels["fromAcornComputeClass"] = class.(string)
}
} else {
log.Logger.Error().Msg(err.Error())
}
}

p.egressCounter.With(setEmptyPrometheusLabels(labels, egressCounterLabels)).Add(float64(pkt.Size))
}
}
}

func setEmptyPrometheusLabels(labels prometheus.Labels, labelList []string) prometheus.Labels {
Expand Down
19 changes: 16 additions & 3 deletions datastore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewServer(ctx context.Context, reg *prometheus.Registry, podIPCache *eventC
}

func (s *Server) Serve() {
http.Handle("/metricz", s.authorizePrometheus(promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{})))
http.Handle("/metricz", s.authorize(promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{})))
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Logger.Error().Err(err).Msg("error while serving metrics")
Expand All @@ -44,7 +44,10 @@ func (s *Server) Serve() {
log.Logger.Info().Msg("Prometheus HTTP server stopped")
}

func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
func (s *Server) authorize(handler http.Handler) http.Handler {
// Only two things are authorized to scrape Alaz:
// - Prometheus
// - cluster-agent
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var sourceIP string
parts := strings.Split(r.RemoteAddr, ":")
Expand All @@ -58,7 +61,7 @@ func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
}

pod, ok := s.podIPCache.get(sourceIP)
if ok && pod.(PodEvent).Namespace == s.prometheusNamespace {
if ok && (isPrometheus(pod.(PodEvent)) || isClusterAgent(pod.(PodEvent))) {
handler.ServeHTTP(w, r)
return
}
Expand All @@ -68,3 +71,13 @@ func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
w.Write([]byte("401 Unauthorized\n"))
})
}

func isPrometheus(p PodEvent) bool {
return p.Namespace == "prometheus-operator"
}

func isClusterAgent(p PodEvent) bool {
return p.Labels[appLabel] == "cluster-agent" &&
p.Labels[appNamespaceLabel] == "acorn" &&
p.Labels[containerLabel] == "cluster-agent"
}
Binary file modified ebpf/throughput/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/throughput/bpf_bpfel.o
Binary file not shown.
64 changes: 59 additions & 5 deletions ebpf/throughput/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ThroughputEventBpf struct {
DPort uint16
SAddr [16]byte
DAddr [16]byte
IsIngress uint8
}

// for user space
Expand All @@ -34,6 +35,7 @@ type ThroughputEvent struct {
DPort uint16
SAddr string
DAddr string
IsIngress bool
}

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
Expand Down Expand Up @@ -97,6 +99,7 @@ func DeployAndWait(ctx context.Context, ch chan interface{}, eventChan <-chan in
DPort: bpfEvent.DPort,
SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]),
DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]),
IsIngress: bpfEvent.IsIngress != 0,
}
}

Expand Down Expand Up @@ -148,11 +151,8 @@ func setFiltersOnCiliumInterfaces(objs bpfObjects) error {
errs = append(errs, fmt.Errorf("failed to set up egress filter for link %s: %w", link.Attrs().Name, err))
}

// We were previously using an ingress filter in addition to the egress filter, but it wasn't actually doing anything.
// For now, we will just delete those until we can figure out how to make them work.
// Egress on its own is enough to track throughput between all pods in the cluster.
if err := deleteIngressFilters(link); err != nil {
errs = append(errs, fmt.Errorf("failed to set up ingress filter for link %s: %w", link.Attrs().Name, err))
if err := setUpIngressFilter(link, objs); err != nil {
errs = append(errs, err)
}
}
}
Expand Down Expand Up @@ -190,6 +190,40 @@ func setUpEgressFilter(link netlink.Link, objs bpfObjects) error {
return netlink.FilterReplace(filter)
}

func setUpIngressFilter(link netlink.Link, objs bpfObjects) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_INGRESS)
if err != nil {
return err
}

for _, filter := range existingFilters {
if bpfFilter, ok := filter.(*netlink.BpfFilter); ok {
if bpfFilter.Name == "throughput_bpf_ingress" || bpfFilter.Name == "packet_classifier" {
if err := netlink.FilterDel(filter); err != nil {
return err
}
}
}
}

filter := &netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: link.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Protocol: unix.ETH_P_ALL,
Priority: 1,
},
Fd: objs.bpfPrograms.PacketClassifier.FD(),
Name: "throughput_bpf_ingress",
DirectAction: true,
}

if err := netlink.FilterReplace(filter); err != nil {
return fmt.Errorf("failed to set up ingress filter for link %s: %w", link.Attrs().Name, err)
}
return nil
}

func deleteIngressFilters(link netlink.Link) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_INGRESS)
if err != nil {
Expand All @@ -209,3 +243,23 @@ func deleteIngressFilters(link netlink.Link) error {

return nil
}

func deleteEgressFilters(link netlink.Link) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_EGRESS)
if err != nil {
return err
}

for _, filter := range existingFilters {
if filter.Type() == "bpf" {
bpfFilter := filter.(*netlink.BpfFilter)
if bpfFilter.Name == "throughput_bpf_egress" {
if err := netlink.FilterDel(bpfFilter); err != nil {
return err
}
}
}
}

return nil
}
Loading

0 comments on commit 6b4c0bc

Please sign in to comment.