chore: Add chunk count from GetChunkRef call to stats context (#14636)
This PR improves the observability of the GetChunkRef call, especially when bloom filters are enabled.

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Oct 29, 2024
1 parent d4f8a2a commit baaaa83
Showing 7 changed files with 293 additions and 237 deletions.
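
The change is end-to-end: the index gateway records chunk counts on the new Stats field of GetChunkRefResponse, and the index-gateway client store folds those counts into the per-query stats context. Below is a minimal client-side sketch of that flow, using only calls visible in the diffs that follow; the helper name recordGetChunkRefStats is illustrative and not part of this commit.

// recordGetChunkRefStats is a hypothetical helper (not in this commit) showing
// how the new response field feeds the query's stats context.
// Assumed imports: "context", "github.com/grafana/loki/v3/pkg/logproto",
// "github.com/grafana/loki/v3/pkg/logqlmodel/stats".
func recordGetChunkRefStats(ctx context.Context, resp *logproto.GetChunkRefResponse) {
	// Retrieve the per-query stats context, as series_index_gateway_store.go does below.
	statsCtx := stats.FromContext(ctx)

	// Stats is a value field ((gogoproto.nullable) = false in the proto change),
	// so it can be read without a nil check.
	statsCtx.AddIndexTotalChunkRefs(resp.Stats.TotalChunks)
	statsCtx.AddIndexPostFilterChunkRefs(resp.Stats.PostFilterChunks)
}
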
20 changes: 7 additions & 13 deletions pkg/indexgateway/gateway.go
@@ -22,7 +22,6 @@ import (
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/plan"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk"
@@ -236,6 +235,9 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
}

initialChunkCount := len(result.Refs)
result.Stats.TotalChunks = int64(initialChunkCount)
result.Stats.PostFilterChunks = int64(initialChunkCount) // populate early for error responses

defer func() {
if err == nil {
g.metrics.preFilterChunks.WithLabelValues(routeChunkRefs).Observe(float64(initialChunkCount))
@@ -262,6 +264,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ

result.Refs = chunkRefs
level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs))
result.Stats.PostFilterChunks = int64(len(result.Refs))
return result, nil
}

@@ -486,16 +489,7 @@ func (g *Gateway) boundedShards(
g.metrics.preFilterChunks.WithLabelValues(routeShards).Observe(float64(ct))
g.metrics.postFilterChunks.WithLabelValues(routeShards).Observe(float64(len(filtered)))

statistics := stats.Result{
Index: stats.Index{
TotalChunks: int64(ct),
PostFilterChunks: int64(len(filtered)),
},
}

resp := &logproto.ShardsResponse{
Statistics: statistics,
}
resp := &logproto.ShardsResponse{}

// Edge case: if there are no chunks after filtering, we still need to return a single shard
if len(filtered) == 0 {
@@ -530,8 +524,8 @@
ms := syntax.MatchersExpr{Mts: p.Matchers}
level.Debug(logger).Log(
"msg", "send shards response",
"total_chunks", statistics.Index.TotalChunks,
"post_filter_chunks", statistics.Index.PostFilterChunks,
"total_chunks", ct,
"post_filter_chunks", len(filtered),
"shards", len(resp.Shards),
"query", req.Query,
"target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(),
445 changes: 252 additions & 193 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
@@ -352,6 +352,7 @@ message GetChunkRefRequest {

message GetChunkRefResponse {
repeated ChunkRef refs = 1;
stats.Index stats = 2 [(gogoproto.nullable) = false];
}

message GetSeriesRequest {
13 changes: 13 additions & 0 deletions pkg/logqlmodel/stats/context.go
@@ -103,6 +103,11 @@ func (c *Context) Store() Store {
return c.store
}

// Index returns the index statistics accumulated so far.
func (c *Context) Index() Index {
return c.index
}

// Caches returns the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
return Caches{
@@ -402,6 +407,14 @@ func (c *Context) AddChunksRef(i int64) {
atomic.AddInt64(&c.store.TotalChunksRef, i)
}

func (c *Context) AddIndexTotalChunkRefs(i int64) {
atomic.AddInt64(&c.index.TotalChunks, i)
}

func (c *Context) AddIndexPostFilterChunkRefs(i int64) {
atomic.AddInt64(&c.index.PostFilterChunks, i)
}

// AddCacheEntriesFound counts the number of cache entries requested and found
func (c *Context) AddCacheEntriesFound(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
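
The two new counters are updated with atomic.AddInt64, so concurrent index-gateway calls that share a stats context accumulate safely, and the new Index() accessor exposes the totals to whatever assembles the final query statistics. A short illustrative read-back follows (the go-kit log call is a sketch, not part of the commit; statsCtx and logger are assumed to be in scope):

// Read the accumulated index statistics, e.g. for a per-query summary log.
idx := statsCtx.Index()
level.Debug(logger).Log(
	"msg", "index chunk refs", // illustrative message, not from the commit
	"total_chunks", idx.TotalChunks,
	"post_filter_chunks", idx.PostFilterChunks,
)
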
9 changes: 1 addition & 8 deletions pkg/storage/async_store.go
@@ -372,15 +372,8 @@ func mergeShardsFromIngestersAndStore(

shards := sharding.LinearShards(int(totalBytes/targetBytesPerShard), totalBytes)

// increment the total chunks by the number seen from ingesters
// NB(owen-d): this isn't perfect as it mixes signals a bit by joining
// store chunks which _could_ possibly be filtered with ingester chunks which can't,
// but it's still directionally helpful
updatedStats := storeResp.Statistics
updatedStats.Index.TotalChunks += int64(statsResp.Chunks)
return &logproto.ShardsResponse{
Shards: shards,
Statistics: updatedStats,
Shards: shards,
// explicitly nil chunkgroups when we've changed the shards+included chunkrefs from ingesters
ChunkGroups: nil,
}
37 changes: 14 additions & 23 deletions pkg/storage/async_store_test.go
@@ -6,15 +6,12 @@ import (
"time"

"github.com/go-kit/log"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/config"
@@ -374,14 +371,9 @@ func TestMergeShardsFromIngestersAndStore(t *testing.T) {
}

// creates n shards with bytesPerShard * n bytes and chks chunks
mkShards := func(n int, bytesPerShard uint64, chks int64) logproto.ShardsResponse {
mkShards := func(n int, bytesPerShard uint64) logproto.ShardsResponse {
return logproto.ShardsResponse{
Shards: sharding.LinearShards(n, bytesPerShard*uint64(n)),
Statistics: stats.Result{
Index: stats.Index{
TotalChunks: chks,
},
},
}
}

@@ -396,32 +388,32 @@
{
desc: "zero bytes returns one full shard",
ingester: mkStats(0, 0),
store: mkShards(0, 0, 0),
exp: mkShards(1, 0, 0),
store: mkShards(0, 0),
exp: mkShards(1, 0),
},
{
desc: "zero ingester bytes honors store",
ingester: mkStats(0, 0),
store: mkShards(10, uint64(targetBytesPerShard), 10),
exp: mkShards(10, uint64(targetBytesPerShard), 10),
store: mkShards(10, uint64(targetBytesPerShard)),
exp: mkShards(10, uint64(targetBytesPerShard)),
},
{
desc: "zero store bytes honors ingester",
ingester: mkStats(uint64(targetBytesPerShard*10), 10),
store: mkShards(0, 0, 0),
exp: mkShards(10, uint64(targetBytesPerShard), 10),
store: mkShards(0, 0),
exp: mkShards(10, uint64(targetBytesPerShard)),
},
{
desc: "ingester bytes below threshold ignored",
ingester: mkStats(uint64(targetBytesPerShard*2), 10), // 2 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
exp: mkShards(10, uint64(targetBytesPerShard), 10), // use the store's resp
ingester: mkStats(uint64(targetBytesPerShard*2), 10), // 2 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard)), // 10 shards worth from store
exp: mkShards(10, uint64(targetBytesPerShard)), // use the store's resp
},
{
desc: "ingester bytes above threshold recreate shards",
ingester: mkStats(uint64(targetBytesPerShard*4), 10), // 4 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
exp: mkShards(14, uint64(targetBytesPerShard), 20), // regenerate 14 shards
ingester: mkStats(uint64(targetBytesPerShard*4), 10), // 4 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard)), // 10 shards worth from store
exp: mkShards(14, uint64(targetBytesPerShard)), // regenerate 14 shards
},
} {

@@ -434,7 +426,6 @@
)
require.Equal(t, tc.exp.Statistics, got.Statistics)
require.Equal(t, tc.exp.ChunkGroups, got.ChunkGroups)
require.Equal(t, tc.exp.Statistics.Index.TotalChunks, got.Statistics.Index.TotalChunks)
for i, shard := range tc.exp.Shards {
require.Equal(t, shard, got.Shards[i], "shard %d", i)
}
5 changes: 5 additions & 0 deletions pkg/storage/stores/series/series_index_gateway_store.go
@@ -11,6 +11,7 @@ import (

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
statscontext "github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
@@ -58,6 +59,10 @@ func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, _ string, fr
result[i] = *ref
}

statsCtx := statscontext.FromContext(ctx)
statsCtx.AddIndexTotalChunkRefs(response.Stats.TotalChunks)
statsCtx.AddIndexPostFilterChunkRefs(response.Stats.PostFilterChunks)

return result, nil
}

