Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete stale metric labels from BPF maps #99

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/activeconn/activeconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "bpf/bpf_helpers.h"
#include "bpf/bpf_core_read.h"
#include "bpf/bpf_tracing.h"
#include "unistd.h"

char __license[] SEC("license") = "Dual MIT/GPL";

Expand Down Expand Up @@ -49,6 +50,7 @@ record(struct pt_regs *ctx, int ret, int op)
__u32 daddr;
u64 val;
u64 *valp;
__u32 myval;
struct dimensions_t key = {};

bpf_printk("exit: getting sk for tid: '%u', ret is: '%d'", tid, ret);
Expand Down Expand Up @@ -76,6 +78,7 @@ record(struct pt_regs *ctx, int ret, int op)
}
bpf_map_update_elem(&sockets_ext, &key, &val, 0);
bpf_map_delete_elem(&sockets, &tid);

return 0;
}

Expand Down
1 change: 1 addition & 0 deletions examples/tcpconnect/tcpconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ exit_tcp_connect(struct pt_regs *ctx, int ret)
val = 1;
}
else {
bpf_map_delete_elem(&sockets, &tid);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a permanent change and is it required? For the long term, I think it would be nice if we would only alter the upstream examples as little as absolutely needed for maintainability purposes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krisztianfekete no this is just for testing, I'm working off a VM in gcloud so have been pushing changes for tests. Will remove before merge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tjons this PR looks good. Can you clean up so we can merge?

bpf_printk("found existing value '%llu' for {saddr: %u, daddr: %u}", *valp, hash_key.saddr, hash_key.daddr);
val = *valp + 1;
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func (l *loader) startHashMap(
select {
case <-ticker.C:
mapIter := liveMap.Iterate()
labels := make([]map[string]string, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the map have a Len function to use here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very unfortunately it does not

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okie dokie

for {
// Use generic key,value so we can decode ourselves
var (
Expand Down Expand Up @@ -398,12 +399,15 @@ func (l *loader) startHashMap(
}
stringLabels := stringify(decodedKey)
instrument.Set(ctx, int64(intVal), stringLabels)
labels = append(labels, stringLabels)
thisKvPair := KvPair{Key: stringLabels, Value: fmt.Sprint(intVal)}
watcher.SendEntry(MapEntry{
Name: name,
Entry: thisKvPair,
})
}
// remove any old labels that weren't in this last iteration of the HashMap
instrument.Clean(labels)

case <-ctx.Done():
// fmt.Println("got done in hashmap loop, returning")
Expand Down Expand Up @@ -453,6 +457,8 @@ func (n *noop) Set(
) {
}

func (n *noop) Clean(_ []map[string]string) {}

func createDir(ctx context.Context, path string, perm os.FileMode) error {
file, err := os.Stat(path)
if os.IsNotExist(err) {
Expand Down
95 changes: 84 additions & 11 deletions pkg/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net/http"
"strings"

"github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -20,7 +21,7 @@ const (
type PrometheusOpts struct {
Port uint32
MetricsPath string
Registry *prometheus.Registry
Registry *prometheus.Registry
}

func (p *PrometheusOpts) initDefaults() {
Expand Down Expand Up @@ -70,13 +71,15 @@ type MetricsProvider interface {

type IncrementInstrument interface {
Increment(ctx context.Context, labels map[string]string)
Clean(newLabels []map[string]string)
}

type SetInstrument interface {
Set(ctx context.Context, val int64, labels map[string]string)
Clean(newLabels []map[string]string)
}

type metricsProvider struct{
type metricsProvider struct {
registry *prometheus.Registry
}

Expand All @@ -88,8 +91,9 @@ func (m *metricsProvider) NewSetCounter(name string, labels []string) SetInstrum

m.register(counter)
return &setCounter{
counter: counter,
counterMap: map[uint64]int64{},
counter: counter,
counterMap: map[uint64]int64{},
currentLabels: map[string]map[string]string{},
}
}

Expand All @@ -101,7 +105,8 @@ func (m *metricsProvider) NewIncrementCounter(name string, labels []string) Incr

m.register(counter)
return &incrementCounter{
counter: counter,
counter: counter,
currentLabels: map[string]map[string]string{},
}
}

Expand All @@ -113,7 +118,8 @@ func (m *metricsProvider) NewGauge(name string, labels []string) SetInstrument {

m.register(gaugeVec)
return &gauge{
gauge: gaugeVec,
gauge: gaugeVec,
currentLabels: map[string]map[string]string{},
}
}

Expand All @@ -126,16 +132,42 @@ func (m *metricsProvider) register(collectors ...prometheus.Collector) {
}

type setCounter struct {
counter *prometheus.CounterVec
counterMap map[uint64]int64
counter *prometheus.CounterVec
counterMap map[uint64]int64
currentLabels map[string]map[string]string
}

func makeLabelString(label map[string]string) string {
pairs := make([]string, 0, len(label))
for k, v := range label {
pairs = append(pairs, fmt.Sprintf("%s|%s", k, v))
}
return strings.Join(pairs, ",")
}

func (c *setCounter) trackLabel(decodedKey map[string]string) {
c.currentLabels[makeLabelString(decodedKey)] = decodedKey
}

func (c *setCounter) Clean(newLabels []map[string]string) {
labelsToKeep := make(map[string]bool)
for _, newLabel := range newLabels {
labelsToKeep[makeLabelString(newLabel)] = true
}

for oldLabelKey, oldLabel := range c.currentLabels {
if _, ok := labelsToKeep[oldLabelKey]; !ok {
delete(c.currentLabels, oldLabelKey)
c.counter.Delete(oldLabel)
}
}
}

func (c *setCounter) Set(
ctx context.Context,
intVal int64,
decodedKey map[string]string,
) {

keyHash, err := hashstructure.Hash(decodedKey, hashstructure.FormatV2, nil)
if err != nil {
log.Fatal("This should never happen")
Expand All @@ -148,21 +180,43 @@ func (c *setCounter) Set(
}
c.counterMap[keyHash] = intVal
c.counter.With(prometheus.Labels(decodedKey)).Add(float64(diff))
c.trackLabel(decodedKey)
}

type incrementCounter struct {
counter *prometheus.CounterVec
counter *prometheus.CounterVec
currentLabels map[string]map[string]string
}

func (i *incrementCounter) Increment(
ctx context.Context,
decodedKey map[string]string,
) {
i.counter.With(prometheus.Labels(decodedKey)).Inc()
i.trackLabel(decodedKey)
}

func (i *incrementCounter) trackLabel(decodedKey map[string]string) {
i.currentLabels[makeLabelString(decodedKey)] = decodedKey
}

func (i *incrementCounter) Clean(newLabels []map[string]string) {
labelsToKeep := make(map[string]bool)
for _, newLabel := range newLabels {
labelsToKeep[makeLabelString(newLabel)] = true
}

for oldLabelKey, oldLabel := range i.currentLabels {
if _, ok := labelsToKeep[oldLabelKey]; !ok {
delete(i.currentLabels, oldLabelKey)
i.counter.Delete(oldLabel)
}
}
}

type gauge struct {
gauge *prometheus.GaugeVec
gauge *prometheus.GaugeVec
currentLabels map[string]map[string]string
}

func (g *gauge) Set(
Expand All @@ -171,4 +225,23 @@ func (g *gauge) Set(
decodedKey map[string]string,
) {
g.gauge.With(prometheus.Labels(decodedKey)).Set(float64(intVal))
g.trackLabel(decodedKey)
}

func (g *gauge) trackLabel(decodedKey map[string]string) {
g.currentLabels[makeLabelString(decodedKey)] = decodedKey
}

func (g *gauge) Clean(newLabels []map[string]string) {
labelsToKeep := make(map[string]bool)
for _, newLabel := range newLabels {
labelsToKeep[makeLabelString(newLabel)] = true
}

for oldLabelKey, oldLabel := range g.currentLabels {
if _, ok := labelsToKeep[oldLabelKey]; !ok {
delete(g.currentLabels, oldLabelKey)
g.gauge.Delete(oldLabel)
}
}
}