Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable customization of metrics calculation #26

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Empty file added .gitignore
Empty file.
6 changes: 3 additions & 3 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestBasic(t *testing.T) {
m.Mark(1000)
mockClock.Add(40 * time.Millisecond)
}
if rate := m.Snapshot().Rate; rate != 25000 {
if rate := m.Snapshot().Rate; !approxEq(rate, 25000, 1) {
t.Errorf("expected rate 25000, got %f", rate)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestUnregister(t *testing.T) {
}

actual := m.Snapshot()
if actual.Rate != 10 {
if !approxEq(actual.Rate, 10, 1) {
t.Errorf("expected rate 10, got %f", actual.Rate)
}

Expand All @@ -120,7 +120,7 @@ func TestUnregister(t *testing.T) {
}

actual = m.Snapshot()
if actual.Rate != 20 {
if !approxEq(actual.Rate, 20, 1) {
t.Errorf("expected rate 20, got %f", actual.Rate)
}

Expand Down
107 changes: 106 additions & 1 deletion meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,44 @@ package flow

import (
"fmt"
"math"
"sync/atomic"
"time"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13

// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)

// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
LastUpdate time.Time
}

type MeterInterface interface {
Snapshot() Snapshot
Update(tdiff time.Duration, now time.Time)
Mark(count uint64)
IsIdle() bool
SetIdle()
SetActive()
}

// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
func NewMeter() MeterInterface {
return &Meter{
fresh: true,
snapshot: Snapshot{
LastUpdate: cl.Now(),
},
Expand All @@ -38,6 +59,8 @@ type Meter struct {

// Take lock.
snapshot Snapshot

fresh bool
}

// Mark updates the total.
Expand Down Expand Up @@ -69,3 +92,85 @@ func (m *Meter) Reset() {
func (m *Meter) String() string {
return m.Snapshot().String()
}

func (m *Meter) Update(tdiff time.Duration, now time.Time) {
if !m.fresh {
timeMultiplier := float64(time.Second) / float64(tdiff)
total := m.accumulator.Load()
diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)

if diff > 0 {
m.snapshot.LastUpdate = now
}

if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
} else {
m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
}
m.snapshot.Total = total

// This is equivalent to one zeros, then one, then 30 zeros.
// We'll consider that to be "idle".
if m.snapshot.Rate > IdleRate {
return
}

// Ok, so we are idle...
// Mark this as idle by zeroing the accumulator.
swappedTotal := m.accumulator.Swap(0)
// So..., are we really idle?
if swappedTotal > total {
// Not so idle...
// Now we need to make sure this gets re-registered.

// First, add back what we removed. If we can do this
// fast enough, we can put it back before anyone
// notices.
currentTotal := m.accumulator.Add(swappedTotal)

// Did we make it?
if currentTotal == swappedTotal {
// Yes! Nobody noticed, move along.
return
}
// No. Someone noticed and will (or has) put back into
// the registration channel.
//
// Remove the snapshot total, it'll get added back on
// registration.
//
// `^uint64(total - 1)` is the two's complement of
// `total`. It's the "correct" way to subtract
// atomically in go.
m.accumulator.Add(^uint64(m.snapshot.Total - 1))
} else {
m.SetIdle()
}
} else {
// Re-add the total to all the newly active accumulators and set the snapshot to the total.
// 1. We don't do this on register to avoid having to take the snapshot lock.
// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
total := m.accumulator.Add(m.snapshot.Total)
if total > m.snapshot.Total {
m.snapshot.LastUpdate = now
}
m.snapshot.Total = total
m.fresh = false
}
}

func (m *Meter) IsIdle() bool {
return !m.registered
}

func (m *Meter) SetIdle() {
m.snapshot.Rate = 0
m.registered = false
m.fresh = true
}

func (m *Meter) SetActive() {
m.registered = true
}
48 changes: 48 additions & 0 deletions mockclocktest/mock_clock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mockclocktest

import (
"math"
"testing"
"time"

flow "github.com/libp2p/go-flow-metrics"

"github.com/benbjohnson/clock"
)

var cl = clock.NewMock()

func init() {
flow.SetClock(cl)
}

func TestBasic(t *testing.T) {
m := new(flow.Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
cl.Add(40 * time.Millisecond)
}
if rate := m.Snapshot().Rate; approxEq(rate, 25000, 1) {
t.Errorf("expected rate 25000, got %f", rate)
}

for i := 0; i < 200; i++ {
m.Mark(200)
cl.Add(40 * time.Millisecond)
}

// Adjusts
if rate := m.Snapshot().Rate; approxEq(rate, 5017.776503840969, 0.0001) {
t.Errorf("expected rate 5017.776503840969, got %f", rate)
}

// Let it settle.
cl.Add(2 * time.Second)
if total := m.Snapshot().Total; total != 340000 {
t.Errorf("expected total 3400000, got %d", total)
}
}

func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}
28 changes: 19 additions & 9 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ type MeterRegistry struct {
}

// Get gets (or creates) a meter by name.
func (r *MeterRegistry) Get(name string) *Meter {
func (r *MeterRegistry) Get(name string) MeterInterface {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
return m.(MeterInterface)
}
m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter)
return m.(MeterInterface)
}

// FindIdle finds all meters that haven't been used since the given time.
Expand Down Expand Up @@ -50,7 +50,7 @@ func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {

r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
if v.(MeterInterface).Snapshot().LastUpdate.Before(since) {
cb(k)
}
return true
Expand All @@ -66,11 +66,21 @@ func (r *MeterRegistry) Remove(name string) {
}

// ForEach calls the passed function for each registered meter.
func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) {
r.meters.Range(func(k, v interface{}) bool {
iterFunc(k.(string), v.(*Meter))
return true
})
//
// Note: switch was added for compatibility reasons
func (r *MeterRegistry) ForEach(iterFunc interface{}) {
switch f := iterFunc.(type) {
case func(string, MeterInterface):
r.meters.Range(func(k, v interface{}) bool {
f(k.(string), v.(MeterInterface))
return true
})
case func(string, *Meter):
r.meters.Range(func(k, v interface{}) bool {
f(k.(string), v.(*Meter))
return true
})
}
}

// Clear removes all meters from the registry.
Expand Down
22 changes: 11 additions & 11 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1 := r.Get("first").(*Meter)
m2 := r.Get("second").(*Meter)

m1Update := m1.Snapshot().LastUpdate
mockClock.Add(5 * time.Second)
Expand All @@ -31,11 +31,11 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected the last update (%s) to have after (%s)", lu, m1Update)
}

expectedMeters := map[string]*Meter{
expectedMeters := map[string]MeterInterface{
"first": m1,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
Expand All @@ -48,7 +48,7 @@ func TestRegistry(t *testing.T) {
r.Remove("first")

found := false
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if n != "second" {
t.Errorf("found unexpected meter: %s", n)
return
Expand All @@ -63,19 +63,19 @@ func TestRegistry(t *testing.T) {
t.Errorf("didn't find second meter")
}

m3 := r.Get("first")
m3 := r.Get("first").(*Meter)
if m3 == m1 {
t.Error("should have gotten a new meter")
}
if total := m3.Snapshot().Total; total != 0 {
t.Errorf("expected first total to now be 0, got %d", total)
}

expectedMeters = map[string]*Meter{
expectedMeters = map[string]MeterInterface{
"first": m3,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
Expand Down Expand Up @@ -105,8 +105,8 @@ func TestRegistry(t *testing.T) {

func TestClearRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1 := r.Get("first").(*Meter)
m2 := r.Get("second").(*Meter)

m1.Mark(10)
m2.Mark(30)
Expand All @@ -115,7 +115,7 @@ func TestClearRegistry(t *testing.T) {

r.Clear()

r.ForEach(func(n string, _m *Meter) {
r.ForEach(func(n string, _m MeterInterface) {
t.Errorf("expected no meters at all, found a meter %s", n)
})

Expand Down
Loading