From 43cc81966bb54357d8828b2911b265733ba7e929 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Mon, 16 Dec 2024 18:05:11 +0100 Subject: [PATCH] [v15] Cache the result of `TeleportProcess.GetRotation` (#50281) * Cache the result of TeleportProcess.GetRotation * Backport FnCache.Remove --- lib/service/connect.go | 8 ++++++-- lib/service/service.go | 34 +++++++++++++++++++++++++++++++--- lib/utils/fncache.go | 7 +++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/lib/service/connect.go b/lib/service/connect.go index 5baf87585548d..6049515f90d54 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -553,12 +553,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec process.logger.WarnContext(process.ExitContext(), "Failed to write identity to storage.", "identity", role, "error", err) } - if err := process.storage.WriteState(role, state.StateV2{ + err = process.storage.WriteState(role, state.StateV2{ Spec: state.StateSpecV2{ Rotation: ca.GetRotation(), InitialLocalVersion: teleport.Version, }, - }); err != nil { + }) + process.rotationCache.Remove(role) + if err != nil { return nil, trace.NewAggregate(err, connector.Close()) } process.logger.InfoContext(process.ExitContext(), "The process successfully wrote the credentials and state to the disk.", "identity", role) @@ -986,6 +988,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState state.StateV2 } localState.Spec.Rotation = remote err = storage.WriteState(id.Role, localState) + process.rotationCache.Remove(id.Role) if err != nil { return trace.Wrap(err) } @@ -1044,6 +1047,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState state.StateV2 // only update local phase, there is no need to reload localState.Spec.Rotation = remote err = storage.WriteState(id.Role, localState) + process.rotationCache.Remove(id.Role) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/service/service.go b/lib/service/service.go index 283e04140ac63..52f0c3e110f49 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -426,6 +426,11 @@ type TeleportProcess struct { // storage is a server local storage storage *storage.ProcessStorage + // rotationCache is a TTL cache for GetRotation, since it might get called + // frequently if the agent is heartbeating multiple resources. Keys are + // [types.SystemRole], values are [*types.Rotation]. + rotationCache *utils.FnCache + // id is a process id - used to identify different processes // during in-process reloads. id string @@ -968,6 +973,19 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { cfg.Clock = clockwork.NewRealClock() } + // full heartbeat announces are on average every 2/3 * 6/7 of the default + // announce TTL, so we pick a slightly shorter TTL here + const rotationCacheTTL = apidefaults.ServerAnnounceTTL / 2 + rotationCache, err := utils.NewFnCache(utils.FnCacheConfig{ + TTL: rotationCacheTTL, + Clock: cfg.Clock, + Context: supervisor.ExitContext(), + ReloadOnErr: true, + }) + if err != nil { + return nil, trace.Wrap(err) + } + if cfg.PluginRegistry == nil { cfg.PluginRegistry = plugin.NewRegistry() } @@ -1061,6 +1079,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { connectors: make(map[types.SystemRole]*Connector), importedDescriptors: cfg.FileDescriptors, storage: storage, + rotationCache: rotationCache, id: processID, log: cfg.Log, logger: cfg.Logger, @@ -2730,13 +2749,22 @@ func (process *TeleportProcess) NewLocalCache(clt authclient.ClientI, setupConfi }, clt) } -// GetRotation returns the process rotation. +// GetRotation returns the process rotation. The result is internally cached for +// a few minutes, so anything that must get the latest possible version should +// use process.storage.GetState directly, instead (writes to the state that this +// process knows about will invalidate the cache, however). func (process *TeleportProcess) GetRotation(role types.SystemRole) (*types.Rotation, error) { - state, err := process.storage.GetState(context.TODO(), role) + rotation, err := utils.FnCacheGet(process.ExitContext(), process.rotationCache, role, func(ctx context.Context) (*types.Rotation, error) { + state, err := process.storage.GetState(ctx, role) + if err != nil { + return nil, trace.Wrap(err) + } + return &state.Spec.Rotation, nil + }) if err != nil { return nil, trace.Wrap(err) } - return &state.Spec.Rotation, nil + return rotation, nil } func (process *TeleportProcess) proxyPublicAddr() utils.NetAddr { diff --git a/lib/utils/fncache.go b/lib/utils/fncache.go index b76a48602eacb..71bbc1607a0c1 100644 --- a/lib/utils/fncache.go +++ b/lib/utils/fncache.go @@ -170,6 +170,13 @@ func (c *FnCache) Shutdown(ctx context.Context) { } } +// Remove purges a specific item in the cache. +func (c *FnCache) Remove(key any) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.entries, key) +} + // RemoveExpired purges any items from the cache which have exceeded their TTL. func (c *FnCache) RemoveExpired() { c.mu.Lock()