Skip to content

Commit

Permalink
Use thread-local RNGs for jitters (#48860)
Browse files Browse the repository at this point in the history
* Use math/rand/v2's thread-local RNG for jitters

* Replace NewJitter() with DefaultJitter

* Replace NewFullJitter() with FullJitter

* Replace NewHalfJitter() with HalfJitter

* Replace NewSeventhJitter() with SeventhJitter

* Inline jitters in lib/utils/diagnostics/latency

* Replace var jitters in lib/utils with proxy functions

* Replace utils.HalfJitter with retryutils.HalfJitter

* Replace utils.FullJitter with retryutils.FullJitter

* Replace utils.SeventhJitter with retryutils.SeventhJitter

* Replace sharded jitters with thread-local ones

* Remove NewSeventhJitter reference in integrations
  • Loading branch information
espadolini authored Nov 13, 2024
1 parent f84405d commit a92e49f
Show file tree
Hide file tree
Showing 58 changed files with 266 additions and 394 deletions.
2 changes: 1 addition & 1 deletion api/breaker/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (c *Config) CheckAndSetDefaults() error {
c.IsSuccessful = NonNilErrorIsSuccess
}

c.TrippedPeriod = retryutils.NewSeventhJitter()(c.TrippedPeriod)
c.TrippedPeriod = retryutils.SeventhJitter(c.TrippedPeriod)

return nil
}
Expand Down
189 changes: 73 additions & 116 deletions api/utils/retryutils/jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,141 +15,98 @@
package retryutils

import (
"math/rand"
"sync"
"sync/atomic"
"math/rand/v2"
"time"

"github.com/gravitational/trace"
)

// Jitter is a function which applies random jitter to a
// duration. Used to randomize backoff values. Must be
// safe for concurrent usage.
// Jitter is a function which applies random jitter to a duration. Used to
// randomize backoff values. Must be safe for concurrent usage.
type Jitter func(time.Duration) time.Duration

// NewJitter builds a new default jitter (currently jitters on
// the range [d/2,d), but this is subject to change).
func NewJitter() Jitter {
return NewHalfJitter()
}
// NewJitter returns a default jitter (currently [HalfJitter], i.e. a jitter on
// the range [d/2, d), but this is subject to change).
//
// Deprecated: use DefaultJitter directly instead.
func NewJitter() Jitter { return DefaultJitter }

// NewFullJitter builds a new jitter on the range [0,d). Most use-cases
// are better served by a jitter with a meaningful minimum value, but if
// the *only* purpose of the jitter is to spread out retries to the greatest
// extent possible (e.g. when retrying a CompareAndSwap operation), a full jitter
// may be appropriate.
func NewFullJitter() Jitter {
jitter, _ := newJitter(1, newDefaultRng())
return jitter
}
// DefaultJitter is a default jitter (currently [HalfJitter], i.e. a jitter on
// the range [d/2, d), but this is subject to change).
func DefaultJitter(d time.Duration) time.Duration { return HalfJitter(d) }

// NewShardedFullJitter is equivalent to NewFullJitter except that it
// performs better under high concurrency at the cost of having a larger
// footprint in memory.
func NewShardedFullJitter() Jitter {
jitter, _ := newShardedJitter(1, newDefaultRng)
return jitter
}
// NewFullJitter returns [FullJitter], i.e. a jitter on the full [0, d) range.
//
// Deprecated: use FullJitter directly instead.
func NewFullJitter() Jitter { return FullJitter }

// NewHalfJitter returns a new jitter on the range [d/2,d). This is
// a large range and most suitable for jittering things like backoff
// operations where breaking cycles quickly is a priority.
func NewHalfJitter() Jitter {
jitter, _ := newJitter(2, newDefaultRng())
return jitter
}
// NewShardedFullJitter returns [FullJitter], i.e. a jitter on the full [0, d)
// range.
//
// Deprecated: use FullJitter directly instead.
func NewShardedFullJitter() Jitter { return FullJitter }

// NewShardedHalfJitter is equivalent to NewHalfJitter except that it
// performs better under high concurrency at the cost of having a larger
// footprint in memory.
func NewShardedHalfJitter() Jitter {
jitter, _ := newShardedJitter(2, newDefaultRng)
return jitter
}
// NewHalfJitter returns [HalfJitter], i.e. a jitter on the range [d/2, d).
//
// Deprecated: use HalfJitter directly instead.
func NewHalfJitter() Jitter { return HalfJitter }

// NewSeventhJitter builds a new jitter on the range [6d/7,d). Prefer smaller
// jitters such as this when jittering periodic operations (e.g. cert rotation
// checks) since large jitters result in significantly increased load.
func NewSeventhJitter() Jitter {
jitter, _ := newJitter(7, newDefaultRng())
return jitter
}
// NewShardedHalfJitter returns [HalfJitter], i.e. a jitter on the range [d/2,
// d).
//
// Deprecated: use HalfJitter directly instead.
func NewShardedHalfJitter() Jitter { return HalfJitter }

// NewShardedSeventhJitter is equivalent to NewSeventhJitter except that it
// performs better under high concurrency at the cost of having a larger
// footprint in memory.
func NewShardedSeventhJitter() Jitter {
jitter, _ := newShardedJitter(7, newDefaultRng)
return jitter
}
// NewSeventhJitter returns [SeventhJitter], i.e. a jitter on the range [6d/7,
// d).
//
// Deprecated: use SeventhJitter directly instead.
func NewSeventhJitter() Jitter { return SeventhJitter }

func newDefaultRng() rng {
return rand.New(rand.NewSource(time.Now().UnixNano()))
}
// NewShardedSeventhJitter returns [SeventhJitter], i.e. a jitter on the range
// [6d/7, d).
//
// Deprecated: use SeventhJitter directly instead.
func NewShardedSeventhJitter() Jitter { return SeventhJitter }

// FullJitter is a jitter on the range [0, d). Most use-cases are better served
// by a jitter with a meaningful minimum value, but if the *only* purpose of the
// jitter is to spread out retries to the greatest extent possible (e.g. when
// retrying a ConditionalUpdate operation), a full jitter may be appropriate.
func FullJitter(d time.Duration) time.Duration {
if d < 1 {
return 0
}

// rng is an interface implemented by math/rand.Rand. This interface
// is used in testting.
type rng interface {
// Int63n returns, as an int64, a non-negative pseudo-random number
// in the half-open interval [0,n). It panics if n <= 0.
Int63n(n int64) int64
return time.Duration(rand.Int64N(int64(d)))
}

// newJitter builds a new jitter on the range [d*(n-1)/n,d)
// newJitter only returns an error if n < 1.
func newJitter(n time.Duration, rng rng) (Jitter, error) {
if n < 1 {
return nil, trace.BadParameter("newJitter expects n>=1, but got %v", n)
// HalfJitter is a jitter on the range [d/2, d). This is a large range and most
// suitable for jittering things like backoff operations where breaking cycles
// quickly is a priority.
func HalfJitter(d time.Duration) time.Duration {
if d < 1 {
return 0
}

frac := d / 2
if frac < 1 {
return d
}
var mu sync.Mutex
return func(d time.Duration) time.Duration {
// values less than 1 cause rng to panic, and some logic
// relies on treating zero duration as non-blocking case.
if d < 1 {
return 0
}
mu.Lock()
defer mu.Unlock()
return d*(n-1)/n + time.Duration(rng.Int63n(int64(d))/int64(n))
}, nil

return d - frac + time.Duration(rand.Int64N(int64(frac)))
}

// newShardedJitter constructs a new sharded jitter instance on the range [d*(n-1)/n,d)
// newShardedJitter only returns an error if n < 1.
func newShardedJitter(n time.Duration, mkrng func() rng) (Jitter, error) {
// the shard count here is pretty arbitrary. it was selected based on
// fiddling with some benchmarks. seems to be a good balance between
// limiting size and maximing perf under 100k concurrent calls
const shards = 64
// SeventhJitter returns a jitter on the range [6d/7, d). Prefer smaller jitters
// such as this when jittering periodic operations (e.g. cert rotation checks)
// since large jitters result in significantly increased load.
func SeventhJitter(d time.Duration) time.Duration {
if d < 1 {
return 0
}

if n < 1 {
return nil, trace.BadParameter("newShardedJitter expects n>=1, but got %v", n)
frac := d / 7
if frac < 1 {
return d
}

var rngs [shards]rng
var mus [shards]sync.Mutex
var ctr atomic.Uint64
var initOnce sync.Once

return func(d time.Duration) time.Duration {
// rng's allocate >4kb each during init, which is a bit annoying if the jitter
// isn't actually being used (e.g. when importing a package that has a global jitter).
// best to allocate lazily (this has no measurable impact on benchmarks).
initOnce.Do(func() {
for i := range rngs {
rngs[i] = mkrng()
}
})
// values less than 1 cause rng to panic, and some logic
// relies on treating zero duration as non-blocking case.
if d < 1 {
return 0
}
idx := ctr.Add(1) % shards
mus[idx].Lock()
r := d*(n-1)/n + time.Duration(rngs[idx].Int63n(int64(d))/int64(n))
mus[idx].Unlock()
return r
}, nil
return d - frac + time.Duration(rand.Int64N(int64(frac)))
}
Loading

0 comments on commit a92e49f

Please sign in to comment.