feat(metadata cache): adds max_metadata_cache_freshness (grafana#11682)
**What this PR does / why we need it**:
Adds `max_metadata_cache_freshness` to limit the metadata requests that
get cached. When configured, only metadata requests with end time before
`now - max_metadata_cache_freshness` are cacheable.

_Why is the default set to 24h?_
The metric results cache can [extract samples for the desired time range
from an
extent](https://github.com/grafana/loki/blob/b6e64e1ef1fb2a2155661c815d0198e147579c8e/pkg/querier/queryrange/queryrangebase/results_cache.go#L78)
because each sample is associated with a timestamp. The same is not true
for metadata caching: it is not possible to extract a subset of
labels/series from a cached extent. As a result, we could return
inaccurate results, more than what was requested. For example, a 5m query
could be answered with results from an entire cached 1h extent.

Setting `max_metadata_cache_freshness` to 24h should help us avoid
caching recent data. For anything older, cached metadata results are
reported at a granularity controlled by
`split_metadata_queries_by_interval`.
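
For clarity, here is a minimal stand-alone sketch of the cacheability rule described above. The function name, signature, and values below are illustrative only; the actual check added by this PR is `shouldCacheMetadataReq` in `pkg/querier/queryrange/series_cache.go` (included in the diff below).

```go
package main

import (
	"fmt"
	"time"
)

// isMetadataReqCacheable sketches the rule: a metadata (labels/series) request
// is cacheable only when its end time falls before now - maxMetadataCacheFreshness.
// A freshness of 0 disables the check entirely. Name and signature are
// illustrative; see shouldCacheMetadataReq in this PR for the real logic.
func isMetadataReqCacheable(end time.Time, maxMetadataCacheFreshness time.Duration, now time.Time) bool {
	if maxMetadataCacheFreshness == 0 {
		return true
	}
	return end.Before(now.Add(-maxMetadataCacheFreshness))
}

func main() {
	now := time.Now()
	freshness := 24 * time.Hour // the new default (1d)

	// Ends 30 minutes ago: overlaps the freshness window, so the response is
	// served but not written to the results cache.
	fmt.Println(isMetadataReqCacheable(now.Add(-30*time.Minute), freshness, now)) // false

	// Ends 25 hours ago: outside the window, so the response is cacheable.
	fmt.Println(isMetadataReqCacheable(now.Add(-25*time.Hour), freshness, now)) // true
}
```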

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](grafana@0d4416a)
ashwanthgoli authored Jan 16, 2024
1 parent 86f2001 commit 51899b5
Showing 9 changed files with 289 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -48,7 +48,8 @@
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness_per_query: <duration> | default = 10m]

# Do not cache metadata requests if the end time is within the
# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such
# limits. Defaults to 24h.
# CLI flag: -frontend.max-metadata-cache-freshness
[max_metadata_cache_freshness: <duration> | default = 1d]

# Do not cache requests with an end time that falls within Now minus this
# duration. 0 disables this feature (default).
# CLI flag: -frontend.max-stats-cache-freshness
4 changes: 3 additions & 1 deletion pkg/querier/queryrange/labels_cache.go
@@ -91,7 +91,9 @@ func NewLabelsCacheMiddleware(
merger,
labelsExtractor{},
cacheGenNumberLoader,
shouldCache,
func(ctx context.Context, r queryrangebase.Request) bool {
return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits)
},
parallelismForReq,
retentionEnabled,
metrics,
108 changes: 108 additions & 0 deletions pkg/querier/queryrange/labels_cache_test.go
@@ -249,3 +249,111 @@ func TestLabelsCache(t *testing.T) {
})
}
}

func TestLabelCache_freshness(t *testing.T) {
testTime := time.Now().Add(-1 * time.Hour)
from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)
start, end := from.Time(), through.Time()
nonOverlappingStart, nonOverlappingEnd := from.Add(-24*time.Hour).Time(), through.Add(-24*time.Hour).Time()

for _, tt := range []struct {
name string
req *LabelRequest
shouldCache bool
maxMetadataCacheFreshness time.Duration
}{
{
name: "max metadata freshness not set",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
},
shouldCache: true,
},
{
name: "req overlaps with max cache freshness window",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: false,
},
{
name: "req does not overlap max cache freshness window",
req: &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &nonOverlappingStart,
End: &nonOverlappingEnd,
},
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: true,
},
} {
t.Run(tt.name, func(t *testing.T) {
cacheMiddleware, err := NewLabelsCacheMiddleware(
log.NewNopLogger(),
fakeLimits{
metadataSplitDuration: map[string]time.Duration{
"fake": 24 * time.Hour,
},
maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
},
DefaultCodec,
cache.NewMockCache(),
nil,
nil,
func(_ context.Context, _ []string, _ queryrangebase.Request) int {
return 1
},
false,
nil,
nil,
)
require.NoError(t, err)

labelsResp := &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"bar", "buzz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, tt.req.GetStart(), r.GetStart())
require.Equal(t, tt.req.GetEnd(), r.GetEnd())

return labelsResp, nil
}))

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, tt.req)
require.NoError(t, err)
require.Equal(t, 1, called) // called actual handler, as not cached.
require.Equal(t, labelsResp, got)

called = 0
got, err = handler.Do(ctx, tt.req)
require.NoError(t, err)
if !tt.shouldCache {
require.Equal(t, 1, called)
} else {
require.Equal(t, 0, called)
}
require.Equal(t, labelsResp, got)
})
}
}
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits/definitions.go
@@ -30,5 +30,6 @@ type Limits interface {
MaxQueryBytesRead(context.Context, string) int
MaxQuerierBytesRead(context.Context, string) int
MaxStatsCacheFreshness(context.Context, string) time.Duration
MaxMetadataCacheFreshness(context.Context, string) time.Duration
VolumeEnabled(string) bool
}
39 changes: 22 additions & 17 deletions pkg/querier/queryrange/roundtrip_test.go
@@ -1237,23 +1237,24 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
}

type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
volumeEnabled bool
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
maxMetadataCacheFreshness time.Duration
volumeEnabled bool
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
@@ -1344,6 +1345,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Dur
return f.maxStatsCacheFreshness
}

func (f fakeLimits) MaxMetadataCacheFreshness(_ context.Context, _ string) time.Duration {
return f.maxMetadataCacheFreshness
}

func (f fakeLimits) VolumeEnabled(_ string) bool {
return f.volumeEnabled
}
26 changes: 25 additions & 1 deletion pkg/querier/queryrange/series_cache.go
@@ -9,10 +9,15 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/pkg/util/validation"
)

type cacheKeySeries struct {
@@ -92,9 +97,28 @@ func NewSeriesCacheMiddleware(
merger,
seriesExtractor{},
cacheGenNumberLoader,
shouldCache,
func(ctx context.Context, r queryrangebase.Request) bool {
return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits)
},
parallelismForReq,
retentionEnabled,
metrics,
)
}

func shouldCacheMetadataReq(ctx context.Context, logger log.Logger, shouldCache queryrangebase.ShouldCacheFn, req queryrangebase.Request, l Limits) bool {
if shouldCache != nil && !shouldCache(ctx, req) {
return false
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. won't cache", "err", err)
return false
}

cacheFreshnessCapture := func(id string) time.Duration { return l.MaxMetadataCacheFreshness(ctx, id) }
maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture)

return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(model.Now().Add(-maxCacheFreshness))
}
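
One detail of `shouldCacheMetadataReq` worth calling out: when a request carries multiple tenant IDs, the per-tenant limits collapse into a single value via `validation.MaxDurationPerTenant`. Assuming that helper behaves as its name suggests and returns the largest configured duration, the most conservative freshness setting governs a multi-tenant request. Below is a stand-in sketch of that resolution with a hypothetical helper and example values, not the real dskit/Loki function.

```go
package main

import (
	"fmt"
	"time"
)

// maxDurationPerTenant is a stand-in for validation.MaxDurationPerTenant used
// above (assumption: the real helper returns the largest duration across the
// given tenant IDs). For freshness limits, picking the maximum means the most
// conservative per-tenant setting wins.
func maxDurationPerTenant(tenantIDs []string, limit func(string) time.Duration) time.Duration {
	var maxDur time.Duration
	for _, id := range tenantIDs {
		if d := limit(id); d > maxDur {
			maxDur = d
		}
	}
	return maxDur
}

func main() {
	// Hypothetical per-tenant settings, for illustration only.
	perTenant := map[string]time.Duration{
		"team-a": 24 * time.Hour, // default freshness
		"team-b": 0,              // freshness check disabled for this tenant
	}
	freshness := maxDurationPerTenant(
		[]string{"team-a", "team-b"},
		func(id string) time.Duration { return perTenant[id] },
	)
	fmt.Println(freshness) // 24h0m0s: requests ending within the last 24h are not cached
}
```
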
110 changes: 110 additions & 0 deletions pkg/querier/queryrange/series_cache_test.go
@@ -312,3 +312,113 @@ func TestSeriesCache(t *testing.T) {
}
})
}

func TestSeriesCache_freshness(t *testing.T) {
testTime := time.Now().Add(-1 * time.Hour)
from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime)

for _, tt := range []struct {
name string
req *LokiSeriesRequest
shouldCache bool
maxMetadataCacheFreshness time.Duration
}{
{
name: "max metadata freshness not set",
req: &LokiSeriesRequest{
StartTs: from.Time(),
EndTs: through.Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
shouldCache: true,
},
{
name: "req overlaps with max cache freshness window",
req: &LokiSeriesRequest{
StartTs: from.Time(),
EndTs: through.Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: false,
},
{
name: "req does not overlap max cache freshness window",
req: &LokiSeriesRequest{
StartTs: from.Add(-24 * time.Hour).Time(),
EndTs: through.Add(-24 * time.Hour).Time(),
Match: []string{`{namespace=~".*"}`},
Path: seriesAPIPath,
},
maxMetadataCacheFreshness: 24 * time.Hour,
shouldCache: true,
},
} {
t.Run(tt.name, func(t *testing.T) {
cacheMiddleware, err := NewSeriesCacheMiddleware(
log.NewNopLogger(),
fakeLimits{
metadataSplitDuration: map[string]time.Duration{
"fake": 24 * time.Hour,
},
maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness,
},
DefaultCodec,
cache.NewMockCache(),
nil,
nil,
func(_ context.Context, _ []string, _ queryrangebase.Request) int {
return 1
},
false,
nil,
nil,
)
require.NoError(t, err)

seriesResp := &LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []logproto.SeriesIdentifier{
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}},
},
},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, tt.req.GetStart(), r.GetStart())
require.Equal(t, tt.req.GetEnd(), r.GetEnd())

return seriesResp, nil
}))

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, tt.req)
require.NoError(t, err)
require.Equal(t, 1, called) // called actual handler, as not cached.
require.Equal(t, seriesResp, got)

called = 0
got, err = handler.Do(ctx, tt.req)
require.NoError(t, err)
if !tt.shouldCache {
require.Equal(t, 1, called)
} else {
require.Equal(t, 0, called)
}
require.Equal(t, seriesResp, got)
})
}
}