-
Notifications
You must be signed in to change notification settings - Fork 0
/
counter.go
68 lines (57 loc) · 1.65 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package bulwark
// Source: https://github.com/bradenaw/backpressure
import (
"time"
)
// windowedCounter counts events in an approximate time window. It does this by splitting time into
// buckets of some width and removing buckets that are too old.
//
// The window is approximate because events are only ever expired a whole bucket at a time.
type windowedCounter struct {
// The width of a single bucket.
width time.Duration
// The reference time from which elapsed buckets are measured. It is set at construction and moved
// forward to the caller's `now` whenever get advances past at least one bucket.
last time.Time
// The sum of all buckets.
count int
// The count of events that happened in each bucket. This is a circular buffer.
buckets []int
// The index of the 'head' of the circular buffer, that is, the bucket that corresponds to
// `last`. New events are added to this bucket.
head int
}
// newWindowedCounter returns an empty counter whose window is made of n buckets, each covering
// width of time, with the first bucket starting at now.
func newWindowedCounter(now time.Time, width time.Duration, n int) windowedCounter {
	// count and head keep their zero values: nothing has been counted yet and the head starts at
	// the first slot of the ring.
	var c windowedCounter
	c.width = width
	c.last = now
	c.buckets = make([]int, n)
	return c
}
// add records x events as having happened at now.
func (c *windowedCounter) add(now time.Time, x int) {
	// Expire old buckets first; this may move the head, and x must land in the bucket that
	// corresponds to now. The returned total is not needed here.
	_ = c.get(now)

	slot := &c.buckets[c.head]
	*slot += x
	c.count += x
}
// get expires every bucket that has fallen out of the window as of now and returns the number of
// events still inside the window.
//
// Time is measured in whole buckets of c.width since c.last. When at least one bucket has passed,
// the head advances that many slots (at most once around the ring), the events of each slot it
// lands on are subtracted from the running total, and c.last becomes now. A now that is less than
// one width past c.last, including one that lies before it, changes nothing.
//
// A counter with a non-positive width or with no buckets never rotates; get then only reports the
// running total.
func (c *windowedCounter) get(now time.Time) int {
	n := len(c.buckets)
	// Without a positive width the division below is meaningless (and divides by zero for a zero
	// width), and without buckets there is nothing to expire.
	if c.width <= 0 || n == 0 {
		return c.count
	}

	elapsed := now.Sub(c.last)
	if elapsed < c.width {
		// Still inside the head bucket, or the clock went backwards.
		return c.count
	}

	// How many buckets have we passed since `last`? Since it's a circular buffer, passing more than
	// all of the buckets is the same as passing all of them. Clamp while the quotient is still a
	// 64-bit Duration: converting to int first would truncate a large quotient where int is 32 bits
	// wide, leaving stale buckets in place.
	bucketsPassed := n
	if passed := elapsed / c.width; passed < time.Duration(n) {
		bucketsPassed = int(passed)
	}

	// For all of the buckets that already happened, zero them out, advance head, and remove their
	// amounts from c.count.
	for i := 0; i < bucketsPassed; i++ {
		c.head = (c.head + 1) % n
		c.count -= c.buckets[c.head]
		c.buckets[c.head] = 0
	}
	c.last = now
	return c.count
}