Staleness batch #6355

Merged: 10 commits, Feb 16, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -61,6 +61,8 @@ Main (unreleased)
- Added additional http client proxy configurations to components for
`no_proxy`, `proxy_from_environment`, and `proxy_connect_header`. (@erikbaranowski)

- Batch staleness tracking to reduce mutex contention and increase performance. (@mattdurham)

### Bugfixes

- Fix an issue in `remote.s3` where the exported content of an object would be an empty string if `remote.s3` failed to fully retrieve
38 changes: 20 additions & 18 deletions component/prometheus/fanout.go
@@ -12,7 +12,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
@@ -75,11 +74,12 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
ctx = scrape.ContextWithMetricMetadataStore(ctx, NoopMetadataStore{})

app := &appender{
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
}

for _, x := range f.children {
@@ -92,12 +92,13 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
}

type appender struct {
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
}

var _ storage.Appender = (*appender)(nil)
@@ -110,12 +111,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
Contributor:
I may be missing some context here, but don't we need similar staleness tracking for AppendExemplar and AppendHistogram?

Collaborator (author):
We do, but it never had it, so I'm keeping this PR small. I will have 2-3 more PRs incoming.

GlobalRefID: uint64(ref),
Labels: l,
Value: v,
})
var multiErr error
updated := false
for _, x := range a.children {
@@ -136,6 +136,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
func (a *appender) Commit() error {
defer a.recordLatency()
var multiErr error
a.ls.TrackStaleness(a.stalenessTrackers)
for _, x := range a.children {
err := x.Commit()
if err != nil {
@@ -148,6 +149,7 @@
// Rollback satisfies the Appender interface.
func (a *appender) Rollback() error {
defer a.recordLatency()
a.ls.TrackStaleness(a.stalenessTrackers)
var multiErr error
for _, x := range a.children {
err := x.Rollback()
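
Taken together with the labelstore changes further down, the appender now only records a labelstore.StalenessTracker per sample locally and hands the whole batch to the labelstore once, at Commit or Rollback, rather than taking the store's mutex on every Append. Below is a minimal sketch of that batching pattern, using toy types (tracker, store, appender) rather than the agent's real ones, which carry extra fields and metrics:

```go
package main

import (
	"sync"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/value"
)

// tracker mirrors the role of labelstore.StalenessTracker: everything the
// store needs, collected without touching the store's lock.
type tracker struct {
	globalRefID uint64
	lbls        labels.Labels
	val         float64
}

// store stands in for the labelstore service.
type store struct {
	mut          sync.Mutex
	staleGlobals map[uint64]uint64 // global ref ID -> label hash
}

// trackStaleness processes a whole batch under a single lock acquisition.
// (The real service also computes the label hashes before taking the lock.)
func (s *store) trackStaleness(batch []tracker) {
	s.mut.Lock()
	defer s.mut.Unlock()
	for _, t := range batch {
		if value.IsStaleNaN(t.val) {
			s.staleGlobals[t.globalRefID] = t.lbls.Hash()
		} else {
			delete(s.staleGlobals, t.globalRefID)
		}
	}
}

// appender buffers trackers during a scrape and flushes them once.
type appender struct {
	ls    *store
	batch []tracker
}

func (a *appender) append(ref uint64, l labels.Labels, v float64) {
	// Hot path: no lock, just a local slice append.
	a.batch = append(a.batch, tracker{globalRefID: ref, lbls: l, val: v})
}

func (a *appender) commit() {
	// One call (and one lock acquisition) per scrape instead of one per sample.
	a.ls.trackStaleness(a.batch)
}

func main() {
	s := &store{staleGlobals: map[uint64]uint64{}}
	app := &appender{ls: s}
	app.append(1, labels.FromStrings("job", "demo"), 42)
	app.commit()
}
```

The design point is that the per-sample hot path stays lock-free; all synchronization is paid once per batch.
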
27 changes: 14 additions & 13 deletions component/prometheus/interceptor.go
@@ -8,7 +8,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)

@@ -80,8 +79,9 @@ func WithHistogramHook(f func(ref storage.SeriesRef, l labels.Labels, t int64, h
// Appender satisfies the Appendable interface.
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
app := &interceptappender{
interceptor: f,
ls: f.ls,
interceptor: f,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
}
if f.next != nil {
app.child = f.next.Appender(ctx)
@@ -90,9 +90,10 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
}

type interceptappender struct {
interceptor *Interceptor
child storage.Appender
ls labelstore.LabelStore
interceptor *Interceptor
child storage.Appender
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
}

var _ storage.Appender = (*interceptappender)(nil)
@@ -102,13 +103,11 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
Contributor:
We create a labelstore.StalenessTracker here, but later in func (s *service) TrackStaleness(ids []StalenessTracker) we convert these to &staleMarker{} and calculate the labels hash. Could we instead create the &staleMarker{} type right away here and calculate the hash here?

That way there would be less work and fewer structs to create, and the code would get simpler too, I think.

Collaborator (author):
I feel those are owned by two different things. Mainly, the last-marked-stale state should really only be set by the labelstore itself, and exposing that field feels off. This gets cleaned up slightly in the next PR.

GlobalRefID: uint64(ref),
Labels: l,
Value: v,
})

if a.interceptor.onAppend != nil {
return a.interceptor.onAppend(ref, l, t, v, a.child)
@@ -121,6 +120,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int

// Commit satisfies the Appender interface.
func (a *interceptappender) Commit() error {
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
@@ -129,6 +129,7 @@

// Rollback satisfies the Appender interface.
func (a *interceptappender) Rollback() error {
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
15 changes: 9 additions & 6 deletions service/labelstore/data.go
@@ -3,7 +3,6 @@ package labelstore
import "github.com/prometheus/prometheus/model/labels"

type LabelStore interface {

// GetOrAddLink returns the global id for the values, if none found one will be created based on the lbls.
GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64

@@ -16,12 +15,16 @@ type LabelStore interface {
// GetLocalRefID gets the mapping from global to local id specific to a component. Returns 0 if nothing found.
GetLocalRefID(componentID string, globalRefID uint64) uint64

// AddStaleMarker adds a stale marker to a reference, that reference will then get removed on the next check.
AddStaleMarker(globalRefID uint64, l labels.Labels)

// RemoveStaleMarker removes the stale marker for a reference, keeping it around.
RemoveStaleMarker(globalRefID uint64)
// TrackStaleness adds a stale marker if the value is a staleness NaN; that reference will then be removed on the
// next check. If the value is not a staleness NaN and the reference is tracked, the existing marker is removed.
TrackStaleness(ids []StalenessTracker)

// CheckAndRemoveStaleMarkers identifies any series with a stale marker and removes those entries from the LabelStore.
CheckAndRemoveStaleMarkers()
}

type StalenessTracker struct {
GlobalRefID uint64
Value float64
Labels labels.Labels
}
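
For context on the Value field: Prometheus marks a series that has disappeared by appending a sample whose value is a specific NaN bit pattern, and value.IsStaleNaN tests for exactly that pattern rather than for any NaN. A small standalone illustration using the same prometheus model/value package this PR already imports:

```go
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
)

func main() {
	// The staleness marker is a NaN with a fixed bit pattern.
	stale := math.Float64frombits(value.StaleNaN)

	fmt.Println(math.IsNaN(stale))            // true: it is still a NaN
	fmt.Println(value.IsStaleNaN(stale))      // true: and specifically the staleness marker
	fmt.Println(value.IsStaleNaN(math.NaN())) // false: an ordinary NaN sample is not a staleness marker
}
```
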
38 changes: 25 additions & 13 deletions service/labelstore/service.go
@@ -11,6 +11,7 @@ import (
flow_service "github.com/grafana/agent/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
)

const ServiceName = "labelstore"
@@ -73,6 +74,7 @@ func (s *service) Describe(m chan<- *prometheus.Desc) {
m <- s.totalIDs
m <- s.idsInRemoteWrapping
}

func (s *service) Collect(m chan<- prometheus.Metric) {
s.mut.Lock()
defer s.mut.Unlock()
@@ -196,24 +198,34 @@ func (s *service) GetLocalRefID(componentID string, globalRefID uint64) uint64 {
return local
}

// AddStaleMarker adds a stale marker
func (s *service) AddStaleMarker(globalRefID uint64, l labels.Labels) {
	s.mut.Lock()
	defer s.mut.Unlock()

	s.staleGlobals[globalRefID] = &staleMarker{
		lastMarkedStale: time.Now(),
		labelHash:       l.Hash(),
		globalID:        globalRefID,
	}
}

// RemoveStaleMarker removes a stale marker
func (s *service) RemoveStaleMarker(globalRefID uint64) {
	s.mut.Lock()
	defer s.mut.Unlock()

	delete(s.staleGlobals, globalRefID)
}

func (s *service) TrackStaleness(ids []StalenessTracker) {
	var (
		toAdd    = make([]*staleMarker, 0)
		toRemove = make([]uint64, 0)
		now      = time.Now()
	)

	for _, id := range ids {
		if value.IsStaleNaN(id.Value) {
			toAdd = append(toAdd, &staleMarker{
				globalID:        id.GlobalRefID,
				lastMarkedStale: now,
				labelHash:       id.Labels.Hash(),
			})
		} else {
			toRemove = append(toRemove, id.GlobalRefID)
		}
	}

	s.mut.Lock()
	defer s.mut.Unlock()

	for _, marker := range toAdd {
		s.staleGlobals[marker.globalID] = marker
	}
	for _, id := range toRemove {
		delete(s.staleGlobals, id)
	}
}

// staleDuration determines how long we should wait after a stale value is received to GC that value
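
The markers recorded by TrackStaleness are only acted on later by CheckAndRemoveStaleMarkers, which, per the comments above, garbage-collects entries once their marker has aged past staleDuration. How the agent schedules that call is not part of this diff; a hypothetical driver loop, with the import path assumed from the file layout, could look like this:

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/agent/service/labelstore"
	"github.com/prometheus/client_golang/prometheus"
)

// runStalenessGC periodically asks the labelstore to drop series whose stale
// markers have expired. This wiring is illustrative only.
func runStalenessGC(ctx context.Context, ls labelstore.LabelStore, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			ls.CheckAndRemoveStaleMarkers()
		}
	}
}

func main() {
	// labelstore.New's signature is taken from the test file in this diff.
	ls := labelstore.New(log.NewNopLogger(), prometheus.DefaultRegisterer)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	runStalenessGC(ctx, ls, time.Second)
}
```
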
62 changes: 59 additions & 3 deletions service/labelstore/service_test.go
@@ -1,12 +1,16 @@
package labelstore

import (
"math"
"strconv"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/stretchr/testify/require"
)

@@ -122,7 +126,13 @@ func TestStaleness(t *testing.T) {

global1 := mapping.GetOrAddLink("1", 1, l)
_ = mapping.GetOrAddLink("2", 1, l2)
mapping.AddStaleMarker(global1, l)
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: math.Float64frombits(value.StaleNaN),
Labels: l,
},
})
require.Len(t, mapping.staleGlobals, 1)
require.Len(t, mapping.labelsHashToGlobal, 2)
staleDuration = 1 * time.Millisecond
@@ -141,8 +151,54 @@ func TestRemovingStaleness(t *testing.T) {
})

global1 := mapping.GetOrAddLink("1", 1, l)
mapping.AddStaleMarker(global1, l)
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: math.Float64frombits(value.StaleNaN),
Labels: l,
},
})

require.Len(t, mapping.staleGlobals, 1)
mapping.RemoveStaleMarker(global1)
// This should remove it from staleness tracking.
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: 1,
Labels: l,
},
})
require.Len(t, mapping.staleGlobals, 0)
}

func BenchmarkStaleness(b *testing.B) {
b.StopTimer()
ls := New(log.NewNopLogger(), prometheus.DefaultRegisterer)

tracking := make([]StalenessTracker, 100_000)
for i := 0; i < 100_000; i++ {
l := labels.FromStrings("id", strconv.Itoa(i))
gid := ls.GetOrAddGlobalRefID(l)
var val float64
if i%2 == 0 {
val = float64(i)
} else {
val = math.Float64frombits(value.StaleNaN)
}
tracking[i] = StalenessTracker{
GlobalRefID: gid,
Value: val,
Labels: l,
}
}
b.StartTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
ls.TrackStaleness(tracking)
wg.Done()
}()
}
wg.Wait()
}
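
As a usage note, the new benchmark can be run in isolation with standard Go tooling, for example `go test -run '^$' -bench BenchmarkStaleness ./service/labelstore/` from the repository root (the package path follows the file header above).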