Skip to content

Commit

Permalink
make rolling window configurable
Browse files Browse the repository at this point in the history
Change-Id: I9cf603df0717a8f174cf71ea6c236967d21d356c
  • Loading branch information
邝昌浪 committed Oct 31, 2017
1 parent f118cd9 commit ff11a6a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 13 deletions.
49 changes: 36 additions & 13 deletions hystrix/rolling/rolling.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package rolling

import (
"errors"
"sync"
"time"
)

// Number tracks a numberBucket over a bounded number of
// time buckets. Currently the buckets are one second long and only the last 10 seconds are kept.
const (
defaultWindow = 10
)

var (
ErrNegativeWindow = errors.New("window must be positive integer")
)

// Number tracks a numberBucket over a bounded Number of
// time buckets. Currently the buckets are one second long and only the last {window} seconds are kept.
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
window int64 // seconds
}

type numberBucket struct {
Expand All @@ -19,12 +29,25 @@ type numberBucket struct {
// NewNumber initializes a RollingNumber struct.
func NewNumber() *Number {
r := &Number{
Buckets: make(map[int64]*numberBucket),
Buckets: make(map[int64]*numberBucket, defaultWindow),
Mutex: &sync.RWMutex{},
window: defaultWindow,
}
return r
}

// NewNumberWithWindow initializes a RollingNumber with window given
func NewNumberWithWindow(window int64) (*Number, error) {
if window < 0 {
return nil, ErrNegativeWindow
}
return &Number{
Buckets: make(map[int64]*numberBucket, window),
Mutex: &sync.RWMutex{},
window: window,
}, nil
}

func (r *Number) getCurrentBucket() *numberBucket {
now := time.Now().Unix()
var bucket *numberBucket
Expand All @@ -39,17 +62,16 @@ func (r *Number) getCurrentBucket() *numberBucket {
}

func (r *Number) removeOldBuckets() {
now := time.Now().Unix() - 10
now := time.Now().Unix() - r.window

for timestamp := range r.Buckets {
// TODO: configurable rolling window
if timestamp <= now {
delete(r.Buckets, timestamp)
}
}
}

// Increment increments the number in current timeBucket.
// Increment increments the Number in current timeBucket.
func (r *Number) Increment(i float64) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
Expand All @@ -71,33 +93,31 @@ func (r *Number) UpdateMax(n float64) {
r.removeOldBuckets()
}

// Sum sums the values over the buckets in the last 10 seconds.
// Sum sums the values over the buckets in the last {window} seconds.
func (r *Number) Sum(now time.Time) float64 {
sum := float64(0)

r.Mutex.RLock()
defer r.Mutex.RUnlock()

for timestamp, bucket := range r.Buckets {
// TODO: configurable rolling window
if timestamp >= now.Unix()-10 {
if timestamp >= now.Unix()-r.window {
sum += bucket.Value
}
}

return sum
}

// Max returns the maximum value seen in the last 10 seconds.
// Max returns the maximum value seen in the last {window} seconds.
func (r *Number) Max(now time.Time) float64 {
var max float64

r.Mutex.RLock()
defer r.Mutex.RUnlock()

for timestamp, bucket := range r.Buckets {
// TODO: configurable rolling window
if timestamp >= now.Unix()-10 {
if timestamp >= now.Unix()-r.window {
if bucket.Value > max {
max = bucket.Value
}
Expand All @@ -108,5 +128,8 @@ func (r *Number) Max(now time.Time) float64 {
}

func (r *Number) Avg(now time.Time) float64 {
return r.Sum(now) / 10
if r.window == 0 { // unexpected 0
return r.Sum(now)
}
return r.Sum(now) / float64(r.window)
}
14 changes: 14 additions & 0 deletions hystrix/rolling/rolling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ func TestAvg(t *testing.T) {
})
}

func TestWindowAvg(t *testing.T) {
Convey("when adding values to a rolling number", t, func() {
n, _ := NewNumberWithWindow(2)
for _, x := range []float64{0.5, 1.5, 2.5, 3.5, 4.5} {
n.Increment(x)
time.Sleep(1 * time.Second)
}

Convey("it should calculate the average over the number of configured buckets", func() {
So(n.Avg(time.Now()), ShouldEqual, 4) // (3.5+4.5)/2
})
})
}

func BenchmarkRollingNumberIncrement(b *testing.B) {
n := NewNumber()

Expand Down

0 comments on commit ff11a6a

Please sign in to comment.