Skip to content

Commit

Permalink
heartbeat slightly more frequently on disruption
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall committed Aug 5, 2024
1 parent 4003f79 commit 2defa53
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 14 deletions.
44 changes: 35 additions & 9 deletions lib/srv/heartbeatv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type HeartbeatV2Config[T any] struct {
OnHeartbeat func(error)
// AnnounceInterval is the interval at which heartbeats are attempted (optional).
AnnounceInterval time.Duration
// DisruptionAnnounceInterval is the interval at which heartbeats are attempted when
// if there was a disuption in the control stream since the last heartbeat (optional).
DisruptionAnnounceInterval time.Duration
// PollInterval is the interval at which checks for change are performed (optional).
PollInterval time.Duration
}
Expand Down Expand Up @@ -99,9 +102,10 @@ func NewSSHServerHeartbeat(cfg HeartbeatV2Config[*types.ServerV2]) (*HeartbeatV2
}

return newHeartbeatV2(cfg.InventoryHandle, inner, heartbeatV2Config{
onHeartbeatInner: cfg.OnHeartbeat,
announceInterval: cfg.AnnounceInterval,
pollInterval: cfg.PollInterval,
onHeartbeatInner: cfg.OnHeartbeat,
announceInterval: cfg.AnnounceInterval,
disruptionAnnounceInterval: cfg.DisruptionAnnounceInterval,
pollInterval: cfg.PollInterval,
}), nil
}

Expand All @@ -118,9 +122,10 @@ func NewAppServerHeartbeat(cfg HeartbeatV2Config[*types.AppServerV3]) (*Heartbea
}

return newHeartbeatV2(cfg.InventoryHandle, inner, heartbeatV2Config{
onHeartbeatInner: cfg.OnHeartbeat,
announceInterval: cfg.AnnounceInterval,
pollInterval: cfg.PollInterval,
onHeartbeatInner: cfg.OnHeartbeat,
announceInterval: cfg.AnnounceInterval,
disruptionAnnounceInterval: cfg.DisruptionAnnounceInterval,
pollInterval: cfg.PollInterval,
}), nil
}

Expand Down Expand Up @@ -208,9 +213,10 @@ type HeartbeatV2 struct {
}

type heartbeatV2Config struct {
announceInterval time.Duration
pollInterval time.Duration
onHeartbeatInner func(error)
announceInterval time.Duration
disruptionAnnounceInterval time.Duration
pollInterval time.Duration
onHeartbeatInner func(error)

// -- below values only used in tests

Expand All @@ -226,6 +232,11 @@ func (c *heartbeatV2Config) SetDefaults() {
// from the average of ~5m30s that was used for V1 ssh server heartbeats.
c.announceInterval = 2 * (apidefaults.ServerAnnounceTTL / 3)
}
if c.disruptionAnnounceInterval == 0 {
// if there was a disruption in the control stream, we want to heartbeat a bit
// sooner in case the disruption affected the most recent announce's success.
c.disruptionAnnounceInterval = 2 * (c.announceInterval / 3)
}
if c.pollInterval == 0 {
c.pollInterval = defaults.HeartbeatCheckPeriod
}
Expand Down Expand Up @@ -356,6 +367,21 @@ func (h *HeartbeatV2) runWithSender(sender inventory.DownstreamSender) {
h.shouldAnnounce = true
}

// in the event of disruption, we want to heartbeat a bit sooner than the normal.
// this helps prevent node heartbeats from getting too stale when auth servers fail
// in a manner that isn't immediately detected by the agent (e.g. deadlock,
// i/o timeout, etc). Since we're heartbeating over a channel, such failure modes
// can sometimes mean that the last announce failed "silently" from our perspective.
if t, ok := h.announce.LastTick(); ok {
elapsed := time.Since(t)
dai := utils.SeventhJitter(h.disruptionAnnounceInterval)
if elapsed >= dai {
h.shouldAnnounce = true
} else {
h.announce.ResetTo(dai - elapsed)
}
}

for {
if h.shouldAnnounce {
if ok := h.inner.Announce(h.closeContext, sender); ok {
Expand Down
39 changes: 34 additions & 5 deletions lib/utils/interval/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package interval
import (
"errors"
"sync"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
Expand All @@ -38,8 +39,9 @@ import (
type Interval struct {
cfg Config
ch chan time.Time
reset chan struct{}
reset chan time.Duration
fire chan struct{}
lastTick atomic.Pointer[time.Time]
closeOnce sync.Once
done chan struct{}
}
Expand Down Expand Up @@ -88,7 +90,7 @@ func New(cfg Config) *Interval {
interval := &Interval{
ch: make(chan time.Time, 1),
cfg: cfg,
reset: make(chan struct{}),
reset: make(chan time.Duration),
fire: make(chan struct{}),
done: make(chan struct{}),
}
Expand Down Expand Up @@ -121,7 +123,15 @@ func (i *Interval) Stop() {
// jitter(duration) regardless of current timer progress).
func (i *Interval) Reset() {
select {
case i.reset <- struct{}{}:
case i.reset <- time.Duration(0):
case <-i.done:
}
}

// ResetTo resets the interval to the target duration for the next tick.
func (i *Interval) ResetTo(d time.Duration) {
select {
case i.reset <- d:
case <-i.done:
}
}
Expand All @@ -140,6 +150,20 @@ func (i *Interval) Next() <-chan time.Time {
return i.ch
}

// LastTick gets the most recent tick if the interval has fired at least once. Note that the
// tick returned by this method is the last *generated* tick, not necessarily the last tick
// that was *observed* by the consumer of the interval.
func (i *Interval) LastTick() (tick time.Time, ok bool) {
if t := i.lastTick.Load(); t != nil {
return *t, true
}
return time.Time{}, false
}

func (i *Interval) setLastTick(tick time.Time) {
i.lastTick.Store(&tick)
}

// duration gets the duration of the interval. Each call applies the jitter
// if one was supplied.
func (i *Interval) duration() time.Duration {
Expand All @@ -163,13 +187,17 @@ func (i *Interval) run(timer clockwork.Timer) {
// output channel is set.
timer.Reset(i.duration())
ch = i.ch
case <-i.reset:
i.setLastTick(tick)
case d := <-i.reset:
// stop and drain timer
if !timer.Stop() {
<-timer.Chan()
}
if d == 0 {
d = i.duration()
}
// re-set the timer
timer.Reset(i.duration())
timer.Reset(d)
// ensure we don't send any pending ticks
ch = nil
case <-i.fire:
Expand All @@ -182,6 +210,7 @@ func (i *Interval) run(timer clockwork.Timer) {
// simulate firing of the timer
tick = time.Now()
ch = i.ch
i.setLastTick(tick)
case ch <- tick:
// tick has been sent, set ch back to nil to prevent
// double-send and wait for next timer firing
Expand Down
77 changes: 77 additions & 0 deletions lib/utils/interval/interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,39 @@ import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)

// TestLastTick verifies that the LastTick method returns the last tick time as expected.
func TestLastTick(t *testing.T) {
clock := clockwork.NewFakeClock()
interval := New(Config{
Duration: time.Minute,
Clock: clock,
})

_, ok := interval.LastTick()
require.False(t, ok)

timeout := time.After(time.Second * 30)
for i := 0; i < 3; i++ {
clock.Advance(time.Minute)

var tick time.Time
select {
case tick = <-interval.Next():
case <-timeout:
t.Fatal("timeout waiting for tick")
}
require.Equal(t, clock.Now(), tick)

tick, ok = interval.LastTick()
require.True(t, ok)
require.Equal(t, clock.Now(), tick)
}
}

// TestIntervalReset verifies the basic behavior of the interval reset functionality.
// Since time based tests tend to be flaky, this test passes if it has a >50% success
// rate (i.e. >50% of resets seemed to have actually extended the timer successfully).
Expand Down Expand Up @@ -83,6 +113,53 @@ func TestIntervalReset(t *testing.T) {
require.Greater(t, success.Load(), failure.Load())
}

// TestIntervalResetTo verifies the basic behavior of the interval ResetTo method.
// Since time based tests tend to be flaky, this test passes if it has a >50% success
// rate (i.e. >50% of ResetTo calls seemed to have changed the timer's behavior as expected).
func TestIntervalResetTo(t *testing.T) {
const workers = 1_000
const ticks = 12
const longDuration = time.Millisecond * 800
const shortDuration = time.Millisecond * 200
t.Parallel()

var success, failure atomic.Uint64
var wg sync.WaitGroup

for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()

interval := New(Config{
Duration: longDuration,
})
defer interval.Stop()

start := time.Now()

for i := 0; i < ticks; i++ {
interval.ResetTo(shortDuration)
<-interval.Next()
}

elapsed := time.Since(start)
// if the above works completed before the expected minimum time
// to complete all ticks as long ticks, we assume that ResetTo has
// successfully shortened the interval.
if elapsed < longDuration*time.Duration(ticks) {
success.Add(1)
} else {
failure.Add(1)
}
}()
}

wg.Wait()

require.Greater(t, success.Load(), failure.Load())
}

func TestNewNoop(t *testing.T) {
t.Parallel()
i := NewNoop()
Expand Down

0 comments on commit 2defa53

Please sign in to comment.