Skip to content

Commit

Permalink
feat: Skip writeback for chunks fetched by queriers older than a duration
Browse files Browse the repository at this point in the history
  • Loading branch information
mveitas committed Dec 12, 2024
1 parent b7fe6bf commit 516e84c
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 143 deletions.
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func TestPipeline_JSON(t *testing.T) {
entry string
expectedExtract map[string]interface{}
}{
"successfully run a pipeline with 1 logfmt stage with log not using json formatted string": {
testJSONYamlSingleStageWithoutSource,
"2012-11-01T22:08:41+00:00 [WARN] app:loki duration:125 - this log line is not in logfmt",
map[string]interface{}{},
},
"successfully run a pipeline with 1 json stage without source": {
testJSONYamlSingleStageWithoutSource,
testJSONLogLine,
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/logfmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func TestPipeline_Logfmt(t *testing.T) {
entry string
expectedExtract map[string]interface{}
}{
"successfully run a pipeline with 1 logfmt stage with log not using logfmt formatted string": {
testLogfmtYamlSingleStageWithoutSource,
"2012-11-01T22:08:41+00:00 [WARN] app:loki duration:125 - this log line is not in logfmt",
map[string]interface{}{},
},
"successfully run a pipeline with 1 logfmt stage without source": {
testLogfmtYamlSingleStageWithoutSource,
testLogfmtLogLine,
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID

d.streamShardCount.Inc()
if shardStreamsCfg.LoggingEnabled {
level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount)
level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount, "push_size", pushSize)
}

return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) {
},
}

fetcher, err := fetcher.New(c, nil, false, s, nil, 0)
fetcher, err := fetcher.New(c, nil, false, s, nil, 0, 0)
require.NoError(t, err)
defer fetcher.Stop()

Expand Down
29 changes: 19 additions & 10 deletions pkg/storage/chunk/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type Fetcher struct {
cachel2 cache.Cache
cacheStubs bool

l2CacheHandoff time.Duration
l2CacheHandoff time.Duration
skipQueryWritebackCacheOlderThan time.Duration

wait sync.WaitGroup
decodeRequests chan decodeRequest
Expand All @@ -69,15 +70,16 @@ type decodeResponse struct {
}

// New makes a new ChunkFetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) {
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration, skipQueryWritebackOlderThan time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
skipQueryWritebackCacheOlderThan: skipQueryWritebackOlderThan,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
}

c.wait.Add(chunkDecodeParallelism)
Expand Down Expand Up @@ -138,6 +140,10 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun
l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks))

for _, m := range chunks {
if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
continue
}

// Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable
// expectation to find it there.
if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) {
Expand Down Expand Up @@ -211,7 +217,6 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun
st.AddCacheBytesSent(stats.ChunkCache, bytes)

// Always cache any chunks we did get

if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil {
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}
Expand All @@ -230,6 +235,10 @@ func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) erro
keysL2 := make([]string, 0, len(chunks))
bufsL2 := make([][]byte, 0, len(chunks))
for i := range chunks {
if c.skipQueryWritebackCacheOlderThan > 0 && chunks[i].From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
continue
}

var encoded []byte
var err error
if !c.cacheStubs {
Expand Down
Loading

0 comments on commit 516e84c

Please sign in to comment.