Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v17] Preparation for variable rate heartbeats #49497

Merged
merged 11 commits into from
Nov 29, 2024
588 changes: 295 additions & 293 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2553,10 +2553,10 @@ message InventoryPingRequest {
// ServerID is the ID of the instance to ping.
string ServerID = 1;

// ControlLog forces the ping to use the standard "commit then act" model of control log synchronization
// for the ping. This significantly increases the amount of time it takes for the ping request to
// complete, but is useful for testing/debugging control log issues.
bool ControlLog = 2;
// ControlLog was formerly used to signal that the ping should use control log synchronization.
//
// Deprecated: the control log is unsupported and unsound to use.
bool ControlLog = 2 [deprecated = true];
}

// InventoryPingResponse returns the result of an inventory ping initiated via an
Expand Down
76 changes: 2 additions & 74 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4826,94 +4826,22 @@ func (a *Server) GetInventoryConnectedServiceCount(service types.SystemRole) uin
}

func (a *Server) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
const pingAttempt = "ping-attempt"
const pingSuccess = "ping-success"
const maxAttempts = 16
stream, ok := a.inventory.GetControlStream(req.ServerID)
if !ok {
return proto.InventoryPingResponse{}, trace.NotFound("no control stream found for server %q", req.ServerID)
}

id := mathrand.Uint64()

if !req.ControlLog {
// this ping doesn't pass through the control log, so just execute it immediately.
d, err := stream.Ping(ctx, id)
return proto.InventoryPingResponse{
Duration: d,
}, trace.Wrap(err)
}

// matchEntry is used to check if our log entry has been included
// in the control log.
matchEntry := func(entry types.InstanceControlLogEntry) bool {
return entry.Type == pingAttempt && entry.ID == id
}

var included bool
for i := 1; i <= maxAttempts; i++ {
stream.VisitInstanceState(func(ref inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
// check if we've already successfully included the ping entry
if ref.LastHeartbeat != nil {
if slices.IndexFunc(ref.LastHeartbeat.GetControlLog(), matchEntry) >= 0 {
included = true
return
}
}

// if the entry is already pending, we just need to wait
if slices.IndexFunc(ref.QualifiedPendingControlLog, matchEntry) >= 0 {
return
}

// either this is the first iteration, or the pending control log was reset.
update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingAttempt,
ID: id,
Time: time.Now(),
})
stream.HeartbeatInstance()
return
})

if included {
// entry appeared in control log
break
}

// pause briefly, then re-sync our state. note that this strategy is not scalable. control log usage is intended only
// for periodic operations. control-log based pings are a mechanism for testing/debugging only, hence the use of a
// simple sleep loop.
select {
case <-time.After(time.Millisecond * 100 * time.Duration(i)):
case <-stream.Done():
return proto.InventoryPingResponse{}, trace.Errorf("control stream closed during ping attempt")
case <-ctx.Done():
return proto.InventoryPingResponse{}, trace.Wrap(ctx.Err())
}
}

if !included {
return proto.InventoryPingResponse{}, trace.LimitExceeded("failed to include ping %d in control log for instance %q (max attempts exceeded)", id, req.ServerID)
if req.ControlLog { //nolint:staticcheck // SA1019. Checking deprecated field that may be sent by older clients.
return proto.InventoryPingResponse{}, trace.BadParameter("ControlLog pings are not supported")
}

d, err := stream.Ping(ctx, id)
if err != nil {
return proto.InventoryPingResponse{}, trace.Wrap(err)
}

stream.VisitInstanceState(func(_ inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
update.UnqualifiedPendingControlLog = append(update.UnqualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingSuccess,
ID: id,
Labels: map[string]string{
"duration": d.String(),
},
})
return
})
stream.HeartbeatInstance()

return proto.InventoryPingResponse{
Duration: d,
}, nil
Expand Down
Loading
Loading