From 516e84cbf982922076eebeb41ee84e7bb868fa31 Mon Sep 17 00:00:00 2001 From: Matt Veitas Date: Thu, 12 Dec 2024 08:01:21 -0500 Subject: [PATCH 1/3] feat: Skip writeback for chunks fetched by queriers older than a duration --- clients/pkg/logentry/stages/json_test.go | 5 + clients/pkg/logentry/stages/logfmt_test.go | 5 + pkg/distributor/distributor.go | 2 +- pkg/storage/chunk/cache/cache_test.go | 2 +- pkg/storage/chunk/fetcher/fetcher.go | 29 +- pkg/storage/chunk/fetcher/fetcher_test.go | 274 ++++++++++-------- pkg/storage/config/store.go | 9 +- pkg/storage/store.go | 2 +- pkg/storage/stores/series_store_write_test.go | 2 +- pkg/storage/util_test.go | 2 +- 10 files changed, 189 insertions(+), 143 deletions(-) diff --git a/clients/pkg/logentry/stages/json_test.go b/clients/pkg/logentry/stages/json_test.go index 9e0d2dffaea4c..df8ed3ed5c186 100644 --- a/clients/pkg/logentry/stages/json_test.go +++ b/clients/pkg/logentry/stages/json_test.go @@ -56,6 +56,11 @@ func TestPipeline_JSON(t *testing.T) { entry string expectedExtract map[string]interface{} }{ + "successfully run a pipeline with 1 logfmt stage with log not using json formatted string": { + testJSONYamlSingleStageWithoutSource, + "2012-11-01T22:08:41+00:00 [WARN] app:loki duration:125 - this log line is not in logfmt", + map[string]interface{}{}, + }, "successfully run a pipeline with 1 json stage without source": { testJSONYamlSingleStageWithoutSource, testJSONLogLine, diff --git a/clients/pkg/logentry/stages/logfmt_test.go b/clients/pkg/logentry/stages/logfmt_test.go index 1406aa080cf88..60c680b22d9db 100644 --- a/clients/pkg/logentry/stages/logfmt_test.go +++ b/clients/pkg/logentry/stages/logfmt_test.go @@ -44,6 +44,11 @@ func TestPipeline_Logfmt(t *testing.T) { entry string expectedExtract map[string]interface{} }{ + "successfully run a pipeline with 1 logfmt stage with log not using logfmt formatted string": { + testLogfmtYamlSingleStageWithoutSource, + "2012-11-01T22:08:41+00:00 [WARN] app:loki duration:125 - this log line is not in logfmt", + map[string]interface{}{}, + }, "successfully run a pipeline with 1 logfmt stage without source": { testLogfmtYamlSingleStageWithoutSource, testLogfmtLogLine, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f871336e26e9d..45853bce14f58 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -848,7 +848,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID d.streamShardCount.Inc() if shardStreamsCfg.LoggingEnabled { - level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount) + level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount, "push_size", pushSize) } return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 3ff473934cdb1..2607595b30cb9 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -132,7 +132,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) { }, } - fetcher, err := fetcher.New(c, nil, false, s, nil, 0) + fetcher, err := fetcher.New(c, nil, false, s, nil, 0, 0) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 45b6970045a91..8811cdba3ecc9 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -49,7 +49,8 @@ type Fetcher struct { cachel2 cache.Cache cacheStubs bool - l2CacheHandoff time.Duration + l2CacheHandoff time.Duration + skipQueryWritebackCacheOlderThan time.Duration wait sync.WaitGroup decodeRequests chan decodeRequest @@ -69,15 +70,16 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. -func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration, skipQueryWritebackOlderThan time.Duration) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cache, - cachel2: cachel2, - l2CacheHandoff: l2CacheHandoff, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), + schema: schema, + storage: storage, + cache: cache, + cachel2: cachel2, + l2CacheHandoff: l2CacheHandoff, + skipQueryWritebackCacheOlderThan: skipQueryWritebackOlderThan, + cacheStubs: cacheStubs, + decodeRequests: make(chan decodeRequest), } c.wait.Add(chunkDecodeParallelism) @@ -138,6 +140,10 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks)) for _, m := range chunks { + if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) { + continue + } + // Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable // expectation to find it there. if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) { @@ -211,7 +217,6 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun st.AddCacheBytesSent(stats.ChunkCache, bytes) // Always cache any chunks we did get - if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } @@ -230,6 +235,10 @@ func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) erro keysL2 := make([]string, 0, len(chunks)) bufsL2 := make([][]byte, 0, len(chunks)) for i := range chunks { + if c.skipQueryWritebackCacheOlderThan > 0 && chunks[i].From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) { + continue + } + var encoded []byte var err error if !c.cacheStubs { diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 2251c93022b29..28d356c779eb4 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -25,132 +25,156 @@ import ( func Test(t *testing.T) { now := time.Now() tests := []struct { - name string - handoff time.Duration - storeStart []chunk.Chunk - l1Start []chunk.Chunk - l2Start []chunk.Chunk - fetch []chunk.Chunk - l1KeysRequested int - l1End []chunk.Chunk - l2KeysRequested int - l2End []chunk.Chunk + name string + handoff time.Duration + skipQueryWriteback time.Duration + storeStart []chunk.Chunk + l1Start []chunk.Chunk + l2Start []chunk.Chunk + fetch []chunk.Chunk + l1KeysRequested int + l1End []chunk.Chunk + l2KeysRequested int + l2End []chunk.Chunk }{ { - name: "all found in L1 cache", - handoff: 0, - storeStart: []chunk.Chunk{}, - l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2End: []chunk.Chunk{}, + name: "all found in L1 cache", + handoff: 0, + skipQueryWriteback: 0, + storeStart: []chunk.Chunk{}, + l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2End: []chunk.Chunk{}, }, { - name: "all found in L2 cache", - handoff: 1, // Only needs to be greater than zero so that we check L2 cache - storeStart: []chunk.Chunk{}, - l1Start: []chunk.Chunk{}, - l2Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l1End: []chunk.Chunk{}, - l2KeysRequested: 3, - l2End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + name: "all found in L2 cache", + handoff: 1, // Only needs to be greater than zero so that we check L2 cache + skipQueryWriteback: 0, + storeStart: []chunk.Chunk{}, + l1Start: []chunk.Chunk{}, + l2Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l1End: []chunk.Chunk{}, + l2KeysRequested: 3, + l2End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), }, { - name: "some in L1, some in L2", - handoff: 5 * time.Hour, - storeStart: []chunk.Chunk{}, - l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2Start: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2KeysRequested: 3, - l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + name: "some in L1, some in L2", + handoff: 5 * time.Hour, + skipQueryWriteback: 0, + storeStart: []chunk.Chunk{}, + l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2Start: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 3, + l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), }, { - name: "some in L1, some in L2, some in store", - handoff: 5 * time.Hour, - storeStart: makeChunks(now, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), - l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}), - l2Start: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}), - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2KeysRequested: 3, - l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + name: "some in L1, some in L2, some in store", + handoff: 5 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}), + l2Start: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}), + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 3, + l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), }, { - name: "writeback l1", - handoff: 24 * time.Hour, - storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l1Start: []chunk.Chunk{}, - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2End: []chunk.Chunk{}, + name: "skipQueryWriteback", + handoff: 24 * time.Hour, + skipQueryWriteback: 3 * 24 * time.Hour, + storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 0, + l2End: []chunk.Chunk{}, }, { - name: "writeback l2", - handoff: 24 * time.Hour, - storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1Start: []chunk.Chunk{}, - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1End: []chunk.Chunk{}, - l2KeysRequested: 3, - l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + name: "writeback l1", + handoff: 24 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2End: []chunk.Chunk{}, }, { - name: "writeback l1 and l2", - handoff: 24 * time.Hour, - storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1Start: []chunk.Chunk{}, - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2KeysRequested: 3, - l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + name: "writeback l2", + handoff: 24 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1End: []chunk.Chunk{}, + l2KeysRequested: 3, + l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), }, { - name: "verify l1 skip optimization", - handoff: 24 * time.Hour, - storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1Start: []chunk.Chunk{}, - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1KeysRequested: 0, - l1End: []chunk.Chunk{}, - l2KeysRequested: 3, - l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + name: "writeback l1 and l2", + handoff: 24 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 3, + l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), }, { - name: "verify l1 skip optimization plus extended", - handoff: 20 * time.Hour, // 20 hours, 10% extension should be 22 hours - storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1Start: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), - l2Start: makeChunks(now, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), - fetch: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), - l1KeysRequested: 2, - l1End: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), - l2KeysRequested: 1, // We won't look for the extended handoff key in L2, so only one lookup should go to L2 - l2End: makeChunks(now, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + name: "verify l1 skip optimization", + handoff: 24 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1KeysRequested: 0, + l1End: []chunk.Chunk{}, + l2KeysRequested: 3, + l2End: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), }, { - name: "verify l2 skip optimization", - handoff: 24 * time.Hour, - storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), - l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2Start: []chunk.Chunk{}, - fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l1KeysRequested: 3, - l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2KeysRequested: 0, - l2End: []chunk.Chunk{}, + name: "verify l1 skip optimization plus extended", + handoff: 20 * time.Hour, // 20 hours, 10% extension should be 22 hours + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1Start: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + l2Start: makeChunks(now, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + fetch: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + l1KeysRequested: 2, + l1End: makeChunks(now, c{20 * time.Hour, 21 * time.Hour}, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + l2KeysRequested: 1, // We won't look for the extended handoff key in L2, so only one lookup should go to L2 + l2End: makeChunks(now, c{21 * time.Hour, 22 * time.Hour}, c{22 * time.Hour, 23 * time.Hour}), + }, + { + name: "verify l2 skip optimization", + handoff: 24 * time.Hour, + skipQueryWriteback: 0, + storeStart: makeChunks(now, c{31 * time.Hour, 32 * time.Hour}, c{32 * time.Hour, 33 * time.Hour}, c{33 * time.Hour, 34 * time.Hour}), + l1Start: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 0, + l2End: []chunk.Chunk{}, }, } for _, test := range tests { @@ -194,7 +218,7 @@ func Test(t *testing.T) { assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart)) // Build fetcher - f, err := New(c1, c2, false, sc, chunkClient, test.handoff) + f, err := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback) assert.NoError(t, err) // Run the test @@ -235,23 +259,25 @@ func BenchmarkFetch(b *testing.B) { fetch = append(fetch, storeStart...) test := struct { - name string - handoff time.Duration - storeStart []chunk.Chunk - l1Start []chunk.Chunk - l2Start []chunk.Chunk - fetch []chunk.Chunk - l1KeysRequested int - l1End []chunk.Chunk - l2KeysRequested int - l2End []chunk.Chunk + name string + handoff time.Duration + skipQueryWriteback time.Duration + storeStart []chunk.Chunk + l1Start []chunk.Chunk + l2Start []chunk.Chunk + fetch []chunk.Chunk + l1KeysRequested int + l1End []chunk.Chunk + l2KeysRequested int + l2End []chunk.Chunk }{ - name: "some in L1, some in L2", - handoff: time.Duration(numchunks/3+100) * time.Hour, - storeStart: storeStart, - l1Start: l1Start, - l2Start: l2Start, - fetch: fetch, + name: "some in L1, some in L2", + handoff: time.Duration(numchunks/3+100) * time.Hour, + skipQueryWriteback: 0, + storeStart: storeStart, + l1Start: l1Start, + l2Start: l2Start, + fetch: fetch, } c1 := cache.NewMockCache() @@ -291,7 +317,7 @@ func BenchmarkFetch(b *testing.B) { _ = chunkClient.PutChunks(context.Background(), test.storeStart) // Build fetcher - f, _ := New(c1, c2, false, sc, chunkClient, test.handoff) + f, _ := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback) for i := 0; i < b.N; i++ { _, err := f.FetchChunks(context.Background(), test.fetch) diff --git a/pkg/storage/config/store.go b/pkg/storage/config/store.go index 8dbd57cdc2503..fecea5ef6f040 100644 --- a/pkg/storage/config/store.go +++ b/pkg/storage/config/store.go @@ -10,9 +10,10 @@ import ( ) type ChunkStoreConfig struct { - ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` - ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"` - WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."` + ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` + ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"` + WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."` + SkipQueryWritebackOlderThan time.Duration `yaml:"skip_query_writeback_cache_older_than"` L2ChunkCacheHandoff time.Duration `yaml:"l2_chunk_cache_handoff"` CacheLookupsOlderThan model.Duration `yaml:"cache_lookups_older_than"` @@ -38,7 +39,7 @@ func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.L2ChunkCacheHandoff, "store.chunks-cache-l2.handoff", 0, "Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 cache.") f.BoolVar(&cfg.chunkCacheStubs, "store.chunks-cache.cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.") cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "", f) - + f.DurationVar(&cfg.SkipQueryWritebackOlderThan, "store.skip-query-writeback-older-than", 0, "Chunks fetched from queriers before this duration will not be written to the cache. A value of 0 will write all chunks to the cache") f.Var(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", "Cache index entries older than this period. 0 to disable.") } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a8e6a1add3239..8daf27ce265f8 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -198,7 +198,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff) + f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff, s.storeCfg.SkipQueryWritebackOlderThan) if err != nil { return err } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 5ff8a00d99706..a6e0a9a55cb93 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -160,7 +160,7 @@ func TestChunkWriter_PutOne(t *testing.T) { idx := &mockIndexWriter{} client := &mockChunksClient{} - f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0) + f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0, 0) require.NoError(t, err) cw := NewChunkWriter(f, schemaConfig, idx, true) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index a0dc75999692f..cc9ded1c53447 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -261,7 +261,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, panic(err) } - f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0) + f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0, 0) if err != nil { panic(err) } From 7d7e1a30007bc96d3e62001390c5131c2ec522a7 Mon Sep 17 00:00:00 2001 From: Matt Veitas Date: Thu, 12 Dec 2024 09:34:35 -0500 Subject: [PATCH 2/3] Updating docs --- docs/sources/shared/configuration.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9cf3b2a18ed0f..5fb053785e69e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1719,6 +1719,11 @@ The `chunk_store_config` block configures how chunks will be cached and how long # The CLI flags prefix for this block configuration is: store.index-cache-write [write_dedupe_cache_config: ] +# Chunks fetched from queriers before this duration will not be written to the +# cache. A value of 0 will write all chunks to the cache +# CLI flag: -store.skip-query-writeback-older-than +[skip_query_writeback_cache_older_than: | default = 0s] + # Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 # cache. # CLI flag: -store.chunks-cache-l2.handoff From b4319926139bb95731b2a7566b46977ed1df527a Mon Sep 17 00:00:00 2001 From: Matt Veitas Date: Thu, 12 Dec 2024 14:26:51 -0500 Subject: [PATCH 3/3] Remove filtering of chunks --- pkg/storage/chunk/fetcher/fetcher.go | 4 ---- pkg/storage/chunk/fetcher/fetcher_test.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 8811cdba3ecc9..87679f995ff3b 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -140,10 +140,6 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks)) for _, m := range chunks { - if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) { - continue - } - // Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable // expectation to find it there. if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) { diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 28d356c779eb4..b13f4cc6cf2ef 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -97,7 +97,7 @@ func Test(t *testing.T) { fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}), l1KeysRequested: 3, l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), - l2KeysRequested: 0, + l2KeysRequested: 2, l2End: []chunk.Chunk{}, }, {