Skip to content

Commit

Permalink
Limits notification length, renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
vapopov committed Jul 27, 2024
1 parent 2b789f6 commit f0ec46b
Show file tree
Hide file tree
Showing 11 changed files with 1,002 additions and 1,010 deletions.
1,874 changes: 937 additions & 937 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2317,11 +2317,11 @@ message DownstreamInventoryPing {
// ping (used for testing/debug purposes).
message UpstreamInventoryPong {
uint64 ID = 1;
// LocalTime advertises the local time of the upstream.
google.protobuf.Timestamp LocalTime = 9 [
// SystemClock advertises the system clock of the upstream.
google.protobuf.Timestamp SystemClock = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "local_time,omitempty"
(gogoproto.jsontag) = "system_clock,omitempty"
];
}

Expand Down
4 changes: 2 additions & 2 deletions api/types/databaseserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ func (s *DatabaseServerV3) SetDatabase(database Database) error {
return nil
}

// GetProxyIDs returns a list of proxy ids this server is connected to.
// GetProxyID returns a list of proxy ids this server is connected to.
func (s *DatabaseServerV3) GetProxyIDs() []string {
return s.Spec.ProxyIDs
}

// SetProxyIDs sets the proxy ids this server is connected to.
// SetProxyID sets the proxy ids this server is connected to.
func (s *DatabaseServerV3) SetProxyIDs(proxyIDs []string) {
s.Spec.ProxyIDs = proxyIDs
}
Expand Down
6 changes: 3 additions & 3 deletions api/types/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type WindowsDesktopService interface {
ResourceWithLabels
// GetAddr returns the network address of this service.
GetAddr() string
// GetTeleportVersion returns the teleport binary version of this service.
// GetVersion returns the teleport binary version of this service.
GetTeleportVersion() string
// GetHostname returns the hostname of this service
GetHostname() string
Expand Down Expand Up @@ -105,12 +105,12 @@ func (s *WindowsDesktopServiceV3) GetTeleportVersion() string {
return s.Spec.TeleportVersion
}

// GetProxyIDs returns a list of proxy ids this server is connected to.
// GetProxyID returns a list of proxy ids this server is connected to.
func (s *WindowsDesktopServiceV3) GetProxyIDs() []string {
return s.Spec.ProxyIDs
}

// SetProxyIDs sets the proxy ids this server is connected to.
// SetProxyID sets the proxy ids this server is connected to.
func (s *WindowsDesktopServiceV3) SetProxyIDs(proxyIDs []string) {
s.Spec.ProxyIDs = proxyIDs
}
Expand Down
2 changes: 1 addition & 1 deletion api/types/kubernetes_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (s *KubernetesServerV3) GetProxyIDs() []string {
return s.Spec.ProxyIDs
}

// SetProxyIDs sets the proxy ids this server is connected to.
// SetProxyID sets the proxy ids this server is connected to.
func (s *KubernetesServerV3) SetProxyIDs(proxyIDs []string) {
s.Spec.ProxyIDs = proxyIDs
}
Expand Down
7 changes: 4 additions & 3 deletions integration/time_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestTimeSyncDifference(t *testing.T) {
require.NoError(t, err)

// Start the agent service with Node and WindowsDesktop capabilities.
agentClock := clockwork.NewFakeClockAt(time.Now().Add(5 * time.Hour))
agentClock := clockwork.NewFakeClockAt(time.Now())
agentCfg := servicecfg.MakeDefaultConfig()
agentCfg.Clock = agentClock
agentCfg.Version = defaults.TeleportConfigVersionV3
Expand Down Expand Up @@ -146,7 +146,8 @@ func TestTimeSyncDifference(t *testing.T) {
require.NoError(t, err, "timeout waiting for Teleport readiness")

// Must trigger the monitor watch logic to create the global notification.
authClock.Advance(time.Hour)
agentClock.Advance(15 * time.Minute)
authClock.Advance(10 * time.Minute)

err = retryutils.RetryStaticFor(20*time.Second, time.Second, func() error {
notifications, _, err := authService.GetAuthServer().ListGlobalNotifications(ctx, 100, "")
Expand All @@ -155,7 +156,7 @@ func TestTimeSyncDifference(t *testing.T) {
}
var found bool
for _, notification := range notifications {
found = found || notification.GetMetadata().GetName() == "cluster-monitor-time-sync"
found = found || notification.GetMetadata().GetName() == "cluster-monitor-system-clock-warning"
}
if !found {
return trace.BadParameter("expected notification is not found")
Expand Down
93 changes: 43 additions & 50 deletions lib/auth/server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,61 +31,54 @@ import (
headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/inventory"
"github.com/gravitational/teleport/lib/utils/interval"
)

const (
// timeCheckCycle is the period when local times are compared
// for collected resources from heartbeats, and notification must be added.
timeCheckCycle = 10 * time.Minute
// timeShiftThreshold is the duration threshold for triggering a warning
// systemClockCheckCycle is the period when system clock comparison is launched
// across all inventories, to be gathered for global notifications, if any.
systemClockCheckCycle = 10 * time.Minute
// systemClockThreshold is the duration threshold for triggering a warning
// if the time difference exceeds this threshold.
timeShiftThreshold = time.Minute
// timeCheckNotificationName is id for adding the global notification.
timeCheckNotificationName = "cluster-monitor-time-sync"
systemClockThreshold = time.Minute
// systemClockNotificationWarningName is the ID for adding the global notification.
systemClockNotificationWarningName = "cluster-monitor-system-clock-warning"
)

// inventoryMonitor stores info about resource and time difference with local time.
type inventoryMonitor struct {
serverID string
services types.SystemRoles
diff time.Duration
}

// String returns the inventory representation.
func (res *inventoryMonitor) String() string {
return fmt.Sprintf(
"%s[%s] is %s",
res.serverID,
res.services.String(),
durationText(res.diff),
)
}
// MonitorSystemTime runs the periodic check for iterating through all inventories
// to ping them and receive the system clock difference.
func (a *Server) MonitorSystemTime(ctx context.Context) error {
checkInterval := interval.New(interval.Config{
FirstDuration: time.Minute,
Duration: systemClockCheckCycle,
Clock: a.GetClock(),
Jitter: retryutils.NewSeventhJitter(),
})
defer checkInterval.Stop()

// MonitorNodeInfos consumes heartbeat events of other services to periodically
// compare the auth server time with the time of remote services,
// and notifying about the time difference between servers.
func (a *Server) MonitorNodeInfos(ctx context.Context) error {
ticker := a.clock.NewTicker(timeCheckCycle)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.Chan():
var inventories []inventoryMonitor
case <-checkInterval.Next():
var inventories []string
a.inventory.Iter(func(handle inventory.UpstreamHandle) {
info := handle.Hello()
diff, err := handle.TimeReconciliation(ctx, insecurerand.Uint64())
if err != nil {
slog.ErrorContext(ctx, "error getting time reconciliation")
return
}

if (diff > 0 && diff > timeShiftThreshold) || (diff < 0 && -diff > timeShiftThreshold) {
inventories = append(inventories, inventoryMonitor{
serverID: info.GetServerID(),
services: info.GetServices(),
diff: diff,
})
if (diff > 0 && diff > systemClockThreshold) || (diff < 0 && -diff > systemClockThreshold) {
inventories = append(inventories, fmt.Sprintf(
" - %s[%s] is %s",
info.GetServerID(),
types.SystemRoles(info.GetServices()).String(),
durationText(diff),
))
slog.WarnContext(ctx, "server time difference detected",
"server", info.GetServerID(),
"services", info.GetServices(),
Expand All @@ -97,20 +90,21 @@ func (a *Server) MonitorNodeInfos(ctx context.Context) error {
if len(inventories) > 0 {
err := upsertGlobalNotification(ctx, a.Services, generateNotificationMessage(inventories))
if err != nil {
slog.ErrorContext(ctx, "can't set notification about time difference", "error", err)
slog.ErrorContext(ctx, "can't set notification about system clock issue", "error", err)
continue
}
}
}
}
}

// upsertGlobalNotification sets predefined global notification for notifying the issues with the cluster
// servers related to the time difference in nodes.
// servers related to the system clock difference in nodes.
func upsertGlobalNotification(ctx context.Context, services *Services, text string) error {
_, err := services.UpsertGlobalNotification(ctx, &notificationsv1.GlobalNotification{
Kind: types.KindGlobalNotification,
Version: types.V1,
Metadata: &headerv1.Metadata{Name: timeCheckNotificationName},
Metadata: &headerv1.Metadata{Name: systemClockNotificationWarningName},
Spec: &notificationsv1.GlobalNotificationSpec{
Matcher: &notificationsv1.GlobalNotificationSpec_All{
All: true,
Expand All @@ -119,7 +113,7 @@ func upsertGlobalNotification(ctx context.Context, services *Services, text stri
SubKind: types.NotificationDefaultWarningSubKind,
Spec: &notificationsv1.NotificationSpec{},
Metadata: &headerv1.Metadata{
Name: timeCheckNotificationName,
Name: systemClockNotificationWarningName,
Labels: map[string]string{
types.NotificationTitleLabel: text,
},
Expand All @@ -130,22 +124,21 @@ func upsertGlobalNotification(ctx context.Context, services *Services, text stri
return trace.Wrap(err)
}

// generateNotificationMessage formats the notification message for the user with detailed information
// about the server name and time difference in comparison with the auth node.
func generateNotificationMessage(inventories []inventoryMonitor) string {
var messages []string
for _, inv := range inventories {
messages = append(messages, inv.String())
// generateNotificationMessage formats the notification message with the inventory list.
func generateNotificationMessage(messages []string) string {
var more string
if len(messages) > 10 {
more = fmt.Sprintf("Only 10 servers are shown out of the list of %d.", len(messages))
messages = messages[:10]
}

return "Incorrect system clock detected in the cluster, which may lead to certificate validation issues.\n" +
"Ensure that the clock is accurate on all nodes to avoid potential access problems.\n" +
"All comparisons are made with the Auth service system clock." +
"List of servers with a time drift: \n" + strings.Join(messages, "\n")
"List of servers with a time drift: \n" + strings.Join(messages, "\n") + more
}

// durationText formats specified duration to text by adding suffix ahead/behind and
// transforms nanoseconds to formatted time with hours, minutes, seconds.
// durationText formats the specified duration to text by adding the suffix "ahead" or "behind"
// and converts nanoseconds to a formatted text with hours, minutes and seconds.
func durationText(duration time.Duration) string {
if duration > 0 {
return fmt.Sprintf("%s ahead", duration.String())
Expand Down
4 changes: 2 additions & 2 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInvent
rsp := pingResponse{
reqDuration: c.clock.Since(pending.start),
}
if !msg.LocalTime.IsZero() {
rsp.clockDiff = c.clock.Since(msg.LocalTime)
if !msg.SystemClock.IsZero() {
rsp.clockDiff = c.clock.Since(msg.SystemClock)
}
pending.rspC <- rsp
delete(handle.pings, msg.ID)
Expand Down
4 changes: 2 additions & 2 deletions lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,8 @@ func TestTimeReconciliation(t *testing.T) {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
LocalTime: time.Now().Add(-time.Minute).UTC(),
ID: msg.(proto.DownstreamInventoryPing).ID,
SystemClock: time.Now().Add(-time.Minute).UTC(),
})
case <-downstream.Done():
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions lib/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ func (h *upstreamHandle) TimeReconciliation(ctx context.Context, id uint64) (d t
case rsp.clockDiff == 0:
return 0, nil
case rsp.clockDiff > 0:
return rsp.clockDiff - rsp.reqDuration, nil
return rsp.clockDiff - rsp.reqDuration/2, nil
default:
return rsp.clockDiff + rsp.reqDuration, nil
return rsp.clockDiff + rsp.reqDuration/2, nil
}
case <-h.Done():
return 0, trace.Errorf("failed to recv upstream pong (stream closed)")
Expand Down
8 changes: 3 additions & 5 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,8 +1309,8 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
process.inventoryHandle.RegisterPingHandler(func(sender inventory.DownstreamSender, ping proto.DownstreamInventoryPing) {
process.logger.InfoContext(process.ExitContext(), "Handling incoming inventory ping.", "id", ping.ID)
err := sender.Send(process.ExitContext(), proto.UpstreamInventoryPong{
ID: ping.ID,
LocalTime: process.Clock.Now().UTC(),
ID: ping.ID,
SystemClock: process.Clock.Now().UTC(),
})
if err != nil {
process.logger.WarnContext(process.ExitContext(), "Failed to respond to inventory ping.", "id", ping.ID, "error", err)
Expand Down Expand Up @@ -2512,7 +2512,7 @@ func (process *TeleportProcess) initAuthService() error {
return trace.Wrap(authServer.ReconcileServerInfos(process.GracefulExitContext()))
})
process.RegisterFunc("auth.server.monitor", func() error {
return trace.Wrap(authServer.MonitorNodeInfos(process.GracefulExitContext()))
return trace.Wrap(authServer.MonitorSystemTime(process.GracefulExitContext()))
})

// execute this when process is asked to exit:
Expand Down Expand Up @@ -2960,7 +2960,6 @@ func (process *TeleportProcess) initSSH() error {
Component: teleport.ComponentNode,
Logger: process.log.WithField(teleport.ComponentKey, teleport.Component(teleport.ComponentNode, process.id)).WithField(teleport.ComponentKey, "sessionctrl"),
TracerProvider: process.TracingProvider,
Clock: process.Clock,
ServerID: serverID,
})
if err != nil {
Expand Down Expand Up @@ -3002,7 +3001,6 @@ func (process *TeleportProcess) initSSH() error {
regular.SetTracerProvider(process.TracingProvider),
regular.SetSessionController(sessionController),
regular.SetPublicAddrs(cfg.SSH.PublicAddrs),
regular.SetClock(process.Clock),
)
if err != nil {
return trace.Wrap(err)
Expand Down

0 comments on commit f0ec46b

Please sign in to comment.