From a0b462d366317081d0b4e897776f0e11d7fb3456 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 5 Dec 2023 16:24:18 +0100 Subject: [PATCH] Bloom-Gateway cache (#11380) **What this PR does / why we need it**: This PR adds caching to the bloom-gateway client. It uses the result cache from https://github.com/grafana/loki/pull/11343. Here's how we: - Merge responses: group all chunks by FP and remove duplicated chunk checksums. - Extract responses based on time span: For all chunks in each FP, add to the extracted response only the chunks that overlaps with the desired start and end time. --- docs/sources/configure/_index.md | 20 + pkg/bloomgateway/cache.go | 217 ++++++++ pkg/bloomgateway/cache_test.go | 494 ++++++++++++++++++ pkg/bloomgateway/client.go | 62 ++- pkg/bloomgateway/client_test.go | 4 +- pkg/bloomgateway/config.go | 1 + pkg/bloomgateway/querier.go | 2 - pkg/logproto/compat.go | 82 +++ pkg/logproto/compat_test.go | 68 +++ pkg/logqlmodel/stats/context.go | 1 + pkg/loki/modules.go | 10 +- pkg/storage/chunk/cache/resultscache/cache.go | 2 +- .../chunk/cache/resultscache/config.go | 4 + pkg/validation/limits.go | 6 + 14 files changed, 962 insertions(+), 11 deletions(-) create mode 100644 pkg/bloomgateway/cache.go create mode 100644 pkg/bloomgateway/cache_test.go diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 52466dd5e6241..0323f0c652538 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -1838,6 +1838,21 @@ client: # CLI flag: -bloom-gateway-client.log-gateway-requests [log_gateway_requests: | default = false] + results_cache: + # The cache block configures the cache backend. + # The CLI flags prefix for this block configuration is: + # bloom-gateway-client.cache + [cache: ] + + # Use compression in cache. The default is an empty value '', which disables + # compression. Supported values are: 'snappy' and ''. + # CLI flag: -bloom-gateway-client.cache.compression + [compression: | default = ""] + + # Flag to control whether to cache bloom gateway client requests/responses. + # CLI flag: -bloom-gateway-client.cache_results + [cache_results: | default = false] + # Number of workers to use for filtering chunks concurrently. # CLI flag: -bloom-gateway.worker-concurrency [worker_concurrency: | default = 4] @@ -3028,6 +3043,10 @@ shard_streams: # CLI flag: -bloom-gateway.blocks-downloading-parallelism [bloom_gateway_blocks_downloading_parallelism: | default = 50] +# Interval for computing the cache key in the Bloom Gateway. +# CLI flag: -bloom-gateway.cache-key-interval +[bloom_gateway_cache_key_interval: | default = 15m] + # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: | default = false] @@ -4233,6 +4252,7 @@ The TLS configuration. The cache block configures the cache backend. The supported CLI flags `` used to reference this configuration block are: +- `bloom-gateway-client.cache` - `frontend` - `frontend.index-stats-results-cache` - `frontend.volume-results-cache` diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go new file mode 100644 index 0000000000000..fe40b87e95488 --- /dev/null +++ b/pkg/bloomgateway/cache.go @@ -0,0 +1,217 @@ +package bloomgateway + +import ( + "context" + "flag" + "sort" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/common/model" + "golang.org/x/exp/slices" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" +) + +const ( + cacheParalellism = 1 +) + +type CacheConfig struct { + resultscache.Config `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("bloom-gateway-client.cache.", f) +} + +func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.Config.RegisterFlagsWithPrefix(f, prefix) +} + +type CacheLimits interface { + resultscache.Limits + BloomGatewayCacheKeyInterval(tenantID string) time.Duration +} + +type keyGen struct { + CacheLimits +} + +func newCacheKeyGen(limits CacheLimits) keyGen { + return keyGen{limits} +} + +func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string { + return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r) +} + +type extractor struct{} + +func newExtractor() extractor { + return extractor{} +} + +// Extract extracts a subset of a response from the `start` and `end` timestamps in milliseconds. +// We remove chunks that are not within the given time range. +func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response { + res := r.(*logproto.FilterChunkRefResponse) + + chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs)) + for _, chunkRef := range res.ChunkRefs { + refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs)) + for _, ref := range chunkRef.Refs { + if model.Time(end) < ref.From || ref.Through <= model.Time(start) { + continue + } + refs = append(refs, ref) + } + if len(refs) > 0 { + chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{ + Fingerprint: chunkRef.Fingerprint, + Tenant: chunkRef.Tenant, + Refs: refs, + }) + } + } + + return &logproto.FilterChunkRefResponse{ + ChunkRefs: chunkRefs, + } +} + +type merger struct{} + +func newMerger() merger { + return merger{} +} + +// MergeResponse merges responses from multiple requests into a single Response +// We merge all chunks grouped by their fingerprint. +func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) { + var size int + for _, r := range responses { + res := r.(*logproto.FilterChunkRefResponse) + size += len(res.ChunkRefs) + } + + chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size) + for _, r := range responses { + res := r.(*logproto.FilterChunkRefResponse) + chunkRefs = append(chunkRefs, res.ChunkRefs...) + } + + return &logproto.FilterChunkRefResponse{ + ChunkRefs: mergeGroupedChunkRefs(chunkRefs), + }, nil +} + +// Merge duplicated fingerprints by: +// 1. Sort the chunkRefs by their stream fingerprint +// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list. +func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { + if len(chunkRefs) <= 1 { + return chunkRefs + } + + sort.Slice(chunkRefs, func(i, j int) bool { + return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint + }) + + var lastDiffFP int + for i := 1; i < len(chunkRefs); i++ { + if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint { + chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...)) + } else { + lastDiffFP++ + chunkRefs[lastDiffFP] = chunkRefs[i] + } + } + return chunkRefs[:lastDiffFP+1] +} + +// mergeShortRefs merges short-refs by removing duplicated checksums. +func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef { + if len(refs) <= 1 { + return refs + } + + sort.Slice(refs, func(i, j int) bool { + return refs[i].Checksum < refs[j].Checksum + }) + return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool { + return a.Checksum == b.Checksum + }) +} + +type ClientCache struct { + cache *resultscache.ResultsCache + limits CacheLimits + logger log.Logger +} + +func NewBloomGatewayClientCacheMiddleware( + logger log.Logger, + next logproto.BloomGatewayClient, + c cache.Cache, + limits CacheLimits, + cacheGen resultscache.CacheGenNumberLoader, + retentionEnabled bool, +) *ClientCache { + nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) { + req := cacheReq.(requestWithGrpcCallOptions) + return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...) + }) + + resultsCache := resultscache.NewResultsCache( + logger, + c, + nextAsHandler, + newCacheKeyGen(limits), + limits, + newMerger(), + newExtractor(), + nil, + nil, + func(_ context.Context, _ []string, _ resultscache.Request) int { + return cacheParalellism + }, + cacheGen, + retentionEnabled, + ) + + return &ClientCache{ + cache: resultsCache, + limits: limits, + logger: logger, + } +} + +func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) { + cacheReq := requestWithGrpcCallOptions{ + FilterChunkRefRequest: req, + grpcCallOptions: opts, + } + res, err := c.cache.Do(ctx, cacheReq) + if err != nil { + return nil, err + } + + return res.(*logproto.FilterChunkRefResponse), nil +} + +type requestWithGrpcCallOptions struct { + *logproto.FilterChunkRefRequest + grpcCallOptions []grpc.CallOption +} + +func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request { + return requestWithGrpcCallOptions{ + FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest), + grpcCallOptions: r.grpcCallOptions, + } +} diff --git a/pkg/bloomgateway/cache_test.go b/pkg/bloomgateway/cache_test.go new file mode 100644 index 0000000000000..5a66162000a46 --- /dev/null +++ b/pkg/bloomgateway/cache_test.go @@ -0,0 +1,494 @@ +package bloomgateway + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" + "github.com/grafana/loki/pkg/util/constants" +) + +// Range is 1000-4000 +var templateResponse = &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 3000, + Through: 4000, + Checksum: 30, + }, + { + From: 1000, + Through: 3000, + Checksum: 40, + }, + }, + }, + }, +} + +func TestExtract(t *testing.T) { + for _, tc := range []struct { + name string + start int64 + end int64 + input *logproto.FilterChunkRefResponse + expected *logproto.FilterChunkRefResponse + }{ + { + name: "start and end out of range", + start: 100, + end: 200, + input: templateResponse, + expected: &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + }, + }, + { + name: "start spans exact range", + start: 1000, + end: 4000, + input: templateResponse, + expected: templateResponse, + }, + { + name: "start spans more than range", + start: 100, + end: 5000, + input: templateResponse, + expected: templateResponse, + }, + { + name: "start and end within range", + start: 1700, + end: 2700, + input: templateResponse, + expected: &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 3000, + Checksum: 40, + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + e := newExtractor() + actual := e.Extract(tc.start, tc.end, tc.input, 0, 0) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestMerge(t *testing.T) { + for _, tc := range []struct { + name string + input []*logproto.FilterChunkRefResponse + expected *logproto.FilterChunkRefResponse + }{ + { + name: "empy input", + input: []*logproto.FilterChunkRefResponse{}, + expected: &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + }, + }, + { + name: "single input", + input: []*logproto.FilterChunkRefResponse{templateResponse}, + expected: templateResponse, + }, + { + name: "repeating and non-repeating fingerprint with repeating and non-repeating chunks", + input: []*logproto.FilterChunkRefResponse{ + { + ChunkRefs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + }, + }, + { + ChunkRefs: []*logproto.GroupedChunkRefs{ + // Same FP as in previous input and same chunks + { + Fingerprint: 1, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + // Same FP as in previous input, but different chunks + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + // Same chunk as in previous input + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + // New chunk + { + From: 2000, + Through: 2500, + Checksum: 30, + }, + }, + }, + // New FP + { + Fingerprint: 3, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + }, + }, + { + ChunkRefs: []*logproto.GroupedChunkRefs{ + // Same FP as in previous input and diff chunks + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 700, + Through: 1000, + Checksum: 40, + }, + { + From: 2000, + Through: 2700, + Checksum: 50, + }, + }, + }, + }, + }, + }, + expected: &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + { + From: 2000, + Through: 2500, + Checksum: 30, + }, + { + From: 700, + Through: 1000, + Checksum: 40, + }, + { + From: 2000, + Through: 2700, + Checksum: 50, + }, + }, + }, + { + Fingerprint: 3, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + From: 1500, + Through: 2500, + Checksum: 20, + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + input := make([]resultscache.Response, 0, len(tc.input)) + for _, i := range tc.input { + input = append(input, i) + } + + m := newMerger() + actual, err := m.MergeResponse(input...) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestCache(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "fake") + + limits := mockLimits{ + cacheInterval: 15 * time.Minute, + } + + cfg := CacheConfig{ + Config: resultscache.Config{ + CacheConfig: cache.Config{ + Cache: cache.NewMockCache(), + }, + }, + } + c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger(), stats.BloomFilterCache, constants.Loki) + require.NoError(t, err) + defer c.Stop() + + chunkRefs := []*logproto.ChunkRef{ + { + Fingerprint: 2, + UserID: "fake", + From: 1500, + Through: 2500, + Checksum: 30, + }, + { + Fingerprint: 3, + UserID: "fake", + From: 2500, + Through: 3500, + }, + } + req := &logproto.FilterChunkRefRequest{ + From: model.Time(2000), + Through: model.Time(3000), + Refs: groupRefs(t, chunkRefs), + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: "foo"}, + }, + } + expectedRes := &logproto.FilterChunkRefResponse{ + ChunkRefs: groupRefs(t, chunkRefs), + } + + server, calls := newMockServer(expectedRes) + + cacheMiddleware := NewBloomGatewayClientCacheMiddleware( + log.NewNopLogger(), + server, + c, + limits, + nil, + false, + ) + + // First call should go to the server + *calls = 0 + res, err := cacheMiddleware.FilterChunkRefs(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, *calls) + require.Equal(t, expectedRes, res) + + // Second call should go to the cache + *calls = 0 + res, err = cacheMiddleware.FilterChunkRefs(ctx, req) + require.NoError(t, err) + require.Equal(t, 0, *calls) + require.Equal(t, expectedRes, res) + + // Doing a request with new start and end should: + // 1. hit the server the leading time + // 2. hit the cache the cached span + // 3. hit the server for the trailing time + newChunkRefs := []*logproto.ChunkRef{ + { + Fingerprint: 1, + UserID: "fake", + From: 1000, + Through: 1500, + Checksum: 10, + }, + { + Fingerprint: 4, + UserID: "fake", + From: 3500, + Through: 4500, + }, + } + server.SetResponse(&logproto.FilterChunkRefResponse{ + ChunkRefs: groupRefs(t, newChunkRefs), + }) + expectedRes = &logproto.FilterChunkRefResponse{ + ChunkRefs: groupRefs(t, append(chunkRefs, newChunkRefs...)), + } + req.From = model.Time(100) + req.Through = model.Time(5000) + *calls = 0 + res, err = cacheMiddleware.FilterChunkRefs(ctx, req) + require.NoError(t, err) + require.Equal(t, 2, *calls) + require.Equal(t, expectedRes, res) + + // Doing a request again should only hit the cache + *calls = 0 + res, err = cacheMiddleware.FilterChunkRefs(ctx, req) + require.NoError(t, err) + require.Equal(t, 0, *calls) + require.Equal(t, expectedRes, res) +} + +type mockServer struct { + calls *int + res *logproto.FilterChunkRefResponse +} + +func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) { + var calls int + return &mockServer{ + calls: &calls, + res: res, + }, &calls +} + +func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) { + s.res = res +} + +func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) { + *s.calls++ + return s.res, nil +} + +type mockLimits struct { + cacheFreshness time.Duration + cacheInterval time.Duration +} + +func (m mockLimits) MaxCacheFreshness(_ context.Context, _ string) time.Duration { + return m.cacheFreshness +} + +func (m mockLimits) BloomGatewayCacheKeyInterval(_ string) time.Duration { + return m.cacheInterval +} diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 05686a3ab0ab9..cfbb6c60284ec 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -25,8 +25,11 @@ import ( "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/queue" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/pkg/util/constants" ) @@ -99,6 +102,10 @@ type ClientConfig struct { // Ring is the Bloom Gateway ring used to find the appropriate Bloom Gateway instance // this client should talk to. Ring ring.ReadRing `yaml:"-"` + + // Cache configures the cache used to store the results of the Bloom Gateway server. + Cache CacheConfig `yaml:"results_cache,omitempty"` + CacheResults bool `yaml:"cache_results"` } // RegisterFlags registers flags for the Bloom Gateway client configuration. @@ -109,9 +116,25 @@ func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix. func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f) + i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f) + f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.") f.BoolVar(&i.LogGatewayRequests, prefix+"log-gateway-requests", false, "Flag to control whether requests sent to the gateway should be logged or not.") } +func (i *ClientConfig) Validate() error { + if err := i.GRPCClientConfig.Validate(); err != nil { + return errors.Wrap(err, "grpc client config") + } + + if i.CacheResults { + if err := i.Cache.Validate(); err != nil { + return errors.Wrap(err, "cache config") + } + } + + return nil +} + type Client interface { FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) } @@ -124,7 +147,15 @@ type GatewayClient struct { ring ring.ReadRing } -func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (*GatewayClient, error) { +func NewGatewayClient( + cfg ClientConfig, + limits Limits, + registerer prometheus.Registerer, + logger log.Logger, + metricsNamespace string, + cacheGen resultscache.CacheGenNumberLoader, + retentionEnabled bool, +) (*GatewayClient, error) { latency := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway", @@ -138,22 +169,43 @@ func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Reg return nil, err } + var c cache.Cache + if cfg.CacheResults { + c, err = cache.New(cfg.Cache.CacheConfig, registerer, logger, stats.BloomFilterCache, constants.Loki) + if err != nil { + return nil, errors.Wrap(err, "new bloom gateway cache") + } + if cfg.Cache.Compression == "snappy" { + c = cache.NewSnappy(c, logger) + } + } + poolFactory := func(addr string) (ringclient.PoolClient, error) { pool, err := NewBloomGatewayGRPCPool(addr, dialOpts) if err != nil { return nil, errors.Wrap(err, "new bloom gateway grpc pool") } + + if cfg.CacheResults { + pool.BloomGatewayClient = NewBloomGatewayClientCacheMiddleware( + logger, + pool.BloomGatewayClient, + c, + limits, + cacheGen, + retentionEnabled, + ) + } + return pool, nil } - c := &GatewayClient{ + return &GatewayClient{ cfg: cfg, logger: logger, limits: limits, pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace), - } - - return c, nil + }, nil } func shuffleAddrs(addrs []string) []string { diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index f50c3f7cd6540..f0d5b2edf5c07 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -27,7 +27,7 @@ func TestBloomGatewayClient(t *testing.T) { flagext.DefaultValues(&cfg) t.Run("", func(t *testing.T) { - _, err := NewGatewayClient(cfg, l, reg, logger, "loki") + _, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false) require.NoError(t, err) }) } @@ -194,7 +194,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { cfg := ClientConfig{} flagext.DefaultValues(&cfg) - c, err := NewGatewayClient(cfg, l, reg, logger, "loki") + c, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false) require.NoError(t, err) instances := []ring.InstanceDesc{ diff --git a/pkg/bloomgateway/config.go b/pkg/bloomgateway/config.go index e5d35c42edf5e..3eb94324bd7e8 100644 --- a/pkg/bloomgateway/config.go +++ b/pkg/bloomgateway/config.go @@ -38,6 +38,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } type Limits interface { + CacheLimits BloomGatewayShardSize(tenantID string) int BloomGatewayEnabled(tenantID string) bool BloomGatewayBlocksDownloadingParallelism(tenantID string) int diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index ab67a549b4e65..ec9e2a45842d6 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -41,8 +41,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from return nil, err } - // TODO(chaudum): Cache response - // Flatten response from client and return result := make([]*logproto.ChunkRef, 0, len(chunkRefs)) for i := range refs { diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 648fa17f0f0c7..268e588d3455c 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -1,6 +1,7 @@ package logproto import ( + "encoding/binary" stdjson "encoding/json" "fmt" "math" @@ -10,6 +11,7 @@ import ( "time" "unsafe" + "github.com/cespare/xxhash/v2" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -334,3 +336,83 @@ func (m *VolumeRequest) LogToSpan(sp opentracing.Span) { otlog.String("end", timestamp.Time(int64(m.Through)).String()), ) } + +// Satisfy definitions.Request for FilterChunkRefRequest + +// GetStart returns the start timestamp of the request in milliseconds. +func (m *FilterChunkRefRequest) GetStart() time.Time { + return time.UnixMilli(int64(m.From)) +} + +// GetEnd returns the end timestamp of the request in milliseconds. +func (m *FilterChunkRefRequest) GetEnd() time.Time { + return time.UnixMilli(int64(m.Through)) +} + +// GetStep returns the step of the request in milliseconds. Always 0. +func (m *FilterChunkRefRequest) GetStep() int64 { + return 0 +} + +// GetQuery returns the query of the request. +// The query is the hash for the input chunks refs and the filter expressions. +func (m *FilterChunkRefRequest) GetQuery() string { + var encodeBuf []byte + var chunksHash uint64 + if len(m.Refs) > 0 { + h := xxhash.New() + for _, ref := range m.Refs { + _, _ = h.Write(binary.AppendUvarint(encodeBuf[:0], ref.Fingerprint)) + } + chunksHash = h.Sum64() + } + + // Short circuit if there are no filters. + if len(m.Filters) == 0 { + return fmt.Sprintf("%d", chunksHash) + } + + var sb strings.Builder + for i, filter := range m.Filters { + if i > 0 { + sb.WriteString(",") + } + sb.Write(fmt.Appendf(encodeBuf[:0], "%d", filter.Operator)) + sb.WriteString("-") + sb.WriteString(filter.Match) + } + + return fmt.Sprintf("%d/%s", chunksHash, sb.String()) +} + +// GetCachingOptions returns the caching options. +func (m *FilterChunkRefRequest) GetCachingOptions() (res resultscache.CachingOptions) { return } + +// WithStartEndForCache implements resultscache.Request. +func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resultscache.Request { + // We Remove the chunks that are not within the given time range. + chunkRefs := make([]*GroupedChunkRefs, 0, len(m.Refs)) + for _, chunkRef := range m.Refs { + refs := make([]*ShortRef, 0, len(chunkRef.Refs)) + for _, ref := range chunkRef.Refs { + if end.Before(ref.From.Time()) || ref.Through.Time().Before(start) { + continue + } + refs = append(refs, ref) + } + if len(refs) > 0 { + chunkRefs = append(chunkRefs, &GroupedChunkRefs{ + Fingerprint: chunkRef.Fingerprint, + Tenant: chunkRef.Tenant, + Refs: refs, + }) + } + } + + clone := *m + clone.From = model.TimeFromUnixNano(start.UnixNano()) + clone.Through = model.TimeFromUnixNano(end.UnixNano()) + clone.Refs = chunkRefs + + return &clone +} diff --git a/pkg/logproto/compat_test.go b/pkg/logproto/compat_test.go index 84afa501b68dd..2547c12de968f 100644 --- a/pkg/logproto/compat_test.go +++ b/pkg/logproto/compat_test.go @@ -278,6 +278,74 @@ func TestMergeSeriesResponses(t *testing.T) { } } +func TestFilterChunkRefRequestGetQuery(t *testing.T) { + for _, tc := range []struct { + desc string + request FilterChunkRefRequest + expected string + }{ + { + desc: "empty request", + expected: `0`, + }, + { + desc: "request no filters", + request: FilterChunkRefRequest{ + Refs: []*GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "test", + }, + }, + }, + expected: `9962287286179718960`, + }, + { + desc: "request with filters but no chunks", + request: FilterChunkRefRequest{ + Filters: []*LineFilterExpression{ + { + Operator: 0, + Match: "uuid", + }, + }, + }, + expected: `0/0-uuid`, + }, + { + desc: "request with filters and chunks", + request: FilterChunkRefRequest{ + Refs: []*GroupedChunkRefs{ + { + Fingerprint: 1, + Tenant: "test", + }, + { + Fingerprint: 2, + Tenant: "test", + }, + }, + Filters: []*LineFilterExpression{ + { + Operator: 0, + Match: "uuid", + }, + { + Operator: 1, + Match: "trace", + }, + }, + }, + expected: `8827404902424034886/0-uuid,1-trace`, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + actual := tc.request.GetQuery() + require.Equal(t, tc.expected, actual) + }) + } +} + func benchmarkMergeLabelResponses(b *testing.B, responses []*LabelResponse) { b.ReportAllocs() for n := 0; n < b.N; n++ { diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 339d934c10eb5..597da62805bb6 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -61,6 +61,7 @@ const ( StatsResultCache = "stats-result" VolumeResultCache = "volume-result" WriteDedupeCache = "write-dedupe" + BloomFilterCache = "bloom-filter" ) // NewContext creates a new statistics context diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 20a15801823b6..246c2ef782417 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1334,7 +1334,15 @@ func (t *Loki) initIndexGateway() (services.Service, error) { var bloomQuerier indexgateway.BloomQuerier if t.Cfg.BloomGateway.Enabled { - bloomGatewayClient, err := bloomgateway.NewGatewayClient(t.Cfg.BloomGateway.Client, t.Overrides, prometheus.DefaultRegisterer, logger, t.Cfg.MetricsNamespace) + bloomGatewayClient, err := bloomgateway.NewGatewayClient( + t.Cfg.BloomGateway.Client, + t.Overrides, + prometheus.DefaultRegisterer, + logger, + t.Cfg.MetricsNamespace, + t.cacheGenerationLoader, + t.Cfg.CompactorConfig.RetentionEnabled, + ) if err != nil { return nil, err } diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index a162aff7e7327..0999ca3271068 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -158,7 +158,7 @@ func (s ResultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime in return nil, nil, err } - if !s.shouldCacheRes(ctx, r, response, maxCacheTime) { + if s.shouldCacheRes != nil && !s.shouldCacheRes(ctx, r, response, maxCacheTime) { return response, []Extent{}, nil } diff --git a/pkg/storage/chunk/cache/resultscache/config.go b/pkg/storage/chunk/cache/resultscache/config.go index 54088639b831f..5a329168e8372 100644 --- a/pkg/storage/chunk/cache/resultscache/config.go +++ b/pkg/storage/chunk/cache/resultscache/config.go @@ -33,6 +33,10 @@ func (cfg *Config) Validate() error { return errors.Errorf("unsupported compression type: %s", cfg.Compression) } + if !cache.IsCacheConfigured(cfg.CacheConfig) { + return errors.New("no cache configured") + } + return nil } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 0234730245e92..cc55662aa27ef 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -191,6 +191,7 @@ type Limits struct { BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` + BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -313,6 +314,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.") f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.") f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.") + f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -811,6 +813,10 @@ func (o *Overrides) BloomGatewayBlocksDownloadingParallelism(userID string) int return o.getOverridesForUser(userID).BloomGatewayBlocksDownloadingParallelism } +func (o *Overrides) BloomGatewayCacheKeyInterval(userID string) time.Duration { + return o.getOverridesForUser(userID).BloomGatewayCacheKeyInterval +} + func (o *Overrides) BloomGatewayEnabled(userID string) bool { return o.getOverridesForUser(userID).BloomGatewayEnabled }