Skip to content

Commit

Permalink
Heartbeat static desktops with the inventory control stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zmb3 committed Oct 26, 2024
1 parent e3a4f3c commit 9175970
Show file tree
Hide file tree
Showing 11 changed files with 1,261 additions and 990 deletions.
1,957 changes: 1,009 additions & 948 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ const (
// KeepAliveDatabase is the keep alive type for database server.
KeepAliveDatabase = "db"

// KeepAliveWindowsDesktop is the keep alive type for a Windows desktop.
KeepAliveWindowsDesktop = "windows_desktop"

// KeepAliveWindowsDesktopService is the keep alive type for a Windows
// desktop service.
KeepAliveWindowsDesktopService = "windows_desktop_service"
Expand Down
4 changes: 3 additions & 1 deletion api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2493,8 +2493,10 @@ message InventoryHeartbeat {
types.AppServerV3 AppServer = 2;
// DatabaseServer is a complete db server spec to be heartbeated.
types.DatabaseServerV3 DatabaseServer = 3;
// KubeServer is a complete kube server spec to be heartbeated.
// KubernetesServer is a complete kube server spec to be heartbeated.
types.KubernetesServerV3 KubernetesServer = 4;
// WindowsDesktop is a complete Windows desktop spec to be heartbeated.
types.WindowsDesktopV3 WindowsDesktop = 5;
}

// UpstreamInventoryGoodbye informs the upstream service that instance
Expand Down
16 changes: 9 additions & 7 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4715,13 +4715,15 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
Version: teleport.Version,
ServerID: a.ServerID,
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
DatabaseHeartbeats: true,
DatabaseCleanup: true,
KubernetesHeartbeats: true,
KubernetesCleanup: true,
NodeHeartbeats: true,
AppHeartbeats: true,
AppCleanup: true,
DatabaseHeartbeats: true,
DatabaseCleanup: true,
KubernetesHeartbeats: true,
KubernetesCleanup: true,
WindowsDesktopHeartbeats: true,
WindowsDesktopCleanup: true,
},
}
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {
Expand Down
1 change: 1 addition & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ var connectedResourceGauges = map[string]prometheus.Gauge{
constants.KeepAliveApp: connectedResources.WithLabelValues(constants.KeepAliveApp),
constants.KeepAliveDatabase: connectedResources.WithLabelValues(constants.KeepAliveDatabase),
constants.KeepAliveDatabaseService: connectedResources.WithLabelValues(constants.KeepAliveDatabaseService),
constants.KeepAliveWindowsDesktop: connectedResources.WithLabelValues(constants.KeepAliveWindowsDesktop),
constants.KeepAliveWindowsDesktopService: connectedResources.WithLabelValues(constants.KeepAliveWindowsDesktopService),
}

Expand Down
87 changes: 87 additions & 0 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Auth interface {
UpsertKubernetesServer(context.Context, types.KubeServer) (*types.KeepAlive, error)
DeleteKubernetesServer(ctx context.Context, hostID, name string) error

UpsertWindowsDesktop(ctx context.Context, s types.WindowsDesktop) error
DeleteWindowsDesktop(ctx context.Context, hostID, name string) error

KeepAliveServer(context.Context, types.KeepAlive) error
UpsertInstance(ctx context.Context, instance types.Instance) error
}
Expand Down Expand Up @@ -98,6 +101,16 @@ const (
kubeUpsertRetryOk testEvent = "kube-upsert-retry-ok"
kubeUpsertRetryErr testEvent = "kube-upsert-retry-err"

windowsDesktopKeepAliveOk testEvent = "windows-desktop-keep-alive-ok"
windowsDesktopKeepAliveErr testEvent = "windows-desktop-keep-alive-err"
windowsDesktopKeepAliveDel testEvent = "windows-desktop-keep-alive-del"

windowsDesktopUpsertOk testEvent = "windows-desktop-upsert-ok"
windowsDesktopUpsertErr testEvent = "windows-desktop-upsert-err"

windowsDesktopUpsertRetryOk testEvent = "windows-desktop-upsert-retry-ok"
windowsDesktopUpsertRetryErr testEvent = "windows-desktop-upsert-retry-err"

instanceHeartbeatOk testEvent = "instance-heartbeat-ok"
instanceHeartbeatErr testEvent = "instance-heartbeat-err"

Expand Down Expand Up @@ -354,6 +367,7 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
"apps": len(handle.appServers),
"dbs": len(handle.databaseServers),
"kube": len(handle.kubernetesServers),
"desktops": len(handle.windowsDesktops),
"server_id": handle.Hello().ServerID,
}).Debug("Cleaning up resources in response to instance termination")
for _, app := range handle.appServers {
Expand All @@ -373,6 +387,12 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err)
}
}

for _, desktop := range handle.windowsDesktops {
if err := c.auth.DeleteWindowsDesktop(c.closeContext, desktop.resource.GetHostID(), desktop.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove Windows desktop %q on termination: %v", handle.Hello().ServerID, err)
}
}
}

c.instanceHBVariableDuration.Dec()
Expand All @@ -399,9 +419,14 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
c.onDisconnectFunc(constants.KeepAliveKube, len(handle.kubernetesServers))
}

if len(handle.windowsDesktops) > 0 {
c.onDisconnectFunc(constants.KeepAliveWindowsDesktop, len(handle.windowsDesktops))
}

clear(handle.appServers)
clear(handle.databaseServers)
clear(handle.kubernetesServers)
clear(handle.windowsDesktops)
c.testEvent(handlerClose)
}()

Expand Down Expand Up @@ -587,6 +612,12 @@ func (c *Controller) handleHeartbeatMsg(handle *upstreamHandle, hb proto.Invento
}
}

if hb.WindowsDesktop != nil {
if err := c.handleWindowsDesktopHB(handle, hb.WindowsDesktop); err != nil {
return trace.Wrap(err)
}
}

return nil
}

Expand Down Expand Up @@ -779,6 +810,53 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes
return nil
}

func (c *Controller) handleWindowsDesktopHB(handle *upstreamHandle, desktop *types.WindowsDesktopV3) error {
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
// client cert. after that point it is our responsibility to ensure that heartbeated information is
// consistent with the identity and capabilities claimed in the initial hello.
if !(handle.HasService(types.RoleWindowsDesktop) || handle.HasService(types.RoleProxy)) {
return trace.AccessDenied("control stream not configured to support Windows desktop heartbeats")
}
if desktop.GetHostID() != handle.Hello().ServerID {
return trace.AccessDenied("incorrect desktop ID (expected %q, got %q)", handle.Hello().ServerID, desktop.GetHostID())
}

if handle.windowsDesktops == nil {
handle.windowsDesktops = make(map[resourceKey]*heartBeatInfo[*types.WindowsDesktopV3])
}

key := resourceKey{hostID: desktop.GetHostID(), name: desktop.GetName()}

if _, ok := handle.windowsDesktops[key]; !ok {
c.onConnectFunc(constants.KeepAliveWindowsDesktop)
handle.windowsDesktops[key] = &heartBeatInfo[*types.WindowsDesktopV3]{}
}

now := time.Now()
desktop.SetExpiry(now.Add(c.serverTTL).UTC())

err := c.auth.UpsertWindowsDesktop(c.closeContext, desktop)
if err == nil {
c.testEvent(windowsDesktopUpsertOk)
// store the new lease and reset retry state
srv := handle.windowsDesktops[key]
// srv.lease = lease TODO(zmb3)
srv.retryUpsert = false
srv.resource = desktop
} else {
c.testEvent(windowsDesktopUpsertErr)
log.Warnf("Failed to upsert Windows desktop %q on heartbeat: %v.", handle.Hello().ServerID, err)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
srv := handle.windowsDesktops[key]
srv.lease = nil
srv.retryUpsert = true
srv.resource = desktop
}
return nil
}

func (c *Controller) handleAgentMetadata(handle *upstreamHandle, m proto.UpstreamInventoryAgentMetadata) {
handle.SetAgentMetadata(m)

Expand Down Expand Up @@ -824,6 +902,10 @@ func (c *Controller) keepAliveServer(handle *upstreamHandle, now time.Time) erro
return trace.Wrap(err)
}

if err := c.keepAliveWindowsDesktop(handle, now); err != nil {
return trace.Wrap(err)
}

return nil
}

Expand Down Expand Up @@ -1001,6 +1083,11 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
return nil
}

func (c *Controller) keepAliveWindowsDesktop(handle *upstreamHandle, now time.Time) error {
// TODO(zmb3)
return nil
}

// Close terminates all control streams registered with this controller. Control streams
// registered after Close() is called are closed immediately.
func (c *Controller) Close() error {
Expand Down
17 changes: 17 additions & 0 deletions lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ func (a *fakeAuth) DeleteKubernetesServer(ctx context.Context, hostID, name stri
return nil
}

// TODO(zmb3) do we need to return a types.KeepAlive?
func (a *fakeAuth) UpsertWindowsDesktop(ctx context.Context, s types.WindowsDesktop) error {
a.mu.Lock()
defer a.mu.Unlock()
a.upserts++

if a.failUpserts > 0 {
a.failUpserts--
return trace.Errorf("upsert failed as test condition")
}
return nil
}

func (a *fakeAuth) DeleteWindowsDesktop(ctx context.Context, hostID, name string) error {
return nil
}

func (a *fakeAuth) KeepAliveServer(_ context.Context, _ types.KeepAlive) error {
a.mu.Lock()
defer a.mu.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions lib/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ type upstreamHandle struct {

// kubernetesServers track kubernetesServers server details.
kubernetesServers map[resourceKey]*heartBeatInfo[*types.KubernetesServerV3]

// windowsDesktops tracks Windows desktop details
windowsDesktops map[resourceKey]*heartBeatInfo[*types.WindowsDesktopV3]
}

type resourceKey struct {
Expand Down
9 changes: 5 additions & 4 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(logger *slog
Labels: cfg.WindowsDesktop.Labels,
HostLabelsFn: cfg.WindowsDesktop.HostLabels.LabelsForHost,
Heartbeat: desktop.HeartbeatConfig{
HostUUID: cfg.HostUUID,
PublicAddr: publicAddr,
StaticHosts: cfg.WindowsDesktop.StaticHosts,
OnHeartbeat: process.OnHeartbeat(teleport.ComponentWindowsDesktop),
HostUUID: cfg.HostUUID,
PublicAddr: publicAddr,
StaticHosts: cfg.WindowsDesktop.StaticHosts,
OnHeartbeat: process.OnHeartbeat(teleport.ComponentWindowsDesktop),
InventoryHandle: process.inventoryHandle,
},
ShowDesktopWallpaper: cfg.WindowsDesktop.ShowDesktopWallpaper,
LDAPConfig: windows.LDAPConfig(cfg.WindowsDesktop.LDAP),
Expand Down
70 changes: 40 additions & 30 deletions lib/srv/desktop/windows_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/gravitational/teleport/lib/defaults"
libevents "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/recorder"
"github.com/gravitational/teleport/lib/inventory"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/reversetunnel"
Expand Down Expand Up @@ -123,6 +124,7 @@ type WindowsService struct {
ldapConfigured bool
ldapInitialized bool
ldapCertRenew *time.Timer
heartbeats map[string]*srv.HeartbeatV2

// lastDisoveryResults stores the results of the most recent LDAP search
// when desktop discovery is enabled.
Expand All @@ -144,7 +146,7 @@ type WindowsService struct {
// creating shared directory audit events.
auditCache sharedDirectoryAuditCache

// NLA indicates whether this service will attempt to perform
// enableNLA indicates whether this service will attempt to perform
// Network Level Authentication (NLA) when attempting to connect
// to domain-joined Windows hosts
enableNLA bool
Expand Down Expand Up @@ -225,6 +227,8 @@ type HeartbeatConfig struct {
OnHeartbeat func(error)
// StaticHosts is an optional list of static Windows hosts to register
StaticHosts []servicecfg.WindowsHost
// InventoryHandle is used to send desktop heartbeats via the inventory control stream.
InventoryHandle inventory.DownstreamHandle
}

func (cfg *WindowsServiceConfig) checkAndSetDiscoveryDefaults() error {
Expand Down Expand Up @@ -297,6 +301,9 @@ func (cfg *HeartbeatConfig) CheckAndSetDefaults() error {
if cfg.OnHeartbeat == nil {
return trace.BadParameter("HeartbeatConfig is missing OnHeartbeat")
}
if cfg.InventoryHandle == nil {
return trace.BadParameter("HeartbeatConfig is missing InventoryHandle")
}
return nil
}

Expand Down Expand Up @@ -360,6 +367,7 @@ func NewWindowsService(cfg WindowsServiceConfig) (*WindowsService, error) {
ClusterName: clustername.GetClusterName(),
AcceptedUsage: []string{teleport.UsageWindowsDesktopOnly},
},
heartbeats: make(map[string]*srv.HeartbeatV2),
dnsResolver: resolver,
lc: &windows.LDAPClient{Cfg: cfg.LDAPConfig},
clusterName: clusterName.GetClusterName(),
Expand Down Expand Up @@ -407,7 +415,7 @@ func NewWindowsService(cfg WindowsServiceConfig) (*WindowsService, error) {
return nil, trace.Wrap(err)
}

if err := s.startStaticHostHeartbeats(); err != nil {
if err := s.startInventoryHeartbeats(); err != nil {
return nil, trace.Wrap(err)
}

Expand Down Expand Up @@ -606,36 +614,37 @@ func (s *WindowsService) startServiceHeartbeat() error {
// service itself is running.
func (s *WindowsService) startStaticHostHeartbeats() error {
for _, host := range s.cfg.Heartbeat.StaticHosts {
if err := s.startStaticHostHeartbeat(host); err != nil {
return err
hb, err := srv.NewWindowsDesktopHeartbeat(srv.HeartbeatV2Config[*types.WindowsDesktopV3]{
InventoryHandle: s.cfg.Heartbeat.InventoryHandle,
Announcer: s.cfg.AccessPoint,
GetResource: s.staticHostHeartbeatInfo(host, s.cfg.HostLabelsFn),
OnHeartbeat: s.cfg.Heartbeat.OnHeartbeat,
})
if err != nil {
return trace.Wrap(err)
}

go hb.Run()

s.mu.Lock()
s.heartbeats[host.Name] = hb
s.mu.Unlock()
}
return nil
}

// startStaticHostHeartbeats spawns heartbeat goroutine for single host
func (s *WindowsService) startStaticHostHeartbeat(host servicecfg.WindowsHost) error {
heartbeat, err := srv.NewHeartbeat(srv.HeartbeatConfig{
Context: s.closeCtx,
Component: teleport.ComponentWindowsDesktop,
Mode: srv.HeartbeatModeWindowsDesktop,
Announcer: s.cfg.AccessPoint,
GetServerInfo: s.staticHostHeartbeatInfo(host, s.cfg.HostLabelsFn),
KeepAlivePeriod: apidefaults.ServerKeepAliveTTL(),
AnnouncePeriod: apidefaults.ServerAnnounceTTL/2 + utils.RandomDuration(apidefaults.ServerAnnounceTTL/10),
CheckPeriod: 5 * time.Minute,
ServerTTL: apidefaults.ServerAnnounceTTL,
OnHeartbeat: s.cfg.Heartbeat.OnHeartbeat,
})
if err != nil {
return trace.Wrap(err)
// TODO(zmb3): where do I use this?
func (s *WindowsService) stopHeartbeat(name string) error {
s.mu.Lock()
defer s.mu.Unlock()

hb, ok := s.heartbeats[name]
if !ok {
return nil
}
go func() {
if err := heartbeat.Run(); err != nil {
s.cfg.Logger.ErrorContext(s.closeCtx, "static host heartbeat ended", "error", err)
}
}()
return nil

delete(s.heartbeats, name)
return trace.Wrap(hb.Close())
}

// Close instructs the server to stop accepting new connections and abort all
Expand Down Expand Up @@ -1147,11 +1156,12 @@ func (s *WindowsService) getServiceHeartbeatInfo() (types.Resource, error) {
}

// staticHostHeartbeatInfo generates the Windows Desktop resource
// for heartbeating statically defined hosts
func (s *WindowsService) staticHostHeartbeatInfo(host servicecfg.WindowsHost,
// for heartbeating statically defined hosts.
func (s *WindowsService) staticHostHeartbeatInfo(
host servicecfg.WindowsHost,
getHostLabels func(string) map[string]string,
) func() (types.Resource, error) {
return func() (types.Resource, error) {
) func(ctx context.Context) (*types.WindowsDesktopV3, error) {
return func(ctx context.Context) (*types.WindowsDesktopV3, error) {
addr := host.Address.String()
labels := getHostLabels(addr)
for k, v := range host.Labels {
Expand Down
Loading

0 comments on commit 9175970

Please sign in to comment.