From 3adf8b18e1660509f7a5f4a0fc9e92263f3e5ec2 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Mon, 28 Oct 2024 12:22:35 -0400 Subject: [PATCH 1/3] cache /find endpoint --- lib/web/apiserver.go | 122 ++++++++++++++++++++------------- lib/web/apiserver_ping_test.go | 5 ++ 2 files changed, 80 insertions(+), 47 deletions(-) diff --git a/lib/web/apiserver.go b/lib/web/apiserver.go index faaffa2ff3e1e..0df54870ea7a3 100644 --- a/lib/web/apiserver.go +++ b/lib/web/apiserver.go @@ -130,6 +130,10 @@ const ( // DefaultFeatureWatchInterval is the default time in which the feature watcher // should ping the auth server to check for updated features DefaultFeatureWatchInterval = time.Minute * 5 + // findEndpointCacheTTL is the cache TTL for the find endpoint generic answer. + // This cache is here to protect against accidental or intentional DDoS, the TTL must be low to quickly reflect + // cluster configuration changes. + findEndpointCacheTTL = 10 * time.Second ) // healthCheckAppServerFunc defines a function used to perform a health check @@ -173,6 +177,11 @@ type Handler struct { // an authenticated websocket so unauthenticated sockets dont get left // open. wsIODeadline time.Duration + + // findCache is used to cache the find endpoint answer. As this endpoint is unprotected and has high rate-limits, + // each call must cause minimal work. The cached answer can be modulated after, for example if the caller specified + // a specific Automatic Updates UUID or group. + findEndpointCache *utils.FnCache } // HandlerOption is a functional argument - an option that can be passed @@ -452,16 +461,28 @@ func NewHandler(cfg Config, opts ...HandlerOption) (*APIHandler, error) { const apiPrefix = "/" + teleport.WebAPIVersion cfg.SetDefaults() + clock := clockwork.NewRealClock() + + findCache, err := utils.NewFnCache(utils.FnCacheConfig{ + TTL: findEndpointCacheTTL, + Clock: clock, + Context: cfg.Context, + ReloadOnErr: false, + }) + if err != nil { + return nil, trace.Wrap(err, "creating /find cache") + } h := &Handler{ cfg: cfg, log: newPackageLogger(), logger: slog.Default().With(teleport.ComponentKey, teleport.ComponentWeb), - clock: clockwork.NewRealClock(), + clock: clock, clusterFeatures: cfg.ClusterFeatures, healthCheckAppServer: cfg.HealthCheckAppServer, tracer: cfg.TracerProvider.Tracer(teleport.ComponentWeb), wsIODeadline: wsIODeadline, + findEndpointCache: findCache, } if automaticUpgrades(cfg.ClusterFeatures) && h.cfg.AutomaticUpgradesChannels == nil { @@ -1521,56 +1542,63 @@ func (h *Handler) ping(w http.ResponseWriter, r *http.Request, p httprouter.Para } func (h *Handler) find(w http.ResponseWriter, r *http.Request, p httprouter.Params) (interface{}, error) { - // TODO(jent,espadolini): add a time-based cache to further reduce load on this endpoint - proxyConfig, err := h.cfg.ProxySettings.GetProxySettings(r.Context()) - if err != nil { - return nil, trace.Wrap(err) - } - authPref, err := h.cfg.AccessPoint.GetAuthPreference(r.Context()) - if err != nil { - return nil, trace.Wrap(err) - } - response := webclient.PingResponse{ - Auth: webclient.AuthenticationSettings{ - // Nodes need the signature algorithm suite when joining to generate - // keys with the correct algorithm. - SignatureAlgorithmSuite: authPref.GetSignatureAlgorithmSuite(), - }, - Proxy: *proxyConfig, - ServerVersion: teleport.Version, - MinClientVersion: teleport.MinClientVersion, - ClusterName: h.auth.clusterName, - } + // cache the generic answer to avoid doing work for each request + resp, err := utils.FnCacheGet[webclient.PingResponse](r.Context(), h.findEndpointCache, "find", func(ctx context.Context) (webclient.PingResponse, error) { + response := webclient.PingResponse{ + ServerVersion: teleport.Version, + MinClientVersion: teleport.MinClientVersion, + ClusterName: h.auth.clusterName, + } - autoUpdateConfig, err := h.cfg.AccessPoint.GetAutoUpdateConfig(r.Context()) - // TODO(vapopov) DELETE IN v18.0.0 check of IsNotImplemented, must be backported to all latest supported versions. - if err != nil && !trace.IsNotFound(err) && !trace.IsNotImplemented(err) { - h.logger.ErrorContext(r.Context(), "failed to receive AutoUpdateConfig", "error", err) - } - // If we can't get the AU config or tools AU are not configured, we default to "disabled". - // This ensures we fail open and don't accidentally update agents if something is going wrong. - // If we want to enable AUs by default, it would be better to create a default "autoupdate_config" resource - // than changing this logic. - if autoUpdateConfig.GetSpec().GetTools() == nil { - response.AutoUpdate.ToolsMode = autoupdate.ToolsUpdateModeDisabled - } else { - response.AutoUpdate.ToolsMode = autoUpdateConfig.GetSpec().GetTools().GetMode() - } + proxyConfig, err := h.cfg.ProxySettings.GetProxySettings(r.Context()) + if err != nil { + return response, trace.Wrap(err) + } + response.Proxy = *proxyConfig - autoUpdateVersion, err := h.cfg.AccessPoint.GetAutoUpdateVersion(r.Context()) - // TODO(vapopov) DELETE IN v18.0.0 check of IsNotImplemented, must be backported to all latest supported versions. - if err != nil && !trace.IsNotFound(err) && !trace.IsNotImplemented(err) { - h.logger.ErrorContext(r.Context(), "failed to receive AutoUpdateVersion", "error", err) - } - // If we can't get the AU version or tools AU version is not specified, we default to the current proxy version. - // This ensures we always advertise a version compatible with the cluster. - if autoUpdateVersion.GetSpec().GetTools() == nil { - response.AutoUpdate.ToolsVersion = api.Version - } else { - response.AutoUpdate.ToolsVersion = autoUpdateVersion.GetSpec().GetTools().GetTargetVersion() + authPref, err := h.cfg.AccessPoint.GetAuthPreference(r.Context()) + if err != nil { + return response, trace.Wrap(err) + } + response.Auth = webclient.AuthenticationSettings{SignatureAlgorithmSuite: authPref.GetSignatureAlgorithmSuite()} + + autoUpdateConfig, err := h.cfg.AccessPoint.GetAutoUpdateConfig(r.Context()) + // TODO(vapopov) DELETE IN v18.0.0 check of IsNotImplemented, must be backported to all latest supported versions. + if err != nil && !trace.IsNotFound(err) && !trace.IsNotImplemented(err) { + h.logger.ErrorContext(r.Context(), "failed to receive AutoUpdateConfig", "error", err) + } + // If we can't get the AU config or tools AU are not configured, we default to "disabled". + // This ensures we fail open and don't accidentally update agents if something is going wrong. + // If we want to enable AUs by default, it would be better to create a default "autoupdate_config" resource + // than changing this logic. + if autoUpdateConfig.GetSpec().GetTools() == nil { + response.AutoUpdate.ToolsMode = autoupdate.ToolsUpdateModeDisabled + } else { + response.AutoUpdate.ToolsMode = autoUpdateConfig.GetSpec().GetTools().GetMode() + } + + autoUpdateVersion, err := h.cfg.AccessPoint.GetAutoUpdateVersion(r.Context()) + // TODO(vapopov) DELETE IN v18.0.0 check of IsNotImplemented, must be backported to all latest supported versions. + if err != nil && !trace.IsNotFound(err) && !trace.IsNotImplemented(err) { + h.logger.ErrorContext(r.Context(), "failed to receive AutoUpdateVersion", "error", err) + } + // If we can't get the AU version or tools AU version is not specified, we default to the current proxy version. + // This ensures we always advertise a version compatible with the cluster. + if autoUpdateVersion.GetSpec().GetTools() == nil { + response.AutoUpdate.ToolsVersion = api.Version + } else { + response.AutoUpdate.ToolsVersion = autoUpdateVersion.GetSpec().GetTools().GetTargetVersion() + } + + return response, nil + }) + if err != nil { + return nil, trace.Wrap(err) } - return response, nil + // If you need to modulate the response based on the request params (will need to do this for automatic updates) + // Do it here. + return resp, nil } func (h *Handler) pingWithConnector(w http.ResponseWriter, r *http.Request, p httprouter.Params) (interface{}, error) { diff --git a/lib/web/apiserver_ping_test.go b/lib/web/apiserver_ping_test.go index 903a204fe2228..854a5339c7b69 100644 --- a/lib/web/apiserver_ping_test.go +++ b/lib/web/apiserver_ping_test.go @@ -395,6 +395,11 @@ func TestPing_autoUpdateResources(t *testing.T) { require.NoError(t, err) } + // clear the fn cache to force the next answer to be fresh + for _, proxy := range env.proxies { + proxy.handler.handler.findEndpointCache.Remove("find") + } + resp, err := client.NewInsecureWebClient().Do(req) require.NoError(t, err) From 9683a392f5c6b2e3ee6dfb98106c11b0586c6c37 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Mon, 28 Oct 2024 13:22:10 -0400 Subject: [PATCH 2/3] address feedback --- lib/web/apiserver.go | 34 +++++++++++++++++----------------- lib/web/apiserver_ping_test.go | 4 ++-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/web/apiserver.go b/lib/web/apiserver.go index 0df54870ea7a3..a4b9c9350e122 100644 --- a/lib/web/apiserver.go +++ b/lib/web/apiserver.go @@ -461,28 +461,16 @@ func NewHandler(cfg Config, opts ...HandlerOption) (*APIHandler, error) { const apiPrefix = "/" + teleport.WebAPIVersion cfg.SetDefaults() - clock := clockwork.NewRealClock() - - findCache, err := utils.NewFnCache(utils.FnCacheConfig{ - TTL: findEndpointCacheTTL, - Clock: clock, - Context: cfg.Context, - ReloadOnErr: false, - }) - if err != nil { - return nil, trace.Wrap(err, "creating /find cache") - } h := &Handler{ cfg: cfg, log: newPackageLogger(), logger: slog.Default().With(teleport.ComponentKey, teleport.ComponentWeb), - clock: clock, + clock: clockwork.NewRealClock(), clusterFeatures: cfg.ClusterFeatures, healthCheckAppServer: cfg.HealthCheckAppServer, tracer: cfg.TracerProvider.Tracer(teleport.ComponentWeb), wsIODeadline: wsIODeadline, - findEndpointCache: findCache, } if automaticUpgrades(cfg.ClusterFeatures) && h.cfg.AutomaticUpgradesChannels == nil { @@ -498,6 +486,18 @@ func NewHandler(cfg Config, opts ...HandlerOption) (*APIHandler, error) { } } + // We create the cache after applying the options to make sure we use the fake clock if it was passed. + findCache, err := utils.NewFnCache(utils.FnCacheConfig{ + TTL: findEndpointCacheTTL, + Clock: h.clock, + Context: cfg.Context, + ReloadOnErr: false, + }) + if err != nil { + return nil, trace.Wrap(err, "creating /find cache") + } + h.findEndpointCache = findCache + sessionLingeringThreshold := cachedSessionLingeringThreshold if cfg.CachedSessionLingeringThreshold != nil { sessionLingeringThreshold = *cfg.CachedSessionLingeringThreshold @@ -1543,7 +1543,7 @@ func (h *Handler) ping(w http.ResponseWriter, r *http.Request, p httprouter.Para func (h *Handler) find(w http.ResponseWriter, r *http.Request, p httprouter.Params) (interface{}, error) { // cache the generic answer to avoid doing work for each request - resp, err := utils.FnCacheGet[webclient.PingResponse](r.Context(), h.findEndpointCache, "find", func(ctx context.Context) (webclient.PingResponse, error) { + resp, err := utils.FnCacheGet[*webclient.PingResponse](r.Context(), h.findEndpointCache, "find", func(ctx context.Context) (*webclient.PingResponse, error) { response := webclient.PingResponse{ ServerVersion: teleport.Version, MinClientVersion: teleport.MinClientVersion, @@ -1552,13 +1552,13 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, p httprouter.Para proxyConfig, err := h.cfg.ProxySettings.GetProxySettings(r.Context()) if err != nil { - return response, trace.Wrap(err) + return nil, trace.Wrap(err) } response.Proxy = *proxyConfig authPref, err := h.cfg.AccessPoint.GetAuthPreference(r.Context()) if err != nil { - return response, trace.Wrap(err) + return nil, trace.Wrap(err) } response.Auth = webclient.AuthenticationSettings{SignatureAlgorithmSuite: authPref.GetSignatureAlgorithmSuite()} @@ -1590,7 +1590,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, p httprouter.Para response.AutoUpdate.ToolsVersion = autoUpdateVersion.GetSpec().GetTools().GetTargetVersion() } - return response, nil + return &response, nil }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/web/apiserver_ping_test.go b/lib/web/apiserver_ping_test.go index 854a5339c7b69..231c8625ffacd 100644 --- a/lib/web/apiserver_ping_test.go +++ b/lib/web/apiserver_ping_test.go @@ -395,9 +395,9 @@ func TestPing_autoUpdateResources(t *testing.T) { require.NoError(t, err) } - // clear the fn cache to force the next answer to be fresh + // expire the fn cache to force the next answer to be fresh for _, proxy := range env.proxies { - proxy.handler.handler.findEndpointCache.Remove("find") + proxy.clock.Advance(2 * findEndpointCacheTTL) } resp, err := client.NewInsecureWebClient().Do(req) From e74b91cad0fe088a7f1472c1f02ea77e23ee6264 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Mon, 28 Oct 2024 13:56:55 -0400 Subject: [PATCH 3/3] typo in godoc --- lib/web/apiserver.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/web/apiserver.go b/lib/web/apiserver.go index a4b9c9350e122..2bb156dbca990 100644 --- a/lib/web/apiserver.go +++ b/lib/web/apiserver.go @@ -178,9 +178,9 @@ type Handler struct { // open. wsIODeadline time.Duration - // findCache is used to cache the find endpoint answer. As this endpoint is unprotected and has high rate-limits, - // each call must cause minimal work. The cached answer can be modulated after, for example if the caller specified - // a specific Automatic Updates UUID or group. + // findEndpointCache is used to cache the find endpoint answer. As this endpoint is unprotected and has high + // rate-limits, each call must cause minimal work. The cached answer can be modulated after, for example if the + // caller specified its Automatic Updates UUID or group. findEndpointCache *utils.FnCache }