diff --git a/api/breaker/breaker.go b/api/breaker/breaker.go index a7cc26fe47e71..affc82a39cd79 100644 --- a/api/breaker/breaker.go +++ b/api/breaker/breaker.go @@ -275,7 +275,7 @@ func (c *Config) CheckAndSetDefaults() error { c.IsSuccessful = NonNilErrorIsSuccess } - c.TrippedPeriod = retryutils.NewSeventhJitter()(c.TrippedPeriod) + c.TrippedPeriod = retryutils.SeventhJitter(c.TrippedPeriod) return nil } diff --git a/api/utils/retryutils/jitter.go b/api/utils/retryutils/jitter.go index 828cf9241c32f..caa20a9647b40 100644 --- a/api/utils/retryutils/jitter.go +++ b/api/utils/retryutils/jitter.go @@ -15,141 +15,98 @@ package retryutils import ( - "math/rand" - "sync" - "sync/atomic" + "math/rand/v2" "time" - - "github.com/gravitational/trace" ) -// Jitter is a function which applies random jitter to a -// duration. Used to randomize backoff values. Must be -// safe for concurrent usage. +// Jitter is a function which applies random jitter to a duration. Used to +// randomize backoff values. Must be safe for concurrent usage. type Jitter func(time.Duration) time.Duration -// NewJitter builds a new default jitter (currently jitters on -// the range [d/2,d), but this is subject to change). -func NewJitter() Jitter { - return NewHalfJitter() -} +// NewJitter returns a default jitter (currently [HalfJitter], i.e. a jitter on +// the range [d/2, d), but this is subject to change). +// +// Deprecated: use DefaultJitter directly instead. +func NewJitter() Jitter { return DefaultJitter } -// NewFullJitter builds a new jitter on the range [0,d). Most use-cases -// are better served by a jitter with a meaningful minimum value, but if -// the *only* purpose of the jitter is to spread out retries to the greatest -// extent possible (e.g. when retrying a CompareAndSwap operation), a full jitter -// may be appropriate. -func NewFullJitter() Jitter { - jitter, _ := newJitter(1, newDefaultRng()) - return jitter -} +// DefaultJitter is a default jitter (currently [HalfJitter], i.e. a jitter on +// the range [d/2, d), but this is subject to change). +func DefaultJitter(d time.Duration) time.Duration { return HalfJitter(d) } -// NewShardedFullJitter is equivalent to NewFullJitter except that it -// performs better under high concurrency at the cost of having a larger -// footprint in memory. -func NewShardedFullJitter() Jitter { - jitter, _ := newShardedJitter(1, newDefaultRng) - return jitter -} +// NewFullJitter returns [FullJitter], i.e. a jitter on the full [0, d) range. +// +// Deprecated: use FullJitter directly instead. +func NewFullJitter() Jitter { return FullJitter } -// NewHalfJitter returns a new jitter on the range [d/2,d). This is -// a large range and most suitable for jittering things like backoff -// operations where breaking cycles quickly is a priority. -func NewHalfJitter() Jitter { - jitter, _ := newJitter(2, newDefaultRng()) - return jitter -} +// NewShardedFullJitter returns [FullJitter], i.e. a jitter on the full [0, d) +// range. +// +// Deprecated: use FullJitter directly instead. +func NewShardedFullJitter() Jitter { return FullJitter } -// NewShardedHalfJitter is equivalent to NewHalfJitter except that it -// performs better under high concurrency at the cost of having a larger -// footprint in memory. -func NewShardedHalfJitter() Jitter { - jitter, _ := newShardedJitter(2, newDefaultRng) - return jitter -} +// NewHalfJitter returns [HalfJitter], i.e. a jitter on the range [d/2, d). +// +// Deprecated: use HalfJitter directly instead. +func NewHalfJitter() Jitter { return HalfJitter } -// NewSeventhJitter builds a new jitter on the range [6d/7,d). Prefer smaller -// jitters such as this when jittering periodic operations (e.g. cert rotation -// checks) since large jitters result in significantly increased load. -func NewSeventhJitter() Jitter { - jitter, _ := newJitter(7, newDefaultRng()) - return jitter -} +// NewShardedHalfJitter returns [HalfJitter], i.e. a jitter on the range [d/2, +// d). +// +// Deprecated: use HalfJitter directly instead. +func NewShardedHalfJitter() Jitter { return HalfJitter } -// NewShardedSeventhJitter is equivalent to NewSeventhJitter except that it -// performs better under high concurrency at the cost of having a larger -// footprint in memory. -func NewShardedSeventhJitter() Jitter { - jitter, _ := newShardedJitter(7, newDefaultRng) - return jitter -} +// NewSeventhJitter returns [SeventhJitter], i.e. a jitter on the range [6d/7, +// d). +// +// Deprecated: use SeventhJitter directly instead. +func NewSeventhJitter() Jitter { return SeventhJitter } -func newDefaultRng() rng { - return rand.New(rand.NewSource(time.Now().UnixNano())) -} +// NewShardedSeventhJitter returns [SeventhJitter], i.e. a jitter on the range +// [6d/7, d). +// +// Deprecated: use SeventhJitter directly instead. +func NewShardedSeventhJitter() Jitter { return SeventhJitter } + +// FullJitter is a jitter on the range [0, d). Most use-cases are better served +// by a jitter with a meaningful minimum value, but if the *only* purpose of the +// jitter is to spread out retries to the greatest extent possible (e.g. when +// retrying a ConditionalUpdate operation), a full jitter may be appropriate. +func FullJitter(d time.Duration) time.Duration { + if d < 1 { + return 0 + } -// rng is an interface implemented by math/rand.Rand. This interface -// is used in testting. -type rng interface { - // Int63n returns, as an int64, a non-negative pseudo-random number - // in the half-open interval [0,n). It panics if n <= 0. - Int63n(n int64) int64 + return time.Duration(rand.Int64N(int64(d))) } -// newJitter builds a new jitter on the range [d*(n-1)/n,d) -// newJitter only returns an error if n < 1. -func newJitter(n time.Duration, rng rng) (Jitter, error) { - if n < 1 { - return nil, trace.BadParameter("newJitter expects n>=1, but got %v", n) +// HalfJitter is a jitter on the range [d/2, d). This is a large range and most +// suitable for jittering things like backoff operations where breaking cycles +// quickly is a priority. +func HalfJitter(d time.Duration) time.Duration { + if d < 1 { + return 0 + } + + frac := d / 2 + if frac < 1 { + return d } - var mu sync.Mutex - return func(d time.Duration) time.Duration { - // values less than 1 cause rng to panic, and some logic - // relies on treating zero duration as non-blocking case. - if d < 1 { - return 0 - } - mu.Lock() - defer mu.Unlock() - return d*(n-1)/n + time.Duration(rng.Int63n(int64(d))/int64(n)) - }, nil + + return d - frac + time.Duration(rand.Int64N(int64(frac))) } -// newShardedJitter constructs a new sharded jitter instance on the range [d*(n-1)/n,d) -// newShardedJitter only returns an error if n < 1. -func newShardedJitter(n time.Duration, mkrng func() rng) (Jitter, error) { - // the shard count here is pretty arbitrary. it was selected based on - // fiddling with some benchmarks. seems to be a good balance between - // limiting size and maximing perf under 100k concurrent calls - const shards = 64 +// SeventhJitter returns a jitter on the range [6d/7, d). Prefer smaller jitters +// such as this when jittering periodic operations (e.g. cert rotation checks) +// since large jitters result in significantly increased load. +func SeventhJitter(d time.Duration) time.Duration { + if d < 1 { + return 0 + } - if n < 1 { - return nil, trace.BadParameter("newShardedJitter expects n>=1, but got %v", n) + frac := d / 7 + if frac < 1 { + return d } - var rngs [shards]rng - var mus [shards]sync.Mutex - var ctr atomic.Uint64 - var initOnce sync.Once - - return func(d time.Duration) time.Duration { - // rng's allocate >4kb each during init, which is a bit annoying if the jitter - // isn't actually being used (e.g. when importing a package that has a global jitter). - // best to allocate lazily (this has no measurable impact on benchmarks). - initOnce.Do(func() { - for i := range rngs { - rngs[i] = mkrng() - } - }) - // values less than 1 cause rng to panic, and some logic - // relies on treating zero duration as non-blocking case. - if d < 1 { - return 0 - } - idx := ctr.Add(1) % shards - mus[idx].Lock() - r := d*(n-1)/n + time.Duration(rngs[idx].Int63n(int64(d))/int64(n)) - mus[idx].Unlock() - return r - }, nil + return d - frac + time.Duration(rand.Int64N(int64(frac))) } diff --git a/api/utils/retryutils/jitter_test.go b/api/utils/retryutils/jitter_test.go index d826c1b392ea1..398cafd0ebe56 100644 --- a/api/utils/retryutils/jitter_test.go +++ b/api/utils/retryutils/jitter_test.go @@ -18,139 +18,109 @@ package retryutils import ( "fmt" - "runtime" + "sync" + "sync/atomic" "testing" "time" - "github.com/gravitational/trace" "github.com/stretchr/testify/require" ) -func TestNewJitterBadParameter(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - n time.Duration - assertErr require.ErrorAssertionFunc - }{ - { - n: -1, - assertErr: func(t require.TestingT, err error, i ...interface{}) { - require.True(t, trace.IsBadParameter(err), err) - }, - }, - { - n: 0, - assertErr: func(t require.TestingT, err error, i ...interface{}) { - require.True(t, trace.IsBadParameter(err), err) - }, - }, - { - n: 1, - assertErr: require.NoError, - }, - { - n: 7, - assertErr: require.NoError, - }, - } { - t.Run(fmt.Sprintf("n=%v", tc.n), func(t *testing.T) { - _, err := newJitter(tc.n, nil) - tc.assertErr(t, err) - _, err = newShardedJitter(tc.n, nil) - tc.assertErr(t, err) - }) - } -} - func TestNewJitter(t *testing.T) { t.Parallel() - baseDuration := time.Second - mockInt63nFloor := mockInt63n(func(n int64) int64 { return 0 }) - mockInt63nCeiling := mockInt63n(func(n int64) int64 { return n - 1 }) - + const baseDuration time.Duration = time.Microsecond for _, tc := range []struct { desc string - n time.Duration + jitter Jitter expectFloor time.Duration expectCeiling time.Duration }{ { desc: "FullJitter", - n: 1, + jitter: FullJitter, expectFloor: 0, expectCeiling: baseDuration - 1, }, { desc: "HalfJitter", - n: 2, + jitter: HalfJitter, expectFloor: baseDuration / 2, expectCeiling: baseDuration - 1, }, { desc: "SeventhJitter", - n: 7, - expectFloor: baseDuration * 6 / 7, + jitter: SeventhJitter, + expectFloor: baseDuration - baseDuration/7, expectCeiling: baseDuration - 1, }, } { - tc := tc t.Run(tc.desc, func(t *testing.T) { t.Parallel() - testFloorJitter, err := newJitter(tc.n, mockInt63nFloor) - require.NoError(t, err) - require.Equal(t, tc.expectFloor, testFloorJitter(baseDuration)) - - testFloorJitter, err = newShardedJitter(tc.n, func() rng { return mockInt63nFloor }) - require.NoError(t, err) - require.Equal(t, tc.expectFloor, testFloorJitter(baseDuration)) - - testCeilingJitter, err := newJitter(tc.n, mockInt63nCeiling) - require.NoError(t, err) - require.Equal(t, tc.expectCeiling, testCeilingJitter(baseDuration)) - - testCeilingJitter, err = newShardedJitter(tc.n, func() rng { return mockInt63nCeiling }) - require.NoError(t, err) - require.Equal(t, tc.expectCeiling, testCeilingJitter(baseDuration)) + var gotFloor, gotCeiling bool + for !gotFloor || !gotCeiling { + d := tc.jitter(baseDuration) + require.GreaterOrEqual(t, d, tc.expectFloor) + if d == tc.expectFloor { + gotFloor = true + } + require.LessOrEqual(t, d, tc.expectCeiling) + if d == tc.expectCeiling { + gotCeiling = true + } + } }) } } -type mockInt63n func(n int64) int64 - -func (m mockInt63n) Int63n(n int64) int64 { - return m(n) +func mutexedSeventhJitter() Jitter { + var mu sync.Mutex + return func(d time.Duration) time.Duration { + mu.Lock() + defer mu.Unlock() + return SeventhJitter(d) + } } -// BenchmarkJitter is an attempt to check the effect of concurrency on the performance -// of a global jitter instance. I'm a bit skeptical of how "true to life" this benchmark -// really is, but the results would seem to indicate that >100k concurrent jitters would -// still all complete in <1s, which is very good for our purposes. -func BenchmarkSingleJitter(b *testing.B) { - benchmarkSharedJitter(b, NewHalfJitter()) -} +func shardedSeventhJitter() Jitter { + const shards = 64 -func BenchmarkShardedJitter(b *testing.B) { - benchmarkSharedJitter(b, NewShardedHalfJitter()) -} + var jitters [shards]Jitter + for i := range jitters { + jitters[i] = mutexedSeventhJitter() + } + var ctr atomic.Uint64 -func benchmarkSharedJitter(b *testing.B, jitter Jitter) { - benchmarkJitter(b, func() Jitter { return jitter }) + return func(d time.Duration) time.Duration { + return jitters[ctr.Add(1)%shards](d) + } } -func benchmarkJitter(b *testing.B, mkjitter func() Jitter) { - procs := runtime.GOMAXPROCS(0) - for n := procs; n < 200_000; n = n * 2 { - b.Run(fmt.Sprintf("n%d", n), func(b *testing.B) { - b.SetParallelism(n / procs) - b.RunParallel(func(pb *testing.PB) { - jitter := mkjitter() - for pb.Next() { - jitter(time.Hour) - } - }) +func BenchmarkJitter(b *testing.B) { + impls := map[string]Jitter{ + "old_global": mutexedSeventhJitter(), + "old_sharded": shardedSeventhJitter(), + "new": SeventhJitter, + } + for impl, jitter := range impls { + b.Run("impl="+impl, func(b *testing.B) { + for parShift := range 6 { + par := 1 << (parShift * 4) + b.Run(fmt.Sprintf("par=%d", par), func(b *testing.B) { + var wg sync.WaitGroup + wg.Add(par) + for range par { + go func() { + defer wg.Done() + for range b.N { + jitter(time.Hour) + } + }() + } + wg.Wait() + }) + } }) } } diff --git a/api/utils/retryutils/retry_test.go b/api/utils/retryutils/retry_test.go index 63bfd0b8a50db..dbbd7450394b4 100644 --- a/api/utils/retryutils/retry_test.go +++ b/api/utils/retryutils/retry_test.go @@ -105,7 +105,7 @@ func TestLinearRetryMax(t *testing.T) { First: time.Second * 45, Step: time.Second * 30, Max: time.Minute, - Jitter: NewFullJitter(), + Jitter: FullJitter, }, previousCompareFn: require.NotEqual, }, @@ -115,7 +115,7 @@ func TestLinearRetryMax(t *testing.T) { First: time.Second * 45, Step: time.Second * 30, Max: time.Minute, - Jitter: NewHalfJitter(), + Jitter: HalfJitter, }, previousCompareFn: require.NotEqual, }, @@ -125,7 +125,7 @@ func TestLinearRetryMax(t *testing.T) { First: time.Second * 45, Step: time.Second * 30, Max: time.Minute, - Jitter: NewSeventhJitter(), + Jitter: SeventhJitter, }, previousCompareFn: require.NotEqual, }, diff --git a/integrations/access/accesslist/app.go b/integrations/access/accesslist/app.go index 16c54d10b416b..02f933baf5ecd 100644 --- a/integrations/access/accesslist/app.go +++ b/integrations/access/accesslist/app.go @@ -122,7 +122,7 @@ func (a *App) run(ctx context.Context) error { a.job.SetReady(true) - jitter := retryutils.NewSeventhJitter() + jitter := retryutils.SeventhJitter timer := a.clock.NewTimer(jitter(30 * time.Second)) defer timer.Stop() diff --git a/integrations/event-handler/session_events_job.go b/integrations/event-handler/session_events_job.go index 4bf51a8f4e34a..e462345cf3a86 100644 --- a/integrations/event-handler/session_events_job.go +++ b/integrations/event-handler/session_events_job.go @@ -239,7 +239,7 @@ func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error { return nil }) - jitter := retryutils.NewSeventhJitter() + jitter := retryutils.SeventhJitter timer := time.NewTimer(jitter(initialProcessingDelay)) defer timer.Stop() diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 12f96f8ec1cd1..dcbcf632b598d 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -1316,12 +1316,12 @@ const ( // runPeriodicOperations runs some periodic bookkeeping operations // performed by auth server func (a *Server) runPeriodicOperations() { - firstReleaseCheck := utils.FullJitter(time.Hour * 6) + firstReleaseCheck := retryutils.FullJitter(time.Hour * 6) // this environment variable is "unstable" since it will be deprecated // by an upcoming tctl command. currently exists for testing purposes only. if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" { - firstReleaseCheck = utils.HalfJitter(time.Second * 10) + firstReleaseCheck = retryutils.HalfJitter(time.Second * 10) } // run periodic functions with a semi-random period @@ -1341,25 +1341,25 @@ func (a *Server) runPeriodicOperations() { Key: metricsKey, Duration: defaults.PrometheusScrapeInterval, FirstDuration: 5 * time.Second, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }, interval.SubInterval[periodicIntervalKey]{ Key: instancePeriodicsKey, Duration: 9 * time.Minute, - FirstDuration: utils.HalfJitter(time.Minute), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.HalfJitter(time.Minute), + Jitter: retryutils.SeventhJitter, }, interval.SubInterval[periodicIntervalKey]{ Key: notificationsCleanupKey, Duration: 48 * time.Hour, - FirstDuration: utils.FullJitter(time.Hour), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(time.Hour), + Jitter: retryutils.SeventhJitter, }, interval.SubInterval[periodicIntervalKey]{ Key: roleCountKey, Duration: 12 * time.Hour, - FirstDuration: utils.FullJitter(time.Minute), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(time.Minute), + Jitter: retryutils.SeventhJitter, }, ) @@ -1372,13 +1372,13 @@ func (a *Server) runPeriodicOperations() { ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: dynamicLabelsCheckKey, Duration: dynamicLabelCheckPeriod, - FirstDuration: utils.HalfJitter(10 * time.Second), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.HalfJitter(10 * time.Second), + Jitter: retryutils.SeventhJitter, }) ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: heartbeatCheckKey, Duration: apidefaults.ServerKeepAliveTTL() * 2, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: releaseCheckKey, @@ -1387,15 +1387,15 @@ func (a *Server) runPeriodicOperations() { // note the use of FullJitter for the releases check interval. this lets us ensure // that frequent restarts don't prevent checks from happening despite the infrequent // effective check rate. - Jitter: retryutils.NewFullJitter(), + Jitter: retryutils.FullJitter, }) // more frequent release check that just re-calculates alerts based on previously // pulled versioning info. ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: localReleaseCheckKey, Duration: 10 * time.Minute, - FirstDuration: utils.HalfJitter(10 * time.Second), - Jitter: retryutils.NewHalfJitter(), + FirstDuration: retryutils.HalfJitter(10 * time.Second), + Jitter: retryutils.HalfJitter, }) } @@ -1403,8 +1403,8 @@ func (a *Server) runPeriodicOperations() { ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: desktopCheckKey, Duration: OSSDesktopsCheckPeriod, - FirstDuration: utils.HalfJitter(10 * time.Second), - Jitter: retryutils.NewHalfJitter(), + FirstDuration: retryutils.HalfJitter(10 * time.Second), + Jitter: retryutils.HalfJitter, }) } else if err := a.DeleteClusterAlert(a.closeCtx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) { log.Warnf("Can't delete OSS non-AD desktops limit alert: %v", err) @@ -1415,7 +1415,7 @@ func (a *Server) runPeriodicOperations() { // reasonably small interval to ensure that users observe clusters as online within 1 minute of adding them. remoteClustersRefresh := interval.New(interval.Config{ Duration: time.Second * 40, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer remoteClustersRefresh.Stop() @@ -1435,8 +1435,8 @@ func (a *Server) runPeriodicOperations() { ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: upgradeWindowCheckKey, Duration: 3 * time.Minute, - FirstDuration: utils.FullJitter(30 * time.Second), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(30 * time.Second), + Jitter: retryutils.SeventhJitter, }) } diff --git a/lib/auth/client_tls_config_generator.go b/lib/auth/client_tls_config_generator.go index 391246ed657c3..9c29a9fad57b0 100644 --- a/lib/auth/client_tls_config_generator.go +++ b/lib/auth/client_tls_config_generator.go @@ -30,8 +30,8 @@ import ( "github.com/gravitational/teleport/api/types" apiutils "github.com/gravitational/teleport/api/utils" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/auth/authclient" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/genmap" ) @@ -204,7 +204,7 @@ func (c *ClientTLSConfigGenerator) refreshClientTLSConfigs(ctx context.Context) } select { - case <-time.After(utils.FullJitter(time.Second * 3)): + case <-time.After(retryutils.FullJitter(time.Second * 3)): case <-ctx.Done(): return } diff --git a/lib/auth/keystore/aws_kms.go b/lib/auth/keystore/aws_kms.go index a6c064969c089..536a940654a62 100644 --- a/lib/auth/keystore/aws_kms.go +++ b/lib/auth/keystore/aws_kms.go @@ -220,7 +220,7 @@ func (a *awsKMSKeystore) getPublicKeyDER(ctx context.Context, keyARN string) ([] First: pendingKeyBaseRetryInterval, Driver: retryutils.NewExponentialDriver(pendingKeyBaseRetryInterval), Max: pendingKeyMaxRetryInterval, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: a.clock, }) if err != nil { diff --git a/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go index 664d980c42a39..42d94ad03b093 100644 --- a/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go +++ b/lib/auth/machineid/machineidv1/spiffe_federation_syncer.go @@ -170,7 +170,7 @@ func NewSPIFFEFederationSyncer(cfg SPIFFEFederationSyncerConfig) (*SPIFFEFederat func (s *SPIFFEFederationSyncer) Run(ctx context.Context) error { // Loop to retry if acquiring lock fails, with some backoff to avoid pinning // the CPU. - waitWithJitter := retryutils.NewSeventhJitter()(time.Second * 10) + waitWithJitter := retryutils.SeventhJitter(time.Second * 10) for { err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ @@ -385,7 +385,7 @@ func (s *SPIFFEFederationSyncer) syncTrustDomainLoop( Step: time.Second, Max: time.Second * 10, Clock: s.cfg.Clock, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) if err != nil { log.ErrorContext( diff --git a/lib/auth/server_info.go b/lib/auth/server_info.go index abee67875e071..9a01a20a647f1 100644 --- a/lib/auth/server_info.go +++ b/lib/auth/server_info.go @@ -33,7 +33,6 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/utils" ) const serverInfoBatchSize = 100 @@ -58,10 +57,10 @@ type ServerInfoAccessPoint interface { // resources with their corresponding Teleport SSH servers. func ReconcileServerInfos(ctx context.Context, ap ServerInfoAccessPoint) error { retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.FullJitter(defaults.MaxWatcherBackoff / 10), + First: retryutils.FullJitter(defaults.MaxWatcherBackoff / 10), Step: defaults.MaxWatcherBackoff / 5, Max: defaults.MaxWatcherBackoff, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: ap.GetClock(), }) if err != nil { diff --git a/lib/automaticupgrades/version/basichttp.go b/lib/automaticupgrades/version/basichttp.go index 849ddc86108df..29bea8e325861 100644 --- a/lib/automaticupgrades/version/basichttp.go +++ b/lib/automaticupgrades/version/basichttp.go @@ -47,7 +47,7 @@ func (b *basicHTTPVersionClient) Get(ctx context.Context) (string, error) { versionURL := b.baseURL.JoinPath(constants.VersionPath) body, err := b.client.GetContent(ctx, *versionURL) if err != nil { - return "", trace.Wrap(err) + return "", trace.Wrap(err, "failed to get version from %s", versionURL) } response := string(body) if response == constants.NoVersion { diff --git a/lib/backend/dynamo/atomicwrite.go b/lib/backend/dynamo/atomicwrite.go index 19664b8c99090..4de200e5b97b1 100644 --- a/lib/backend/dynamo/atomicwrite.go +++ b/lib/backend/dynamo/atomicwrite.go @@ -34,7 +34,6 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/utils" ) const ( @@ -177,7 +176,7 @@ TxnLoop: First: time.Millisecond * 16, Driver: retryutils.NewExponentialDriver(time.Millisecond * 16), Max: time.Millisecond * 1024, - Jitter: utils.FullJitter, + Jitter: retryutils.FullJitter, }) if err != nil { diff --git a/lib/backend/etcdbk/etcd.go b/lib/backend/etcdbk/etcd.go index e95a4ad8729b3..8c1143c4bc8b6 100644 --- a/lib/backend/etcdbk/etcd.go +++ b/lib/backend/etcdbk/etcd.go @@ -47,6 +47,7 @@ import ( apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" apiutils "github.com/gravitational/teleport/api/utils" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/tlsca" @@ -266,11 +267,11 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke closeCtx, cancel := context.WithCancel(ctx) leaseCache, err := utils.NewFnCache(utils.FnCacheConfig{ - TTL: utils.SeventhJitter(time.Minute * 2), + TTL: retryutils.SeventhJitter(time.Minute * 2), Context: closeCtx, Clock: options.clock, ReloadOnErr: true, - CleanupInterval: utils.SeventhJitter(time.Minute * 2), + CleanupInterval: retryutils.SeventhJitter(time.Minute * 2), }) if err != nil { cancel() @@ -288,7 +289,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke ctx: closeCtx, watchDone: make(chan struct{}), buf: buf, - leaseBucket: utils.SeventhJitter(options.leaseBucket), + leaseBucket: retryutils.SeventhJitter(options.leaseBucket), leaseCache: leaseCache, } @@ -513,7 +514,7 @@ WatchEvents: // pause briefly to prevent excessive watcher creation attempts select { - case <-time.After(utils.HalfJitter(time.Millisecond * 1500)): + case <-time.After(retryutils.HalfJitter(time.Millisecond * 1500)): case <-b.ctx.Done(): break WatchEvents } diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index 96ac847a0be1e..71aaaf2c22076 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -47,7 +47,6 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" ) @@ -437,8 +436,8 @@ func New(ctx context.Context, params backend.Params, options Options) (*Backend, go func() { migrationInterval := interval.New(interval.Config{ Duration: time.Hour * 12, - FirstDuration: utils.FullJitter(time.Minute * 5), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(time.Minute * 5), + Jitter: retryutils.SeventhJitter, Clock: b.clock, }) defer migrationInterval.Stop() diff --git a/lib/backend/pgbk/common/utils.go b/lib/backend/pgbk/common/utils.go index fb4e19867b236..10caea2a17061 100644 --- a/lib/backend/pgbk/common/utils.go +++ b/lib/backend/pgbk/common/utils.go @@ -131,7 +131,7 @@ func retry[T any](ctx context.Context, log *slog.Logger, isIdempotent bool, f fu First: 0, Step: 100 * time.Millisecond, Max: 750 * time.Millisecond, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if retryErr != nil { var zeroT T diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 3a3894ee6686e..93a4067a1b81f 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -1095,10 +1095,10 @@ func New(config Config) (*Cache, error) { // Start the cache. Should only be called once. func (c *Cache) Start() error { retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ - First: utils.FullJitter(c.MaxRetryPeriod / 16), + First: retryutils.FullJitter(c.MaxRetryPeriod / 16), Driver: retryutils.NewExponentialDriver(c.MaxRetryPeriod / 16), Max: c.MaxRetryPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: c.Clock, }) if err != nil { @@ -1421,8 +1421,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer if c.EnableRelativeExpiry { relativeExpiryInterval = interval.New(interval.Config{ Duration: c.Config.RelativeExpiryCheckInterval, - FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.HalfJitter(c.Config.RelativeExpiryCheckInterval), + Jitter: retryutils.SeventhJitter, }) } defer relativeExpiryInterval.Stop() diff --git a/lib/client/client.go b/lib/client/client.go index c3e4e3cfabc7c..f14a5a2df476d 100644 --- a/lib/client/client.go +++ b/lib/client/client.go @@ -742,7 +742,7 @@ func proxyConnection(ctx context.Context, conn net.Conn, remoteAddr string, dial First: 100 * time.Millisecond, Step: 100 * time.Millisecond, Max: time.Second, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { return trace.Wrap(err) diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index e4cdfe9736857..0b9655c90beb2 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -251,7 +251,7 @@ func (c *consumer) processEventsContinuously(ctx context.Context) { // Backend locking is used to make sure that only single auth is running consumer. func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProcessorFn func(context.Context)) { // for 1 minute it will be 5s sleep before retry which seems like reasonable value. - waitTimeAfterLockingError := retryutils.NewSeventhJitter()(c.batchMaxInterval / 12) + waitTimeAfterLockingError := retryutils.SeventhJitter(c.batchMaxInterval / 12) for { select { case <-ctx.Done(): diff --git a/lib/events/complete.go b/lib/events/complete.go index 327a886921812..de9022391a533 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -158,8 +158,8 @@ func (u *UploadCompleter) Serve(ctx context.Context) error { periodic := interval.New(interval.Config{ Clock: u.cfg.Clock, Duration: u.cfg.CheckPeriod, - FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.HalfJitter(u.cfg.CheckPeriod), + Jitter: retryutils.SeventhJitter, }) defer periodic.Stop() u.log.InfoContext(ctx, "upload completer starting", "check_interval", u.cfg.CheckPeriod.String()) diff --git a/lib/events/export/date_exporter.go b/lib/events/export/date_exporter.go index 698c67d8c156e..ce796e01c81f9 100644 --- a/lib/events/export/date_exporter.go +++ b/lib/events/export/date_exporter.go @@ -32,7 +32,6 @@ import ( auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" ) @@ -162,10 +161,10 @@ func NewDateExporter(cfg DateExporterConfig) (*DateExporter, error) { } retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ - First: utils.FullJitter(cfg.MaxBackoff / 16), + First: retryutils.FullJitter(cfg.MaxBackoff / 16), Driver: retryutils.NewExponentialDriver(cfg.MaxBackoff / 16), Max: cfg.MaxBackoff, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { return nil, trace.Wrap(err) @@ -267,8 +266,8 @@ func (e *DateExporter) run(ctx context.Context) { poll := interval.New(interval.Config{ Duration: e.cfg.PollInterval, - FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(e.cfg.PollInterval / 2), + Jitter: retryutils.SeventhJitter, }) defer poll.Stop() diff --git a/lib/events/export/exporter.go b/lib/events/export/exporter.go index 779e0231edce0..ef29c47462dce 100644 --- a/lib/events/export/exporter.go +++ b/lib/events/export/exporter.go @@ -30,7 +30,6 @@ import ( auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" ) @@ -244,8 +243,8 @@ func (e *Exporter) run(ctx context.Context) { poll := interval.New(interval.Config{ Duration: e.cfg.PollInterval, - FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.FullJitter(e.cfg.PollInterval / 2), + Jitter: retryutils.SeventhJitter, }) defer poll.Stop() diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index 6fc78a4ec116d..27b14f4408fd7 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -197,7 +197,7 @@ func (u *Uploader) Serve(ctx context.Context) error { Step: u.cfg.ScanPeriod, Max: u.cfg.ScanPeriod * 100, Clock: u.cfg.Clock, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) if err != nil { return trace.Wrap(err) diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index 71868b6da189a..274a0d9a4af01 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -33,6 +33,7 @@ import ( "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/api/utils/retryutils" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" @@ -294,8 +295,8 @@ func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryContro interval.SubInterval[intervalKey]{ Key: instanceHeartbeatKey, VariableDuration: c.instanceHBVariableDuration, - FirstDuration: fullJitter(c.instanceHBVariableDuration.Duration()), - Jitter: seventhJitter, + FirstDuration: retryutils.FullJitter(c.instanceHBVariableDuration.Duration()), + Jitter: retryutils.SeventhJitter, }) handle := newUpstreamHandle(stream, hello, ticker) c.store.Insert(handle) @@ -430,8 +431,8 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { handle.ticker.Push(interval.SubInterval[intervalKey]{ Key: serverKeepAliveKey, Duration: c.serverKeepAlive, - FirstDuration: halfJitter(c.serverKeepAlive), - Jitter: seventhJitter, + FirstDuration: retryutils.HalfJitter(c.serverKeepAlive), + Jitter: retryutils.SeventhJitter, }) keepAliveNeedInit = false } diff --git a/lib/inventory/helpers.go b/lib/inventory/helpers.go deleted file mode 100644 index 4f97476573731..0000000000000 --- a/lib/inventory/helpers.go +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Teleport - * Copyright (C) 2023 Gravitational, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package inventory - -import ( - "github.com/gravitational/teleport/api/utils/retryutils" -) - -// we use dedicated global jitters for all the intervals/retries in this -// package. we do this because our jitter usage in this package can scale by -// the number of concurrent connections to auth, making dedicated jitters a -// poor choice (high memory usage for all the rngs). -var ( - seventhJitter = retryutils.NewShardedSeventhJitter() - halfJitter = retryutils.NewShardedHalfJitter() - fullJitter = retryutils.NewShardedFullJitter() -) diff --git a/lib/kube/proxy/cluster_details.go b/lib/kube/proxy/cluster_details.go index 1bd7ed579eef5..b74e81f1b5ddb 100644 --- a/lib/kube/proxy/cluster_details.go +++ b/lib/kube/proxy/cluster_details.go @@ -173,7 +173,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe First: firstPeriod, Step: backoffRefreshStep, Max: defaultRefreshPeriod, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, Clock: cfg.clock, }) if err != nil { diff --git a/lib/reversetunnel/agentpool.go b/lib/reversetunnel/agentpool.go index a0cc7bd2db44b..25a59cc1cdebe 100644 --- a/lib/reversetunnel/agentpool.go +++ b/lib/reversetunnel/agentpool.go @@ -189,7 +189,7 @@ func NewAgentPool(ctx context.Context, config AgentPoolConfig) (*AgentPool, erro retry, err := retryutils.NewLinear(retryutils.LinearConfig{ Step: time.Second, Max: maxBackoff, - Jitter: retryutils.NewJitter(), + Jitter: retryutils.DefaultJitter, AutoReset: 4, }) if err != nil { diff --git a/lib/reversetunnel/srv.go b/lib/reversetunnel/srv.go index 4cf45cf81a15f..cd36109a0b72f 100644 --- a/lib/reversetunnel/srv.go +++ b/lib/reversetunnel/srv.go @@ -1233,10 +1233,10 @@ func newRemoteSite(srv *server, domainName string, sconn ssh.Conn) (*remoteSite, remoteSite.certificateCache = certificateCache caRetry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.HalfJitter(srv.Config.PollingPeriod), + First: retryutils.HalfJitter(srv.Config.PollingPeriod), Step: srv.Config.PollingPeriod / 5, Max: srv.Config.PollingPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: srv.Clock, }) if err != nil { @@ -1262,10 +1262,10 @@ func newRemoteSite(srv *server, domainName string, sconn ssh.Conn) (*remoteSite, }() lockRetry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.HalfJitter(srv.Config.PollingPeriod), + First: retryutils.HalfJitter(srv.Config.PollingPeriod), Step: srv.Config.PollingPeriod / 5, Max: srv.Config.PollingPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: srv.Clock, }) if err != nil { diff --git a/lib/secretsscanner/authorizedkeys/authorized_keys.go b/lib/secretsscanner/authorizedkeys/authorized_keys.go index 11c77c6a70f78..bdcb45e78b34d 100644 --- a/lib/secretsscanner/authorizedkeys/authorized_keys.go +++ b/lib/secretsscanner/authorizedkeys/authorized_keys.go @@ -194,10 +194,10 @@ func (w *Watcher) start(ctx context.Context) error { select { case <-ctx.Done(): return nil - case <-w.clock.After(retryutils.NewFullJitter()(maxInitialDelay)): + case <-w.clock.After(retryutils.FullJitter(maxInitialDelay)): } - jitterFunc := retryutils.NewHalfJitter() + jitterFunc := retryutils.HalfJitter // maxReSendInterval is the maximum interval to re-send the authorized keys report // to the cluster in case of no changes. const maxReSendInterval = accessgraph.AuthorizedKeyDefaultKeyTTL - 20*time.Minute diff --git a/lib/secretsscanner/authorizedkeys/supervisor.go b/lib/secretsscanner/authorizedkeys/supervisor.go index c0048abb07877..003dbb118efa4 100644 --- a/lib/secretsscanner/authorizedkeys/supervisor.go +++ b/lib/secretsscanner/authorizedkeys/supervisor.go @@ -79,7 +79,7 @@ func supervisorRunner(parentCtx context.Context, cfg supervisorRunnerConfig) err } } - jitterFunc := retryutils.NewHalfJitter() + jitterFunc := retryutils.HalfJitter t := cfg.clock.NewTimer(jitterFunc(cfg.tickerInterval)) for { switch enabled, err := cfg.checkIfMonitorEnabled(parentCtx); { diff --git a/lib/service/awsoidc.go b/lib/service/awsoidc.go index 47baedcac25dd..47e9a1297d86a 100644 --- a/lib/service/awsoidc.go +++ b/lib/service/awsoidc.go @@ -159,7 +159,7 @@ func NewDeployServiceUpdater(config AWSOIDCDeployServiceUpdaterConfig) (*AWSOIDC func (updater *AWSOIDCDeployServiceUpdater) Run(ctx context.Context) error { periodic := interval.New(interval.Config{ Duration: updateAWSOIDCDeployServiceInterval, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer periodic.Stop() diff --git a/lib/service/connect.go b/lib/service/connect.go index fcf4122e44f58..62bc4a5cbcde9 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -72,11 +72,11 @@ const updateClientsJoinWarning = "This agent joined the cluster during the updat // service until succeeds or process gets shut down func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*Connector, error) { retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.HalfJitter(process.Config.MaxRetryPeriod / 10), + First: retryutils.HalfJitter(process.Config.MaxRetryPeriod / 10), Step: process.Config.MaxRetryPeriod / 5, Max: process.Config.MaxRetryPeriod, Clock: process.Clock, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { return nil, trace.Wrap(err) @@ -640,10 +640,10 @@ func (process *TeleportProcess) periodicSyncRotationState() error { process.logger.InfoContext(process.ExitContext(), "The new service has started successfully. Starting syncing rotation status.", "max_retry_period", maxRetryPeriod) retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ - First: utils.FullJitter(maxRetryPeriod / 16), + First: retryutils.FullJitter(maxRetryPeriod / 16), Driver: retryutils.NewExponentialDriver(maxRetryPeriod / 16), Max: maxRetryPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: process.Clock, }) if err != nil { @@ -704,8 +704,8 @@ func (process *TeleportProcess) syncRotationStateCycle(retry retryutils.Retry) e periodic := interval.New(interval.Config{ Duration: process.Config.PollingPeriod, - FirstDuration: utils.HalfJitter(process.Config.PollingPeriod), - Jitter: retryutils.NewSeventhJitter(), + FirstDuration: retryutils.HalfJitter(process.Config.PollingPeriod), + Jitter: retryutils.SeventhJitter, }) defer periodic.Stop() diff --git a/lib/service/service.go b/lib/service/service.go index 6ad43e0ab60dd..0021ddc7460c3 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -84,6 +84,7 @@ import ( "github.com/gravitational/teleport/api/utils/aws" "github.com/gravitational/teleport/api/utils/grpc/interceptors" "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/api/utils/retryutils" apisshutils "github.com/gravitational/teleport/api/utils/sshutils" "github.com/gravitational/teleport/lib" "github.com/gravitational/teleport/lib/agentless" @@ -4624,7 +4625,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { TracerProvider: process.TracingProvider, AutomaticUpgradesChannels: cfg.Proxy.AutomaticUpgradesChannels, IntegrationAppHandler: connectionsHandler, - FeatureWatchInterval: utils.HalfJitter(web.DefaultFeatureWatchInterval * 2), + FeatureWatchInterval: retryutils.HalfJitter(web.DefaultFeatureWatchInterval * 2), } webHandler, err := web.NewHandler(webConfig) if err != nil { diff --git a/lib/services/local/externalauditstorage_watcher.go b/lib/services/local/externalauditstorage_watcher.go index 641f1bcd03e88..633e8a5c53f3e 100644 --- a/lib/services/local/externalauditstorage_watcher.go +++ b/lib/services/local/externalauditstorage_watcher.go @@ -88,7 +88,7 @@ func NewClusterExternalAuditWatcher(ctx context.Context, cfg ClusterExternalAudi First: defaults.HighResPollingPeriod, Driver: retryutils.NewExponentialDriver(defaults.HighResPollingPeriod), Max: defaults.LowResPollingPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: cfg.Clock, }) if err != nil { diff --git a/lib/services/local/headlessauthn_watcher.go b/lib/services/local/headlessauthn_watcher.go index c90f2da8afbd7..d3ded3d60439c 100644 --- a/lib/services/local/headlessauthn_watcher.go +++ b/lib/services/local/headlessauthn_watcher.go @@ -33,7 +33,6 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/utils" ) // maxSubscribers is the maximum number of concurrent subscribers that a headless authentication watcher @@ -98,10 +97,10 @@ func NewHeadlessAuthenticationWatcher(ctx context.Context, cfg HeadlessAuthentic } retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.FullJitter(cfg.MaxRetryPeriod / 10), + First: retryutils.FullJitter(cfg.MaxRetryPeriod / 10), Step: cfg.MaxRetryPeriod / 5, Max: cfg.MaxRetryPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: cfg.Clock, }) if err != nil { diff --git a/lib/services/local/presence.go b/lib/services/local/presence.go index 4c609180e1304..e730a9b7daff0 100644 --- a/lib/services/local/presence.go +++ b/lib/services/local/presence.go @@ -57,7 +57,7 @@ type backendItemToResourceFunc func(item backend.Item) (types.ResourceWithLabels func NewPresenceService(b backend.Backend) *PresenceService { return &PresenceService{ log: logrus.WithFields(logrus.Fields{teleport.ComponentKey: "Presence"}), - jitter: retryutils.NewFullJitter(), + jitter: retryutils.FullJitter, Backend: b, } } diff --git a/lib/services/semaphore.go b/lib/services/semaphore.go index 2cebd19a14aba..52211acbfa33c 100644 --- a/lib/services/semaphore.go +++ b/lib/services/semaphore.go @@ -298,7 +298,7 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph retry, err := retryutils.NewLinear(retryutils.LinearConfig{ Max: cfg.Expiry / 4, Step: cfg.Expiry / 16, - Jitter: retryutils.NewJitter(), + Jitter: retryutils.DefaultJitter, Clock: cfg.Clock, }) if err != nil { diff --git a/lib/services/watcher.go b/lib/services/watcher.go index d06d97dc7c7c2..6bbd71bee0993 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -138,10 +138,10 @@ func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg Re return nil, trace.Wrap(err) } retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.FullJitter(cfg.MaxRetryPeriod / 10), + First: retryutils.FullJitter(cfg.MaxRetryPeriod / 10), Step: cfg.MaxRetryPeriod / 5, Max: cfg.MaxRetryPeriod, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: cfg.Clock, }) if err != nil { diff --git a/lib/srv/db/cloud/iam.go b/lib/srv/db/cloud/iam.go index 4ef4e4d9dd0e5..60cc45ffa2ad0 100644 --- a/lib/srv/db/cloud/iam.go +++ b/lib/srv/db/cloud/iam.go @@ -315,7 +315,7 @@ func (c *IAM) processTask(ctx context.Context, task iamTask) error { Retry: retryutils.LinearConfig{ Step: 10 * time.Second, Max: 2 * time.Minute, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }, }) if err != nil { diff --git a/lib/srv/db/cloud/users/users.go b/lib/srv/db/cloud/users/users.go index 0dcc7ec7d6ae4..f0bffc41022b2 100644 --- a/lib/srv/db/cloud/users/users.go +++ b/lib/srv/db/cloud/users/users.go @@ -162,7 +162,7 @@ func (u *Users) Start(ctx context.Context, getAllDatabases func() types.Database ticker := interval.New(interval.Config{ // Use jitter for HA setups. - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, // NewSeventhJitter builds a new jitter on the range [6n/7,n). // Use n = cfg.Interval*7/6 gives an effective duration range of diff --git a/lib/srv/db/objects/importer.go b/lib/srv/db/objects/importer.go index cc5afc6476afb..b5c3d6d5a3a9a 100644 --- a/lib/srv/db/objects/importer.go +++ b/lib/srv/db/objects/importer.go @@ -79,9 +79,9 @@ func newSingleDatabaseImporter(cfg Config, database types.Database, fetcher Obje func (i *singleDatabaseImporter) start(ctx context.Context) { i.cfg.Log.DebugContext(ctx, "Starting database importer.") ticker := interval.New(interval.Config{ - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, Duration: i.cfg.ScanInterval * 7 / 6, - FirstDuration: retryutils.NewFullJitter()(i.cfg.ScanInterval), + FirstDuration: retryutils.FullJitter(i.cfg.ScanInterval), }) defer ticker.Stop() diff --git a/lib/srv/db/postgres/users.go b/lib/srv/db/postgres/users.go index 9426fc2fe429d..c3eb63affa615 100644 --- a/lib/srv/db/postgres/users.go +++ b/lib/srv/db/postgres/users.go @@ -611,7 +611,7 @@ func withRetry(ctx context.Context, log *slog.Logger, f func() error) error { First: 0, Step: 100 * time.Millisecond, Max: 750 * time.Millisecond, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { return trace.Wrap(err) diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go index c7949050ee47a..7d65f99f88ef5 100644 --- a/lib/srv/discovery/access_graph.go +++ b/lib/srv/discovery/access_graph.go @@ -299,7 +299,7 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c First: time.Second, Step: semaphoreExpiration / 2, Max: semaphoreExpiration, - Jitter: retryutils.NewJitter(), + Jitter: retryutils.DefaultJitter, }, }, ) diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 4b78786d5f60a..eda442a967129 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -264,7 +264,7 @@ kubernetes matchers are present.`) c.Matchers.Azure = services.SimplifyAzureMatchers(c.Matchers.Azure) - c.jitter = retryutils.NewSeventhJitter() + c.jitter = retryutils.SeventhJitter return nil } diff --git a/lib/srv/discovery/reconciler.go b/lib/srv/discovery/reconciler.go index dd9dc1d605f9c..6dd91d6e9358d 100644 --- a/lib/srv/discovery/reconciler.go +++ b/lib/srv/discovery/reconciler.go @@ -84,7 +84,7 @@ func newLabelReconciler(cfg *labelReconcilerConfig) (*labelReconciler, error) { discoveredServers: make(map[string]types.ServerInfo), serverInfoQueue: make([]types.ServerInfo, 0, minBatchSize), lastBatchSize: minBatchSize, - jitter: retryutils.NewSeventhJitter(), + jitter: retryutils.SeventhJitter, }, nil } diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 321619bb02636..460053ff47daf 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -412,7 +412,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun First: time.Second, Step: semaphoreExpiration / 2, Max: semaphoreExpiration, - Jitter: retryutils.NewJitter(), + Jitter: retryutils.DefaultJitter, }, }, ) diff --git a/lib/srv/heartbeatv2.go b/lib/srv/heartbeatv2.go index 0deb0a43c1f66..7a95020e7a31d 100644 --- a/lib/srv/heartbeatv2.go +++ b/lib/srv/heartbeatv2.go @@ -38,7 +38,6 @@ import ( "github.com/gravitational/teleport/lib/inventory" "github.com/gravitational/teleport/lib/inventory/metadata" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" ) @@ -309,17 +308,17 @@ func (h *HeartbeatV2) run() { // set up interval for forced announcement (i.e. heartbeat even if state is unchanged). h.announce = interval.New(interval.Config{ - FirstDuration: utils.HalfJitter(h.announceInterval), + FirstDuration: retryutils.HalfJitter(h.announceInterval), Duration: h.announceInterval, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer h.announce.Stop() // set up interval for polling the inner heartbeat impl for changes. h.poll = interval.New(interval.Config{ - FirstDuration: utils.HalfJitter(h.pollInterval), + FirstDuration: retryutils.HalfJitter(h.pollInterval), Duration: h.pollInterval, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer h.poll.Stop() @@ -360,7 +359,7 @@ func (h *HeartbeatV2) run() { } else { h.testEvent(hbv2FallbackErr) // announce failed, enter a backoff state. - h.fallbackBackoffTime = time.Now().Add(utils.SeventhJitter(h.fallbackBackoff)) + h.fallbackBackoffTime = time.Now().Add(retryutils.SeventhJitter(h.fallbackBackoff)) h.onHeartbeat(h.fallbackFailed) } } else { @@ -418,7 +417,7 @@ func (h *HeartbeatV2) runWithSender(sender inventory.DownstreamSender) { // can sometimes mean that the last announce failed "silently" from our perspective. if t, ok := h.announce.LastTick(); ok { elapsed := time.Since(t) - dai := utils.SeventhJitter(h.disruptionAnnounceInterval) + dai := retryutils.SeventhJitter(h.disruptionAnnounceInterval) if elapsed >= dai { h.shouldAnnounce = true } else { diff --git a/lib/srv/sessiontracker.go b/lib/srv/sessiontracker.go index c15ea16810fe6..693815c397b05 100644 --- a/lib/srv/sessiontracker.go +++ b/lib/srv/sessiontracker.go @@ -122,7 +122,7 @@ func (s *SessionTracker) retryUpdate(ctx context.Context, clock clockwork.Clock) Max: 3 * time.Minute, First: time.Minute, Step: time.Minute, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { return trace.Wrap(err) diff --git a/lib/srv/statichostusers.go b/lib/srv/statichostusers.go index e4d707f811158..ec39cfb44064e 100644 --- a/lib/srv/statichostusers.go +++ b/lib/srv/statichostusers.go @@ -33,7 +33,6 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/utils" ) const staticHostUserWatcherTimeout = 30 * time.Second @@ -89,10 +88,10 @@ func NewStaticHostUserHandler(cfg StaticHostUserHandlerConfig) (*StaticHostUserH cfg.clock = clockwork.NewRealClock() } retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.FullJitter(defaults.MaxWatcherBackoff / 10), + First: retryutils.FullJitter(defaults.MaxWatcherBackoff / 10), Step: defaults.MaxWatcherBackoff / 5, Max: defaults.MaxWatcherBackoff, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: cfg.clock, }) if err != nil { diff --git a/lib/tbot/loop.go b/lib/tbot/loop.go index e03694b858703..aa3b2098302cb 100644 --- a/lib/tbot/loop.go +++ b/lib/tbot/loop.go @@ -74,7 +74,7 @@ func runOnInterval(ctx context.Context, cfg runOnIntervalConfig) error { ticker := cfg.clock.NewTicker(cfg.interval) defer ticker.Stop() - jitter := retryutils.NewJitter() + jitter := retryutils.DefaultJitter firstRun := true for { if !firstRun || (firstRun && cfg.waitBeforeFirstRun) { diff --git a/lib/tbot/service_ca_rotation.go b/lib/tbot/service_ca_rotation.go index 86bbe28bffe30..6df13528d62a3 100644 --- a/lib/tbot/service_ca_rotation.go +++ b/lib/tbot/service_ca_rotation.go @@ -147,7 +147,7 @@ func (s *caRotationService) Run(ctx context.Context) error { f: s.reloadBroadcaster.broadcast, debouncePeriod: time.Second * 10, } - jitter := retryutils.NewJitter() + jitter := retryutils.DefaultJitter for { err := s.watchCARotations(ctx, rd.attempt) diff --git a/lib/tbot/service_spiffe_svid_output.go b/lib/tbot/service_spiffe_svid_output.go index b411a6ff95f61..cb0627ab1a51e 100644 --- a/lib/tbot/service_spiffe_svid_output.go +++ b/lib/tbot/service_spiffe_svid_output.go @@ -93,7 +93,7 @@ func (s *SPIFFESVIDOutputService) Run(ctx context.Context) error { return trace.Wrap(err, "getting trust bundle set") } - jitter := retryutils.NewJitter() + jitter := retryutils.DefaultJitter var res *machineidv1pb.SignX509SVIDsResponse var privateKey crypto.Signer var jwtSVIDs map[string]string diff --git a/lib/teleterm/daemon/daemon_headless.go b/lib/teleterm/daemon/daemon_headless.go index 3ea8b5755d678..577c9d8cee420 100644 --- a/lib/teleterm/daemon/daemon_headless.go +++ b/lib/teleterm/daemon/daemon_headless.go @@ -30,7 +30,6 @@ import ( api "github.com/gravitational/teleport/gen/proto/go/teleport/lib/teleterm/v1" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/teleterm/clusters" - "github.com/gravitational/teleport/lib/utils" ) // UpdateHeadlessAuthenticationState updates a headless authentication state. @@ -84,10 +83,10 @@ func (s *Service) startHeadlessWatcher(rootCluster *clusters.Cluster, waitInit b maxBackoffDuration := defaults.MaxWatcherBackoff retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: utils.FullJitter(maxBackoffDuration / 10), + First: retryutils.FullJitter(maxBackoffDuration / 10), Step: maxBackoffDuration / 5, Max: maxBackoffDuration, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, Clock: s.cfg.Clock, }) if err != nil { diff --git a/lib/usagereporter/teleport/aggregating/submitter.go b/lib/usagereporter/teleport/aggregating/submitter.go index e36dca61e32ff..ebb43942da2f4 100644 --- a/lib/usagereporter/teleport/aggregating/submitter.go +++ b/lib/usagereporter/teleport/aggregating/submitter.go @@ -32,7 +32,6 @@ import ( prehogv1 "github.com/gravitational/teleport/gen/proto/go/prehog/v1" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" ) @@ -105,9 +104,9 @@ func (cfg *SubmitterConfig) CheckAndSetDefaults() error { // CheckAndSetDefaults, and should probably be called in a goroutine. func RunSubmitter(ctx context.Context, cfg SubmitterConfig) { iv := interval.New(interval.Config{ - FirstDuration: utils.HalfJitter(2 * submitInterval), + FirstDuration: retryutils.HalfJitter(2 * submitInterval), Duration: submitInterval, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer iv.Stop() diff --git a/lib/utils/diagnostics/latency/monitor.go b/lib/utils/diagnostics/latency/monitor.go index 81946d9b0c466..bbd2e98fa0782 100644 --- a/lib/utils/diagnostics/latency/monitor.go +++ b/lib/utils/diagnostics/latency/monitor.go @@ -127,7 +127,7 @@ func (c *MonitorConfig) CheckAndSetDefaults() error { } if c.InitialPingInterval <= 0 { - c.InitialReportInterval = fullJitter(500 * time.Millisecond) + c.InitialReportInterval = retryutils.FullJitter(500 * time.Millisecond) } if c.ReportInterval <= 0 { @@ -135,7 +135,7 @@ func (c *MonitorConfig) CheckAndSetDefaults() error { } if c.InitialReportInterval <= 0 { - c.InitialReportInterval = halfJitter(1500 * time.Millisecond) + c.InitialReportInterval = retryutils.HalfJitter(1500 * time.Millisecond) } if c.Clock == nil { @@ -145,12 +145,6 @@ func (c *MonitorConfig) CheckAndSetDefaults() error { return nil } -var ( - seventhJitter = retryutils.NewSeventhJitter() - fullJitter = retryutils.NewFullJitter() - halfJitter = retryutils.NewHalfJitter() -) - // NewMonitor creates an unstarted [Monitor] with the provided configuration. To // begin sampling connection latencies [Monitor.Run] must be called. func NewMonitor(cfg MonitorConfig) (*Monitor, error) { @@ -197,7 +191,7 @@ func (m *Monitor) Run(ctx context.Context) { if err := m.reporter.Report(ctx, m.GetStats()); err != nil { log.WithError(err).Warn("failed to report latency stats") } - m.reportTimer.Reset(seventhJitter(m.reportInterval)) + m.reportTimer.Reset(retryutils.SeventhJitter(m.reportInterval)) case <-ctx.Done(): return } @@ -216,7 +210,7 @@ func (m *Monitor) pingLoop(ctx context.Context, pinger Pinger, timer clockwork.T } else { latency.Store(m.clock.Now().Sub(then).Milliseconds()) } - timer.Reset(seventhJitter(m.pingInterval)) + timer.Reset(retryutils.SeventhJitter(m.pingInterval)) } } } diff --git a/lib/utils/genmap/genmap.go b/lib/utils/genmap/genmap.go index 28ae3da7ba170..6ba1d4e62be01 100644 --- a/lib/utils/genmap/genmap.go +++ b/lib/utils/genmap/genmap.go @@ -92,7 +92,7 @@ func (c *Config[K, V]) CheckAndSetDefaults() error { } if c.Jitter == nil { - c.Jitter = retryutils.NewSeventhJitter() + c.Jitter = retryutils.SeventhJitter } if c.MaxFailures < 1 { diff --git a/lib/utils/retry.go b/lib/utils/retry.go index 4b712d9a28378..51d330ba14ee6 100644 --- a/lib/utils/retry.go +++ b/lib/utils/retry.go @@ -24,23 +24,15 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" ) -// HalfJitter is a global jitter instance used for one-off jitters. -// Prefer instantiating a new jitter instance for operations that require -// repeated calls, and use a dedicated sharded jitter instance for -// any usecases that might scale with cluster size or request count. -var HalfJitter = retryutils.NewHalfJitter() +// HalfJitter is [retryutils.HalfJitter]. +// +// Deprecated: use retryutils.HalfJitter. +func HalfJitter(d time.Duration) time.Duration { return retryutils.HalfJitter(d) } -// SeventhJitter is a global jitter instance used for one-off jitters. -// Prefer instantiating a new jitter instance for operations that require -// repeated calls, and use a dedicated sharded jitter instance for -// any usecases that might scale with cluster size or request count. -var SeventhJitter = retryutils.NewSeventhJitter() - -// FullJitter is a global jitter instance used for one-off jitters. -// Prefer instantiating a new jitter instance for operations that require -// repeated calls, and use a dedicated sharded jitter instance for -// any usecases that might scale with cluster size or request count. -var FullJitter = retryutils.NewFullJitter() +// FullJitter is [retryutils.FullJitter]. +// +// Deprecated: use retryutils.FullJitter. +func FullJitter(d time.Duration) time.Duration { return retryutils.FullJitter(d) } // NewDefaultLinear creates a linear retry with reasonable default parameters for // attempting to restart "critical but potentially load-inducing" operations, such @@ -48,10 +40,10 @@ var FullJitter = retryutils.NewFullJitter() // but this retry will always be configured for automatic reset. func NewDefaultLinear() *retryutils.Linear { retry, err := retryutils.NewLinear(retryutils.LinearConfig{ - First: FullJitter(time.Second * 10), + First: retryutils.FullJitter(time.Second * 10), Step: time.Second * 15, Max: time.Second * 90, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, AutoReset: 5, }) if err != nil { diff --git a/lib/versioncontrol/upgradewindow/upgradewindow.go b/lib/versioncontrol/upgradewindow/upgradewindow.go index 345fc714f8f60..7f90a9d70d41c 100644 --- a/lib/versioncontrol/upgradewindow/upgradewindow.go +++ b/lib/versioncontrol/upgradewindow/upgradewindow.go @@ -34,7 +34,6 @@ import ( "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/kubernetes" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" "github.com/gravitational/teleport/lib/versioncontrol" ) @@ -145,7 +144,7 @@ func (c *ExporterConfig[C]) CheckAndSetDefaults() error { // note: we add an extra millisecond since FullJitter can sometimes return 0, but our interval helpers interpret FirstDuration=0 // as meaning that we don't want a custom first duration. this is usually fine, but since the actual export interval is so long, // it is important that a shorter first duration is always observed. - c.FirstExport = time.Millisecond + utils.FullJitter(c.UnhealthyThreshold/2) + c.FirstExport = time.Millisecond + retryutils.FullJitter(c.UnhealthyThreshold/2) } return nil @@ -168,7 +167,7 @@ func NewExporter[C contextLike](cfg ExporterConfig[C]) (*Exporter[C], error) { retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ Driver: retryutils.NewExponentialDriver(cfg.UnhealthyThreshold / 16), Max: cfg.UnhealthyThreshold, - Jitter: retryutils.NewHalfJitter(), + Jitter: retryutils.HalfJitter, }) if err != nil { @@ -204,9 +203,9 @@ func (e *Exporter[C]) event(event testEvent) { func (e *Exporter[C]) run(ctx context.Context) { exportInterval := interval.New(interval.Config{ - FirstDuration: utils.FullJitter(e.cfg.FirstExport), + FirstDuration: retryutils.FullJitter(e.cfg.FirstExport), Duration: e.cfg.ExportInterval, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer exportInterval.Stop() diff --git a/tool/teleport/common/wait.go b/tool/teleport/common/wait.go index f81cd0f6ee929..3522c37dc935c 100644 --- a/tool/teleport/common/wait.go +++ b/tool/teleport/common/wait.go @@ -103,7 +103,7 @@ func waitNoResolve(ctx context.Context, domain string, period, timeout time.Dura periodic := interval.New(interval.Config{ Duration: period, FirstDuration: time.Millisecond, - Jitter: retryutils.NewSeventhJitter(), + Jitter: retryutils.SeventhJitter, }) defer periodic.Stop()