Skip to content

Commit

Permalink
[v15] Cache the result of TeleportProcess.GetRotation (#50281)
Browse files Browse the repository at this point in the history
* Cache the result of TeleportProcess.GetRotation

* Backport FnCache.Remove
  • Loading branch information
espadolini authored Dec 16, 2024
1 parent 1abab08 commit 43cc819
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
8 changes: 6 additions & 2 deletions lib/service/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
process.logger.WarnContext(process.ExitContext(), "Failed to write identity to storage.", "identity", role, "error", err)
}

if err := process.storage.WriteState(role, state.StateV2{
err = process.storage.WriteState(role, state.StateV2{
Spec: state.StateSpecV2{
Rotation: ca.GetRotation(),
InitialLocalVersion: teleport.Version,
},
}); err != nil {
})
process.rotationCache.Remove(role)
if err != nil {
return nil, trace.NewAggregate(err, connector.Close())
}
process.logger.InfoContext(process.ExitContext(), "The process successfully wrote the credentials and state to the disk.", "identity", role)
Expand Down Expand Up @@ -986,6 +988,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState state.StateV2
}
localState.Spec.Rotation = remote
err = storage.WriteState(id.Role, localState)
process.rotationCache.Remove(id.Role)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -1044,6 +1047,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState state.StateV2
// only update local phase, there is no need to reload
localState.Spec.Rotation = remote
err = storage.WriteState(id.Role, localState)
process.rotationCache.Remove(id.Role)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
34 changes: 31 additions & 3 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ type TeleportProcess struct {
// storage is a server local storage
storage *storage.ProcessStorage

// rotationCache is a TTL cache for GetRotation, since it might get called
// frequently if the agent is heartbeating multiple resources. Keys are
// [types.SystemRole], values are [*types.Rotation].
rotationCache *utils.FnCache

// id is a process id - used to identify different processes
// during in-process reloads.
id string
Expand Down Expand Up @@ -968,6 +973,19 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
cfg.Clock = clockwork.NewRealClock()
}

// full heartbeat announces are on average every 2/3 * 6/7 of the default
// announce TTL, so we pick a slightly shorter TTL here
const rotationCacheTTL = apidefaults.ServerAnnounceTTL / 2
rotationCache, err := utils.NewFnCache(utils.FnCacheConfig{
TTL: rotationCacheTTL,
Clock: cfg.Clock,
Context: supervisor.ExitContext(),
ReloadOnErr: true,
})
if err != nil {
return nil, trace.Wrap(err)
}

if cfg.PluginRegistry == nil {
cfg.PluginRegistry = plugin.NewRegistry()
}
Expand Down Expand Up @@ -1061,6 +1079,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
connectors: make(map[types.SystemRole]*Connector),
importedDescriptors: cfg.FileDescriptors,
storage: storage,
rotationCache: rotationCache,
id: processID,
log: cfg.Log,
logger: cfg.Logger,
Expand Down Expand Up @@ -2730,13 +2749,22 @@ func (process *TeleportProcess) NewLocalCache(clt authclient.ClientI, setupConfi
}, clt)
}

// GetRotation returns the process rotation.
// GetRotation returns the process rotation. The result is internally cached for
// a few minutes, so anything that must get the latest possible version should
// use process.storage.GetState directly, instead (writes to the state that this
// process knows about will invalidate the cache, however).
func (process *TeleportProcess) GetRotation(role types.SystemRole) (*types.Rotation, error) {
state, err := process.storage.GetState(context.TODO(), role)
rotation, err := utils.FnCacheGet(process.ExitContext(), process.rotationCache, role, func(ctx context.Context) (*types.Rotation, error) {
state, err := process.storage.GetState(ctx, role)
if err != nil {
return nil, trace.Wrap(err)
}
return &state.Spec.Rotation, nil
})
if err != nil {
return nil, trace.Wrap(err)
}
return &state.Spec.Rotation, nil
return rotation, nil
}

func (process *TeleportProcess) proxyPublicAddr() utils.NetAddr {
Expand Down
7 changes: 7 additions & 0 deletions lib/utils/fncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ func (c *FnCache) Shutdown(ctx context.Context) {
}
}

// Remove purges a specific item in the cache.
func (c *FnCache) Remove(key any) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.entries, key)
}

// RemoveExpired purges any items from the cache which have exceeded their TTL.
func (c *FnCache) RemoveExpired() {
c.mu.Lock()
Expand Down

0 comments on commit 43cc819

Please sign in to comment.