fix app keepalive (#42269)
Co-authored-by: Gus Luxton <[email protected]>
fspmarshall and webvictim authored Jun 3, 2024
1 parent 65e1a2f commit e5c8e16
Showing 2 changed files with 50 additions and 8 deletions.
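In short, the app keepalive path no longer returns an error after too many consecutive failures (the old trace.Errorf path); it now deletes the failing app server's entry from the handle's tracked appServers map, emits a new appKeepAliveDel test event, and resets the error counter on a successful keepalive (for both the app and SSH paths). A minimal, self-contained sketch of that remove-on-repeated-failure pattern follows; the names (tracker, appState, keepAliveApps) are illustrative only, not the actual Teleport code.

// Sketch: drop only the failing app server's local state after repeated
// keepalive failures, instead of surfacing an error for the whole handle.
package main

import "fmt"

type appState struct {
	keepAliveErrs int
}

type tracker struct {
	appServers       map[string]appState
	maxKeepAliveErrs int
}

// keepAliveApps records a keepalive result per tracked app server. Failures
// past the threshold remove that app's entry; the tracker itself (and, in the
// real system, the control stream it represents) stays alive.
func (t *tracker) keepAliveApps(ok func(name string) bool) {
	for name, srv := range t.appServers {
		if ok(name) {
			srv.keepAliveErrs = 0 // success resets the error counter
			t.appServers[name] = srv
			continue
		}
		srv.keepAliveErrs++
		t.appServers[name] = srv
		if srv.keepAliveErrs > t.maxKeepAliveErrs {
			delete(t.appServers, name) // remove only this app's state
		}
	}
}

func main() {
	tr := &tracker{
		appServers:       map[string]appState{"app-1": {}, "app-2": {}},
		maxKeepAliveErrs: 2,
	}
	alwaysFail := func(string) bool { return false }
	for i := 0; i < 3; i++ {
		tr.keepAliveApps(alwaysFail) // third failure exceeds the threshold
	}
	fmt.Println(len(tr.appServers)) // 0: both entries removed, no teardown
}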
19 changes: 14 additions & 5 deletions lib/inventory/controller.go
@@ -61,6 +61,7 @@ const (

appKeepAliveOk testEvent = "app-keep-alive-ok"
appKeepAliveErr testEvent = "app-keep-alive-err"
+ appKeepAliveDel testEvent = "app-keep-alive-del"

appUpsertOk testEvent = "app-upsert-ok"
appUpsertErr testEvent = "app-upsert-err"
@@ -75,6 +76,8 @@ const (

handlerStart = "handler-start"
handlerClose = "handler-close"
+
+ keepAliveTick = "keep-alive-tick"
)

// intervalKey is used to uniquely identify the subintervals registered with the interval.MultiInterval
@@ -622,6 +625,10 @@ func (c *Controller) handleAgentMetadata(handle *upstreamHandle, m proto.Upstrea
}

func (c *Controller) keepAliveServer(handle *upstreamHandle, now time.Time) error {
+ // always fire off 'tick' event after keepalive processing to ensure
+ // that waiting for N ticks maps intuitively to waiting for N keepalive
+ // processing steps.
+ defer c.testEvent(keepAliveTick)
if err := c.keepAliveSSHServer(handle, now); err != nil {
return trace.Wrap(err)
}
@@ -647,14 +654,15 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e

srv.keepAliveErrs++
handle.appServers[name] = srv
- shouldClose := srv.keepAliveErrs > c.maxKeepAliveErrs
-
- log.Warnf("Failed to keep alive app server %q: %v (count=%d, closing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldClose)
+ shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
+ log.Warnf("Failed to keep alive app server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)

- if shouldClose {
- return trace.Errorf("failed to keep alive app server: %v", err)
+ if shouldRemove {
+ c.testEvent(appKeepAliveDel)
+ delete(handle.appServers, name)
}
} else {
+ srv.keepAliveErrs = 0
c.testEvent(appKeepAliveOk)
}
} else if srv.retryUpsert {
Expand Down Expand Up @@ -697,6 +705,7 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
return trace.Errorf("failed to keep alive ssh server: %v", err)
}
} else {
+ handle.sshServer.keepAliveErrs = 0
c.testEvent(sshKeepAliveOk)
}
} else if handle.sshServer.retryUpsert {
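The other notable addition in controller.go is the deferred keepAliveTick test event at the top of keepAliveServer. Because it is deferred, the tick fires after every keepalive pass, whether the pass succeeds, fails, or returns early, which is what lets the test below treat "wait for N ticks" as "wait for N complete keepalive passes". A tiny illustrative sketch of that defer behavior (placeholder function names, not Teleport APIs):

// Sketch: a deferred emit runs on every exit path, so each call produces
// exactly one tick even when the pass itself returns an error.
package main

import (
	"errors"
	"fmt"
)

func keepAlivePass(emit func(string), fail bool) error {
	// Deferred emit runs on every exit path, including the error return below.
	defer emit("keep-alive-tick")
	if fail {
		return errors.New("keepalive failed")
	}
	return nil
}

func main() {
	var events []string
	emit := func(e string) { events = append(events, e) }

	_ = keepAlivePass(emit, false) // successful pass
	_ = keepAlivePass(emit, true)  // failed pass still produces a tick

	fmt.Println(events) // [keep-alive-tick keep-alive-tick]
}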
39 changes: 36 additions & 3 deletions lib/inventory/controller_test.go
@@ -279,6 +279,7 @@ func TestSSHServerBasics(t *testing.T) {
// an app service.
func TestAppServerBasics(t *testing.T) {
const serverID = "test-server"
+ const appCount = 3

t.Parallel()

@@ -311,7 +312,7 @@ func TestAppServerBasics(t *testing.T) {
require.True(t, ok)

// send a fake app server heartbeat
- for i := 0; i < 3; i++ {
+ for i := 0; i < appCount; i++ {
err := downstream.Send(ctx, proto.InventoryHeartbeat{
AppServer: &types.AppServerV3{
Metadata: types.Metadata{
@@ -353,7 +354,7 @@ func TestAppServerBasics(t *testing.T) {
deny(appUpsertErr, handlerClose),
)

- for i := 0; i < 3; i++ {
+ for i := 0; i < appCount; i++ {
err := downstream.Send(ctx, proto.InventoryHeartbeat{
AppServer: &types.AppServerV3{
Metadata: types.Metadata{
@@ -402,6 +403,38 @@ func TestAppServerBasics(t *testing.T) {
_, err := handle.Ping(pingCtx, 1)
require.NoError(t, err)

+ // ensure that local app keepalive states have reset to healthy by waiting
+ // on a full cycle+ worth of keepalives without errors.
+ awaitEvents(t, events,
+ expect(keepAliveTick, keepAliveTick),
+ deny(appKeepAliveErr, handlerClose),
+ )
+
+ // set up to induce enough consecutive keepalive errors to cause removal
+ // of server-side keepalive state.
+ auth.mu.Lock()
+ auth.failKeepAlives = 3 * appCount
+ auth.mu.Unlock()
+
+ // expect that all app keepalives fail, then the app is removed.
+ var expectedEvents []testEvent
+ for i := 0; i < appCount; i++ {
+ expectedEvents = append(expectedEvents, []testEvent{appKeepAliveErr, appKeepAliveErr, appKeepAliveErr, appKeepAliveDel}...)
+ }
+
+ // wait for failed keepalives to trigger removal
+ awaitEvents(t, events,
+ expect(expectedEvents...),
+ deny(handlerClose),
+ )
+
+ // verify that further keepalive ticks do not result in attempts to keepalive
+ // apps (successful or not).
+ awaitEvents(t, events,
+ expect(keepAliveTick, keepAliveTick, keepAliveTick),
+ deny(appKeepAliveOk, appKeepAliveErr, handlerClose),
+ )
+
// set up to induce enough consecutive errors to cause stream closure
auth.mu.Lock()
auth.failUpserts = 5
@@ -736,7 +769,7 @@ func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) {
opt(&options)
}

- timeout := time.After(time.Second * 5)
+ timeout := time.After(time.Second * 30)
for {
if len(options.expect) == 0 {
return
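For context on the new test section: with appCount fake app servers and auth.failKeepAlives set to 3 * appCount, every keepalive attempt fails until that budget is exhausted, so each app accumulates three consecutive errors and is then removed. The Err, Err, Err, Del sequence the test expects per app implies the test controller runs with maxKeepAliveErrs = 2; that configuration lives outside this hunk, so treat it as an assumption. A quick sanity check of the arithmetic:

// Back-of-the-envelope check of the expected event counts, assuming the
// test controller's maxKeepAliveErrs is 2 (implied, not shown in this diff).
package main

import "fmt"

func main() {
	const (
		appCount         = 3
		maxKeepAliveErrs = 2            // assumed test configuration
		failKeepAlives   = 3 * appCount // value the test sets on the fake auth
	)

	failuresPerApp := maxKeepAliveErrs + 1 // removal triggers once errs exceed the max
	fmt.Println(failuresPerApp)            // 3 appKeepAliveErr events per app
	fmt.Println(appCount * failuresPerApp) // 9 total, consuming all failKeepAlives
	fmt.Println(appCount)                  // 3 appKeepAliveDel events, one per app
	fmt.Println(failKeepAlives == appCount*failuresPerApp) // true: counts line up
}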
