diff --git a/CHANGELOG.md b/CHANGELOG.md
index 84010533ed5e6..4d14f62788093 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -48,7 +48,8 @@
 * [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
 * [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
 * [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.
-* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
+* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
+* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results.
 
 ##### Fixes
 * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 51ecb12af62f1..1e94843eacf82 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
 # CLI flag: -frontend.max-cache-freshness
 [max_cache_freshness_per_query: <duration> | default = 10m]
 
+# Do not cache metadata requests if the end time is within the
+# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such
+# limits. Defaults to 24h.
+# CLI flag: -frontend.max-metadata-cache-freshness
+[max_metadata_cache_freshness: <duration> | default = 1d]
+
 # Do not cache requests with an end time that falls within Now minus this
 # duration. 0 disables this feature (default).
 # CLI flag: -frontend.max-stats-cache-freshness
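
For illustration only (not part of the patch): a minimal `limits_config` sketch of how the setting documented above might be tuned. The 6h value is an arbitrary example; `split_metadata_queries_by_interval` is shown alongside it because, per the flag comment added in `pkg/validation/limits.go` below, it determines the granularity of cached metadata beyond the freshness window.

```yaml
limits_config:
  # Skip caching label/series results whose end time falls within the last 6h
  # (default is 1d; 0 caches results regardless of how recent the request is).
  max_metadata_cache_freshness: 6h
  # Beyond the freshness window, cached metadata extents follow this split interval.
  split_metadata_queries_by_interval: 24h
```
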
diff --git a/pkg/querier/queryrange/labels_cache.go b/pkg/querier/queryrange/labels_cache.go
index 1e0dd225fa7b0..9fb511a96ae51 100644
--- a/pkg/querier/queryrange/labels_cache.go
+++ b/pkg/querier/queryrange/labels_cache.go
@@ -91,7 +91,9 @@ func NewLabelsCacheMiddleware(
 		merger,
 		labelsExtractor{},
 		cacheGenNumberLoader,
-		shouldCache,
+		func(ctx context.Context, r queryrangebase.Request) bool {
+			return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits)
+		},
 		parallelismForReq,
 		retentionEnabled,
 		metrics,
diff --git a/pkg/querier/queryrange/labels_cache_test.go b/pkg/querier/queryrange/labels_cache_test.go
index 73ab9ad8f4f84..fbad52a472bee 100644
--- a/pkg/querier/queryrange/labels_cache_test.go
+++ b/pkg/querier/queryrange/labels_cache_test.go
@@ -249,3 +249,111 @@ func TestLabelsCache(t *testing.T) {
 		})
 	}
 }
+
+func TestLabelCache_freshness(t *testing.T) {
+	testTime := time.Now().Add(-1 * time.Hour)
+	from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)
+	start, end := from.Time(), through.Time()
+	nonOverlappingStart, nonOverlappingEnd := from.Add(-24*time.Hour).Time(), through.Add(-24*time.Hour).Time()
+
+	for _, tt := range []struct {
+		name                      string
+		req                       *LabelRequest
+		shouldCache               bool
+		maxMetadataCacheFreshness time.Duration
+	}{
+		{
+			name: "max metadata freshness not set",
+			req: &LabelRequest{
+				LabelRequest: logproto.LabelRequest{
+					Start: &start,
+					End:   &end,
+				},
+			},
+			shouldCache: true,
+		},
+		{
+			name: "req overlaps with max cache freshness window",
+			req: &LabelRequest{
+				LabelRequest: logproto.LabelRequest{
+					Start: &start,
+					End:   &end,
+				},
+			},
+			maxMetadataCacheFreshness: 24 * time.Hour,
+			shouldCache:               false,
+		},
+		{
+			name: "req does not overlap max cache freshness window",
+			req: &LabelRequest{
+				LabelRequest: logproto.LabelRequest{
+					Start: &nonOverlappingStart,
+					End:   &nonOverlappingEnd,
+				},
+			},
+			maxMetadataCacheFreshness: 24 * time.Hour,
+			shouldCache:               true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			cacheMiddleware, err := NewLabelsCacheMiddleware(
+				log.NewNopLogger(),
+				fakeLimits{
+					metadataSplitDuration: map[string]time.Duration{
+						"fake": 24 * time.Hour,
+					},
+					maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
+				},
+				DefaultCodec,
+				cache.NewMockCache(),
+				nil,
+				nil,
+				func(_ context.Context, _ []string, _ queryrangebase.Request) int {
+					return 1
+				},
+				false,
+				nil,
+				nil,
+			)
+			require.NoError(t, err)
+
+			labelsResp := &LokiLabelNamesResponse{
+				Status:  "success",
+				Version: uint32(loghttp.VersionV1),
+				Data:    []string{"bar", "buzz"},
+				Statistics: stats.Result{
+					Summary: stats.Summary{
+						Splits: 1,
+					},
+				},
+			}
+
+			called := 0
+			handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
+				called++
+
+				// should request the entire length with no partitioning as nothing is cached yet.
+				require.Equal(t, tt.req.GetStart(), r.GetStart())
+				require.Equal(t, tt.req.GetEnd(), r.GetEnd())
+
+				return labelsResp, nil
+			}))
+
+			ctx := user.InjectOrgID(context.Background(), "fake")
+			got, err := handler.Do(ctx, tt.req)
+			require.NoError(t, err)
+			require.Equal(t, 1, called) // called actual handler, as not cached.
+			require.Equal(t, labelsResp, got)
+
+			called = 0
+			got, err = handler.Do(ctx, tt.req)
+			require.NoError(t, err)
+			if !tt.shouldCache {
+				require.Equal(t, 1, called)
+			} else {
+				require.Equal(t, 0, called)
+			}
+			require.Equal(t, labelsResp, got)
+		})
+	}
+}
diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go
index 57b2e03c6697b..e12255883bf4a 100644
--- a/pkg/querier/queryrange/limits/definitions.go
+++ b/pkg/querier/queryrange/limits/definitions.go
@@ -30,5 +30,6 @@ type Limits interface {
 	MaxQueryBytesRead(context.Context, string) int
 	MaxQuerierBytesRead(context.Context, string) int
 	MaxStatsCacheFreshness(context.Context, string) time.Duration
+	MaxMetadataCacheFreshness(context.Context, string) time.Duration
 	VolumeEnabled(string) bool
 }
diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go
index fe8799fffe799..c7c7cff4595a1 100644
--- a/pkg/querier/queryrange/roundtrip_test.go
+++ b/pkg/querier/queryrange/roundtrip_test.go
@@ -1237,23 +1237,24 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
 }
 
 type fakeLimits struct {
-	maxQueryLength          time.Duration
-	maxQueryParallelism     int
-	tsdbMaxQueryParallelism int
-	maxQueryLookback        time.Duration
-	maxEntriesLimitPerQuery int
-	maxSeries               int
-	splitDuration           map[string]time.Duration
-	metadataSplitDuration   map[string]time.Duration
-	ingesterSplitDuration   map[string]time.Duration
-	minShardingLookback     time.Duration
-	queryTimeout            time.Duration
-	requiredLabels          []string
-	requiredNumberLabels    int
-	maxQueryBytesRead       int
-	maxQuerierBytesRead     int
-	maxStatsCacheFreshness  time.Duration
-	volumeEnabled           bool
+	maxQueryLength            time.Duration
+	maxQueryParallelism       int
+	tsdbMaxQueryParallelism   int
+	maxQueryLookback          time.Duration
+	maxEntriesLimitPerQuery   int
+	maxSeries                 int
+	splitDuration             map[string]time.Duration
+	metadataSplitDuration     map[string]time.Duration
+	ingesterSplitDuration     map[string]time.Duration
+	minShardingLookback       time.Duration
+	queryTimeout              time.Duration
+	requiredLabels            []string
+	requiredNumberLabels      int
+	maxQueryBytesRead         int
+	maxQuerierBytesRead       int
+	maxStatsCacheFreshness    time.Duration
+	maxMetadataCacheFreshness time.Duration
+	volumeEnabled             bool
 }
 
 func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
@@ -1344,6 +1345,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Duration {
 	return f.maxStatsCacheFreshness
 }
 
+func (f fakeLimits) MaxMetadataCacheFreshness(_ context.Context, _ string) time.Duration {
+	return f.maxMetadataCacheFreshness
+}
+
 func (f fakeLimits) VolumeEnabled(_ string) bool {
 	return f.volumeEnabled
 }
diff --git a/pkg/querier/queryrange/series_cache.go b/pkg/querier/queryrange/series_cache.go
index 9ad67f70acf55..f1a15b1d220f4 100644
--- a/pkg/querier/queryrange/series_cache.go
+++ b/pkg/querier/queryrange/series_cache.go
@@ -9,10 +9,15 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/dskit/tenant"
 
 	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
 	"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
+	"github.com/grafana/loki/pkg/util/validation"
 )
 
 type cacheKeySeries struct {
@@ -92,9 +97,28 @@ func NewSeriesCacheMiddleware(
 		merger,
 		seriesExtractor{},
 		cacheGenNumberLoader,
-		shouldCache,
+		func(ctx context.Context, r queryrangebase.Request) bool {
+			return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits)
+		},
 		parallelismForReq,
 		retentionEnabled,
 		metrics,
 	)
 }
+
+func shouldCacheMetadataReq(ctx context.Context, logger log.Logger, shouldCache queryrangebase.ShouldCacheFn, req queryrangebase.Request, l Limits) bool {
+	if shouldCache != nil && !shouldCache(ctx, req) {
+		return false
+	}
+
+	tenantIDs, err := tenant.TenantIDs(ctx)
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. won't cache", "err", err)
+		return false
+	}
+
+	cacheFreshnessCapture := func(id string) time.Duration { return l.MaxMetadataCacheFreshness(ctx, id) }
+	maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture)
+
+	return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(model.Now().Add(-maxCacheFreshness))
+}
diff --git a/pkg/querier/queryrange/series_cache_test.go b/pkg/querier/queryrange/series_cache_test.go
index abe9920012172..fa0f04fb799e6 100644
--- a/pkg/querier/queryrange/series_cache_test.go
+++ b/pkg/querier/queryrange/series_cache_test.go
@@ -312,3 +312,113 @@ func TestSeriesCache(t *testing.T) {
 		}
 	})
 }
+
+func TestSeriesCache_freshness(t *testing.T) {
+	testTime := time.Now().Add(-1 * time.Hour)
+	from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)
+
+	for _, tt := range []struct {
+		name                      string
+		req                       *LokiSeriesRequest
+		shouldCache               bool
+		maxMetadataCacheFreshness time.Duration
+	}{
+		{
+			name: "max metadata freshness not set",
+			req: &LokiSeriesRequest{
+				StartTs: from.Time(),
+				EndTs:   through.Time(),
+				Match:   []string{`{namespace=~".*"}`},
+				Path:    seriesAPIPath,
+			},
+			shouldCache: true,
+		},
+		{
+			name: "req overlaps with max cache freshness window",
+			req: &LokiSeriesRequest{
+				StartTs: from.Time(),
+				EndTs:   through.Time(),
+				Match:   []string{`{namespace=~".*"}`},
+				Path:    seriesAPIPath,
+			},
+			maxMetadataCacheFreshness: 24 * time.Hour,
+			shouldCache:               false,
+		},
+		{
+			name: "req does not overlap max cache freshness window",
+			req: &LokiSeriesRequest{
+				StartTs: from.Add(-24 * time.Hour).Time(),
+				EndTs:   through.Add(-24 * time.Hour).Time(),
+				Match:   []string{`{namespace=~".*"}`},
+				Path:    seriesAPIPath,
+			},
+			maxMetadataCacheFreshness: 24 * time.Hour,
+			shouldCache:               true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			cacheMiddleware, err := NewSeriesCacheMiddleware(
+				log.NewNopLogger(),
+				fakeLimits{
+					metadataSplitDuration: map[string]time.Duration{
+						"fake": 24 * time.Hour,
+					},
+					maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
+				},
+				DefaultCodec,
+				cache.NewMockCache(),
+				nil,
+				nil,
+				func(_ context.Context, _ []string, _ queryrangebase.Request) int {
+					return 1
+				},
+				false,
+				nil,
+				nil,
+			)
+			require.NoError(t, err)
+
+			seriesResp := &LokiSeriesResponse{
+				Status:  "success",
+				Version: uint32(loghttp.VersionV1),
+				Data: []logproto.SeriesIdentifier{
+					{
+						Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}},
+					},
+				},
+				Statistics: stats.Result{
+					Summary: stats.Summary{
+						Splits: 1,
+					},
+				},
+			}
+
+			called := 0
+			handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
+				called++
+
+				// should request the entire length with no partitioning as nothing is cached yet.
+				require.Equal(t, tt.req.GetStart(), r.GetStart())
+				require.Equal(t, tt.req.GetEnd(), r.GetEnd())
+
+				return seriesResp, nil
+			}))
+
+			ctx := user.InjectOrgID(context.Background(), "fake")
+			got, err := handler.Do(ctx, tt.req)
+			require.NoError(t, err)
+			require.Equal(t, 1, called) // called actual handler, as not cached.
+			require.Equal(t, seriesResp, got)
+
+			called = 0
+			got, err = handler.Do(ctx, tt.req)
+			require.NoError(t, err)
+			if !tt.shouldCache {
+				require.Equal(t, 1, called)
+			} else {
+				require.Equal(t, 0, called)
+			}
+			require.Equal(t, seriesResp, got)
+		})
+	}
+}
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 45dd34f201e8d..ac25798c33e31 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -97,6 +97,7 @@ type Limits struct {
 	MaxConcurrentTailRequests  int            `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"`
 	MaxEntriesLimitPerQuery    int            `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"`
 	MaxCacheFreshness          model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"`
+	MaxMetadataCacheFreshness  model.Duration `yaml:"max_metadata_cache_freshness" json:"max_metadata_cache_freshness"`
 	MaxStatsCacheFreshness     model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"`
 	MaxQueriersPerTenant       uint           `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
 	MaxQueryCapacity           float64        `yaml:"max_query_capacity" json:"max_query_capacity"`
@@ -277,6 +278,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	_ = l.MaxCacheFreshness.Set("10m")
 	f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")
 
+	_ = l.MaxMetadataCacheFreshness.Set("24h")
+	f.Var(&l.MaxMetadataCacheFreshness, "frontend.max-metadata-cache-freshness", "Do not cache metadata requests if the end time is within the frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such limits. Defaults to 24h.")
+
 	_ = l.MaxStatsCacheFreshness.Set("10m")
 	f.Var(&l.MaxStatsCacheFreshness, "frontend.max-stats-cache-freshness", "Do not cache requests with an end time that falls within Now minus this duration. 0 disables this feature (default).")
 
@@ -298,6 +302,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	_ = l.QuerySplitDuration.Set("1h")
 	f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.")
 
+	// With metadata caching, it is not possible to extract a subset of labels/series from a cached extent because, unlike samples, they are not associated with a timestamp.
+	// As a result, we could return inaccurate results. Example: returning results from an entire 1h extent for a 5m query.
+	// Setting max_metadata_cache_freshness to 24h should help us avoid caching recent data and preserve correctness.
+	// For the portion of the request beyond the freshness window, granularity of the cached metadata results is determined by split_metadata_queries_by_interval.
 	_ = l.MetadataQuerySplitDuration.Set("24h")
 	f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.")
 
@@ -624,6 +632,10 @@ func (o *Overrides) MaxCacheFreshness(_ context.Context, userID string) time.Duration {
 	return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness)
 }
 
+func (o *Overrides) MaxMetadataCacheFreshness(_ context.Context, userID string) time.Duration {
+	return time.Duration(o.getOverridesForUser(userID).MaxMetadataCacheFreshness)
+}
+
 func (o *Overrides) MaxStatsCacheFreshness(_ context.Context, userID string) time.Duration {
 	return time.Duration(o.getOverridesForUser(userID).MaxStatsCacheFreshness)
 }
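
The freshness check added in `series_cache.go` reduces to a single comparison: a metadata request may be cached only when no per-tenant limit is set, or when its end time is older than now minus the window. Below is a self-contained sketch of just that decision, assuming the same `github.com/prometheus/common/model` package the patch imports; the helper name `cacheableMetadataEnd` and the example values are illustrative, not part of the patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// cacheableMetadataEnd mirrors the comparison in shouldCacheMetadataReq:
// cache only if no freshness limit is set (0), or the request's end time
// falls before now minus the freshness window.
func cacheableMetadataEnd(end time.Time, maxFreshness time.Duration) bool {
	return maxFreshness == 0 || model.Time(end.UnixMilli()).Before(model.Now().Add(-maxFreshness))
}

func main() {
	now := time.Now()
	// End time 1h ago with the 24h default window: still "fresh", so skip the cache.
	fmt.Println(cacheableMetadataEnd(now.Add(-1*time.Hour), 24*time.Hour)) // false
	// End time 25h ago: outside the window, results may be cached.
	fmt.Println(cacheableMetadataEnd(now.Add(-25*time.Hour), 24*time.Hour)) // true
	// Limit disabled: always cacheable.
	fmt.Println(cacheableMetadataEnd(now.Add(-1*time.Hour), 0)) // true
}
```

With the 1d default, label and series requests whose end time falls inside the last 24 hours always go to the backend, which is what the "req overlaps with max cache freshness window" cases in `TestLabelCache_freshness` and `TestSeriesCache_freshness` assert.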