diff --git a/hystrix/rolling/rolling.go b/hystrix/rolling/rolling.go index 5a69481..9c43d21 100644 --- a/hystrix/rolling/rolling.go +++ b/hystrix/rolling/rolling.go @@ -1,15 +1,25 @@ package rolling import ( + "errors" "sync" "time" ) -// Number tracks a numberBucket over a bounded number of -// time buckets. Currently the buckets are one second long and only the last 10 seconds are kept. +const ( + defaultWindow = 10 +) + +var ( + ErrNegativeWindow = errors.New("window must be positive integer") +) + +// Number tracks a numberBucket over a bounded Number of +// time buckets. Currently the buckets are one second long and only the last {window} seconds are kept. type Number struct { Buckets map[int64]*numberBucket Mutex *sync.RWMutex + window int64 // seconds } type numberBucket struct { @@ -19,12 +29,25 @@ type numberBucket struct { // NewNumber initializes a RollingNumber struct. func NewNumber() *Number { r := &Number{ - Buckets: make(map[int64]*numberBucket), + Buckets: make(map[int64]*numberBucket, defaultWindow), Mutex: &sync.RWMutex{}, + window: defaultWindow, } return r } +// NewNumberWithWindow initializes a RollingNumber with window given +func NewNumberWithWindow(window int64) (*Number, error) { + if window <= 0 { + return nil, ErrNegativeWindow + } + return &Number{ + Buckets: make(map[int64]*numberBucket, window), + Mutex: &sync.RWMutex{}, + window: window, + }, nil +} + func (r *Number) getCurrentBucket() *numberBucket { now := time.Now().Unix() var bucket *numberBucket @@ -39,17 +62,16 @@ func (r *Number) getCurrentBucket() *numberBucket { } func (r *Number) removeOldBuckets() { - now := time.Now().Unix() - 10 + now := time.Now().Unix() - r.window for timestamp := range r.Buckets { - // TODO: configurable rolling window if timestamp <= now { delete(r.Buckets, timestamp) } } } -// Increment increments the number in current timeBucket. +// Increment increments the Number in current timeBucket. func (r *Number) Increment(i float64) { r.Mutex.Lock() defer r.Mutex.Unlock() @@ -71,7 +93,7 @@ func (r *Number) UpdateMax(n float64) { r.removeOldBuckets() } -// Sum sums the values over the buckets in the last 10 seconds. +// Sum sums the values over the buckets in the last {window} seconds. func (r *Number) Sum(now time.Time) float64 { sum := float64(0) @@ -79,8 +101,7 @@ func (r *Number) Sum(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - // TODO: configurable rolling window - if timestamp >= now.Unix()-10 { + if timestamp >= now.Unix()-r.window { sum += bucket.Value } } @@ -88,7 +109,7 @@ func (r *Number) Sum(now time.Time) float64 { return sum } -// Max returns the maximum value seen in the last 10 seconds. +// Max returns the maximum value seen in the last {window} seconds. func (r *Number) Max(now time.Time) float64 { var max float64 @@ -96,8 +117,7 @@ func (r *Number) Max(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - // TODO: configurable rolling window - if timestamp >= now.Unix()-10 { + if timestamp >= now.Unix()-r.window { if bucket.Value > max { max = bucket.Value } @@ -108,5 +128,8 @@ func (r *Number) Max(now time.Time) float64 { } func (r *Number) Avg(now time.Time) float64 { - return r.Sum(now) / 10 + if r.window == 0 { // unexpected 0 + return r.Sum(now) + } + return r.Sum(now) / float64(r.window) } diff --git a/hystrix/rolling/rolling_test.go b/hystrix/rolling/rolling_test.go index f784024..c559844 100644 --- a/hystrix/rolling/rolling_test.go +++ b/hystrix/rolling/rolling_test.go @@ -36,6 +36,20 @@ func TestAvg(t *testing.T) { }) } +func TestWindowAvg(t *testing.T) { + Convey("when adding values to a rolling number", t, func() { + n, _ := NewNumberWithWindow(2) + for _, x := range []float64{0.5, 1.5, 2.5, 3.5, 4.5} { + n.Increment(x) + time.Sleep(1 * time.Second) + } + + Convey("it should calculate the average over the number of configured buckets", func() { + So(n.Avg(time.Now()), ShouldEqual, 4) // (3.5+4.5)/2 + }) + }) +} + func BenchmarkRollingNumberIncrement(b *testing.B) { n := NewNumber()