Skip to content

Commit

Permalink
Heartbeat static desktops with the inventory control stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zmb3 committed Oct 26, 2024
1 parent e3a4f3c commit 5dda1b0
Show file tree
Hide file tree
Showing 11 changed files with 1,318 additions and 972 deletions.
1,957 changes: 1,009 additions & 948 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ const (
// KeepAliveDatabase is the keep alive type for database server.
KeepAliveDatabase = "db"

// KeepAliveWindowsDesktop is the keep alive type for a Windows desktop.
KeepAliveWindowsDesktop = "windows_desktop"

// KeepAliveWindowsDesktopService is the keep alive type for a Windows
// desktop service.
KeepAliveWindowsDesktopService = "windows_desktop_service"
Expand Down
4 changes: 3 additions & 1 deletion api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2493,8 +2493,10 @@ message InventoryHeartbeat {
types.AppServerV3 AppServer = 2;
// DatabaseServer is a complete db server spec to be heartbeated.
types.DatabaseServerV3 DatabaseServer = 3;
// KubeServer is a complete kube server spec to be heartbeated.
// KubernetesServer is a complete kube server spec to be heartbeated.
types.KubernetesServerV3 KubernetesServer = 4;
// WindowsDesktop is a complete Windows desktop spec to be heartbeated.
types.WindowsDesktopV3 WindowsDesktop = 5;
}

// UpstreamInventoryGoodbye informs the upstream service that instance
Expand Down
16 changes: 9 additions & 7 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4715,13 +4715,15 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
Version: teleport.Version,
ServerID: a.ServerID,
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
DatabaseHeartbeats: true,
DatabaseCleanup: true,
KubernetesHeartbeats: true,
KubernetesCleanup: true,
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
DatabaseHeartbeats: true,
DatabaseCleanup: true,
KubernetesHeartbeats: true,
KubernetesCleanup: true,
WindowsDesktopHeartbeats: true,
WindowsDesktopCleanup: true,
},
}
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {
Expand Down
1 change: 1 addition & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ var connectedResourceGauges = map[string]prometheus.Gauge{
constants.KeepAliveApp: connectedResources.WithLabelValues(constants.KeepAliveApp),
constants.KeepAliveDatabase: connectedResources.WithLabelValues(constants.KeepAliveDatabase),
constants.KeepAliveDatabaseService: connectedResources.WithLabelValues(constants.KeepAliveDatabaseService),
constants.KeepAliveWindowsDesktop: connectedResources.WithLabelValues(constants.KeepAliveWindowsDesktop),
constants.KeepAliveWindowsDesktopService: connectedResources.WithLabelValues(constants.KeepAliveWindowsDesktopService),
}

Expand Down
87 changes: 87 additions & 0 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Auth interface {
UpsertKubernetesServer(context.Context, types.KubeServer) (*types.KeepAlive, error)
DeleteKubernetesServer(ctx context.Context, hostID, name string) error

UpsertWindowsDesktop(ctx context.Context, s types.WindowsDesktop) error
DeleteWindowsDesktop(ctx context.Context, hostID, name string) error

KeepAliveServer(context.Context, types.KeepAlive) error
UpsertInstance(ctx context.Context, instance types.Instance) error
}
Expand Down Expand Up @@ -98,6 +101,16 @@ const (
kubeUpsertRetryOk testEvent = "kube-upsert-retry-ok"
kubeUpsertRetryErr testEvent = "kube-upsert-retry-err"

windowsDesktopKeepAliveOk testEvent = "windows-desktop-keep-alive-ok"
windowsDesktopKeepAliveErr testEvent = "windows-desktop-keep-alive-err"
windowsDesktopKeepAliveDel testEvent = "windows-desktop-keep-alive-del"

windowsDesktopUpsertOk testEvent = "windows-desktop-upsert-ok"
windowsDesktopUpsertErr testEvent = "windows-desktop-upsert-err"

windowsDesktopUpsertRetryOk testEvent = "windows-desktop-upsert-retry-ok"
windowsDesktopUpsertRetryErr testEvent = "windows-desktop-upsert-retry-err"

instanceHeartbeatOk testEvent = "instance-heartbeat-ok"
instanceHeartbeatErr testEvent = "instance-heartbeat-err"

Expand Down Expand Up @@ -354,6 +367,7 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
"apps": len(handle.appServers),
"dbs": len(handle.databaseServers),
"kube": len(handle.kubernetesServers),
"desktops": len(handle.windowsDesktops),
"server_id": handle.Hello().ServerID,
}).Debug("Cleaning up resources in response to instance termination")
for _, app := range handle.appServers {
Expand All @@ -373,6 +387,12 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err)
}
}

for _, desktop := range handle.windowsDesktops {
if err := c.auth.DeleteWindowsDesktop(c.closeContext, desktop.resource.GetHostID(), desktop.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove Windows desktop %q on termination: %v", handle.Hello().ServerID, err)
}
}
}

c.instanceHBVariableDuration.Dec()
Expand All @@ -399,9 +419,14 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
c.onDisconnectFunc(constants.KeepAliveKube, len(handle.kubernetesServers))
}

if len(handle.windowsDesktops) > 0 {
c.onDisconnectFunc(constants.KeepAliveWindowsDesktop, len(handle.windowsDesktops))
}

clear(handle.appServers)
clear(handle.databaseServers)
clear(handle.kubernetesServers)
clear(handle.windowsDesktops)
c.testEvent(handlerClose)
}()

Expand Down Expand Up @@ -587,6 +612,12 @@ func (c *Controller) handleHeartbeatMsg(handle *upstreamHandle, hb proto.Invento
}
}

if hb.WindowsDesktop != nil {
if err := c.handleWindowsDesktopHB(handle, hb.WindowsDesktop); err != nil {
return trace.Wrap(err)
}
}

return nil
}

Expand Down Expand Up @@ -779,6 +810,53 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes
return nil
}

func (c *Controller) handleWindowsDesktopHB(handle *upstreamHandle, desktop *types.WindowsDesktopV3) error {
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
// client cert. after that point it is our responsibility to ensure that heartbeated information is
// consistent with the identity and capabilities claimed in the initial hello.
if !(handle.HasService(types.RoleWindowsDesktop) || handle.HasService(types.RoleProxy)) {
return trace.AccessDenied("control stream not configured to support Windows desktop heartbeats")
}
if desktop.GetHostID() != handle.Hello().ServerID {
return trace.AccessDenied("incorrect desktop ID (expected %q, got %q)", handle.Hello().ServerID, desktop.GetHostID())
}

if handle.windowsDesktops == nil {
handle.windowsDesktops = make(map[resourceKey]*heartBeatInfo[*types.WindowsDesktopV3])
}

key := resourceKey{hostID: desktop.GetHostID(), name: desktop.GetName()}

if _, ok := handle.windowsDesktops[key]; !ok {
c.onConnectFunc(constants.KeepAliveWindowsDesktop)
handle.windowsDesktops[key] = &heartBeatInfo[*types.WindowsDesktopV3]{}
}

now := time.Now()
desktop.SetExpiry(now.Add(c.serverTTL).UTC())

err := c.auth.UpsertWindowsDesktop(c.closeContext, desktop)
if err == nil {
c.testEvent(windowsDesktopUpsertOk)
// store the new lease and reset retry state
srv := handle.windowsDesktops[key]
// srv.lease = lease TODO(zmb3)
srv.retryUpsert = false
srv.resource = desktop
} else {
c.testEvent(windowsDesktopUpsertErr)
log.Warnf("Failed to upsert Windows desktop %q on heartbeat: %v.", handle.Hello().ServerID, err)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
srv := handle.windowsDesktops[key]
srv.lease = nil
srv.retryUpsert = true
srv.resource = desktop
}
return nil
}

func (c *Controller) handleAgentMetadata(handle *upstreamHandle, m proto.UpstreamInventoryAgentMetadata) {
handle.SetAgentMetadata(m)

Expand Down Expand Up @@ -824,6 +902,10 @@ func (c *Controller) keepAliveServer(handle *upstreamHandle, now time.Time) erro
return trace.Wrap(err)
}

if err := c.keepAliveWindowsDesktop(handle, now); err != nil {
return trace.Wrap(err)
}

return nil
}

Expand Down Expand Up @@ -1001,6 +1083,11 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
return nil
}

func (c *Controller) keepAliveWindowsDesktop(handle *upstreamHandle, now time.Time) error {
// TODO(zmb3)
return nil
}

// Close terminates all control streams registered with this controller. Control streams
// registered after Close() is called are closed immediately.
func (c *Controller) Close() error {
Expand Down
17 changes: 17 additions & 0 deletions lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ func (a *fakeAuth) DeleteKubernetesServer(ctx context.Context, hostID, name stri
return nil
}

// TODO(zmb3) do we need to return a types.KeepAlive?
func (a *fakeAuth) UpsertWindowsDesktop(ctx context.Context, s types.WindowsDesktop) error {
a.mu.Lock()
defer a.mu.Unlock()
a.upserts++

if a.failUpserts > 0 {
a.failUpserts--
return trace.Errorf("upsert failed as test condition")
}
return nil
}

func (a *fakeAuth) DeleteWindowsDesktop(ctx context.Context, hostID, name string) error {
return nil
}

func (a *fakeAuth) KeepAliveServer(_ context.Context, _ types.KeepAlive) error {
a.mu.Lock()
defer a.mu.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions lib/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ type upstreamHandle struct {

// kubernetesServers track kubernetesServers server details.
kubernetesServers map[resourceKey]*heartBeatInfo[*types.KubernetesServerV3]

// windowsDesktops tracks Windows desktop details
windowsDesktops map[resourceKey]*heartBeatInfo[*types.WindowsDesktopV3]
}

type resourceKey struct {
Expand Down
25 changes: 13 additions & 12 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,19 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(logger *slog
}

srv, err := desktop.NewWindowsService(desktop.WindowsServiceConfig{
DataDir: process.Config.DataDir,
Logger: process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentWindowsDesktop, process.id)),
Clock: process.Clock,
Authorizer: authorizer,
Emitter: conn.Client,
TLS: tlsConfig,
AccessPoint: accessPoint,
ConnLimiter: connLimiter,
LockWatcher: lockWatcher,
AuthClient: conn.Client,
Labels: cfg.WindowsDesktop.Labels,
HostLabelsFn: cfg.WindowsDesktop.HostLabels.LabelsForHost,
DataDir: process.Config.DataDir,
Logger: process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentWindowsDesktop, process.id)),
Clock: process.Clock,
InventoryHandle: process.inventoryHandle,
Authorizer: authorizer,
Emitter: conn.Client,
TLS: tlsConfig,
AccessPoint: accessPoint,
ConnLimiter: connLimiter,
LockWatcher: lockWatcher,
AuthClient: conn.Client,
Labels: cfg.WindowsDesktop.Labels,
HostLabelsFn: cfg.WindowsDesktop.HostLabels.LabelsForHost,
Heartbeat: desktop.HeartbeatConfig{
HostUUID: cfg.HostUUID,
PublicAddr: publicAddr,
Expand Down
Loading

0 comments on commit 5dda1b0

Please sign in to comment.