
Bloom-Gateway cache #11380

Merged · 11 commits · Dec 5, 2023

Changes from 3 commits
20 changes: 20 additions & 0 deletions docs/sources/configure/_index.md
@@ -1838,6 +1838,21 @@ client:
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]

results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# bloom-gateway-client.cache
[cache: <cache_config>]

# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -bloom-gateway-client.cache.compression
[compression: <string> | default = ""]

[parallelism: <int>]

[cache_results: <boolean>]

# Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
@@ -3028,6 +3043,10 @@ shard_streams:
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]

# Interval for computing the cache key in the Bloom Gateway.
# CLI flag: -bloom-gateway.cache-key-interval
[bloom_gateway_cache_key_interval: <duration> | default = 15m]

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
@@ -4233,6 +4252,7 @@ The TLS configuration.

The cache block configures the cache backend. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-gateway-client.cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.volume-results-cache`
201 changes: 201 additions & 0 deletions pkg/bloomgateway/cache.go
@@ -0,0 +1,201 @@
package bloomgateway

import (
"context"
"flag"
"sort"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"google.golang.org/grpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
)

const (
cacheParallelism = 1
)

type CacheConfig struct {
resultscache.Config `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(f, "bloom-gateway-client.cache.")
}

type CacheLimits interface {
resultscache.Limits
BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}

type keyGen struct {
CacheLimits
}

func newCacheKeyGen(limits CacheLimits) keyGen {
return keyGen{limits}
}

func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}
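The key generator delegates to a resultscache.ConstSplitter derived from the per-tenant interval, so every request whose start time falls into the same fixed window shares a cache key. A minimal sketch of the bucketing arithmetic (illustrative only; the key string Loki actually produces contains more components):

// keyBucket is a hypothetical helper mirroring the ConstSplitter idea:
// start times in the same interval-wide window map to the same bucket,
// and therefore to the same cache key.
func keyBucket(start time.Time, interval time.Duration) int64 {
	return start.UnixMilli() / interval.Milliseconds()
}

// With the default 15m interval, requests starting at 10:00 and 10:14 share
// a bucket; a request starting at 10:16 gets a new one.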

type extractor struct{}

func newExtractor() extractor {
return extractor{}
}

// Extract returns the subset of a response that falls between the `start` and
// `end` timestamps (given in milliseconds).
// We remove chunks that are not within the given time range.
func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response {
res := r.(*logproto.FilterChunkRefResponse)

chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs))
for _, chunkRef := range res.ChunkRefs {
refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs))
for _, ref := range chunkRef.Refs {
if model.Time(end) < ref.From || ref.Through <= model.Time(start) {
continue
}
refs = append(refs, ref)
}
Comment on lines +67 to +72

Contributor: I wonder if we can reduce the amount of iterations by sorting chunkRef.Refs by From date and binary-searching the first item where From > end, then iterating only over chunkRef.Refs[idx:].

Contributor: But sorting also requires the same count of iterations to ensure each item is in order, and after that we would still do a binary search. So I believe it's better to leave it as is. Wdyt?

Contributor (author): I don't think that will save us any iterations: sorting + binary search is O(n log n + log n), whereas here we only iterate once, so O(n).

Contributor: Right. Ideally, chunkRef.Refs would already be sorted, but I cannot guarantee that in the bloom gateway.

if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.Tenant,
Refs: refs,
})
}
}

return &logproto.FilterChunkRefResponse{
ChunkRefs: chunkRefs,
}
}
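The review thread above weighs replacing this linear scan with sort-plus-binary-search. For reference, a sketch of the proposed lookup, assuming chunkRef.Refs were already sorted ascending by From (which, as the author notes, the bloom gateway cannot guarantee). refsUpTo is a hypothetical helper, not part of this PR:

// refsUpTo returns the prefix of refs whose From is <= end; only that
// prefix would still need the Through <= start check from Extract above.
func refsUpTo(refs []*logproto.ShortRef, end int64) []*logproto.ShortRef {
	// sort.Search finds the first index where From > end; refs beyond it
	// start after the requested range and can be skipped wholesale.
	idx := sort.Search(len(refs), func(i int) bool {
		return refs[i].From > model.Time(end)
	})
	return refs[:idx]
}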

type merger struct{}

func newMerger() merger {
return merger{}
}

// MergeResponse merges responses from multiple requests into a single Response
// We merge all chunks grouped by their fingerprint.
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
var size int
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
size += len(res.ChunkRefs)
}

chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
chunkRefs = append(chunkRefs, res.ChunkRefs...)
}

return &logproto.FilterChunkRefResponse{
ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
}, nil
}
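A hedged usage sketch of the merger, with illustrative values: two partial responses carrying the same stream fingerprint collapse into a single group whose refs end up sorted and de-duplicated by checksum.

a := &logproto.FilterChunkRefResponse{ChunkRefs: []*logproto.GroupedChunkRefs{
	{Fingerprint: 7, Tenant: "tenant-1", Refs: []*logproto.ShortRef{{Checksum: 1}}},
}}
b := &logproto.FilterChunkRefResponse{ChunkRefs: []*logproto.GroupedChunkRefs{
	{Fingerprint: 7, Tenant: "tenant-1", Refs: []*logproto.ShortRef{{Checksum: 2}}},
}}
// merged holds a single GroupedChunkRefs for fingerprint 7 with both refs.
merged, _ := newMerger().MergeResponse(a, b)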

// mergeGroupedChunkRefs merges groups with duplicated fingerprints by:
// 1. sorting the chunkRefs by their stream fingerprint
// 2. appending all chunks with the same fingerprint into the first group's chunk list
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})
return slices.CompactFunc(chunkRefs, func(next, prev *logproto.GroupedChunkRefs) bool {
Contributor: TIL: there is a slices.CompactFunc :)

Contributor: hm ;) great function ;) previously I would do it manually ;))))

Contributor: It looks like it will join the chunks only for the first neighboring elements with the same fingerprint. If we have more than 2 neighbors with the same fingerprint, we will just lose chunk IDs. I am not sure it's expected right now to have more than 2 elements with the same fingerprint, but I would not leave this function as is, because in the future somebody will spend days trying to find this bug.

Look at this code snippet, https://go.dev/play/p/c_bt6c1HNUj?v= : I created a snippet with users where the first 6 users have ID a and the next 4 users have ID b. If we use CompactFunc to aggregate all the scores of the users, we won't get the expected results:

compacted 2
&{id:a scores:1}
&{id:b scores:13}

but it has to be:

compacted 2
&{id:a scores:15}
&{id:b scores:30}

Contributor (author, @salvacorts, Dec 5, 2023): Yes! It's quite useful to remove/aggregate duplicated items in a sorted list. As Vlad mentioned, though, this usage is buggy.

Contributor (author): You are right @vlad-diachenko, good catch. I pushed a new commit with a test for this and a fix.

if next.Fingerprint == prev.Fingerprint {
prev.Refs = mergeShortRefs(append(prev.Refs, next.Refs...))
return true
}
return false
})
}
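As the thread above establishes, the CompactFunc-based merge loses refs whenever a run of three or more groups shares a fingerprint: CompactFunc compares adjacent elements, so the accumulation can land in an element that is itself about to be dropped. A run-based alternative that avoids the pitfall (a sketch only; not necessarily the fix that was pushed in the later commit, and mergeGroupedChunkRefsByRun is a hypothetical name):

func mergeGroupedChunkRefsByRun(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
	sort.Slice(chunkRefs, func(i, j int) bool {
		return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
	})
	// Accumulate each run of equal fingerprints into the element that is
	// kept, never into one that is discarded.
	merged := chunkRefs[:0]
	for _, group := range chunkRefs {
		if len(merged) > 0 && merged[len(merged)-1].Fingerprint == group.Fingerprint {
			last := merged[len(merged)-1]
			last.Refs = mergeShortRefs(append(last.Refs, group.Refs...))
			continue
		}
		merged = append(merged, group)
	}
	return merged
}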

// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
sort.Slice(refs, func(i, j int) bool {
return refs[i].Checksum < refs[j].Checksum
})
return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
return a.Checksum == b.Checksum
})
}

type ClientCache struct {
cache *resultscache.ResultsCache
limits CacheLimits
logger log.Logger
}

func NewBloomGatewayClientCacheMiddleware(
logger log.Logger,
next logproto.BloomGatewayClient,
c cache.Cache,
limits CacheLimits,
cacheGen resultscache.CacheGenNumberLoader,
retentionEnabled bool,
) *ClientCache {
nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) {
req := cacheReq.(requestWithGrpcCallOptions)
return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...)
})

resultsCache := resultscache.NewResultsCache(
logger,
c,
nextAsHandler,
newCacheKeyGen(limits),
limits,
newMerger(),
newExtractor(),
nil,
nil,
func(_ context.Context, _ []string, _ resultscache.Request) int {
return cacheParallelism
},
cacheGen,
retentionEnabled,
)

return &ClientCache{
cache: resultsCache,
limits: limits,
logger: logger,
}
}
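A wiring sketch for the middleware (underlying, memcached, limits, ctx, and req are assumed to already exist; none of these names come from this PR, and passing a nil generation-number loader is an assumption that only holds while retention-based invalidation stays disabled):

cachedClient := NewBloomGatewayClientCacheMiddleware(
	log.NewNopLogger(),
	underlying, // the wrapped logproto.BloomGatewayClient
	memcached,  // any cache.Cache implementation
	limits,     // implements CacheLimits
	nil,        // cache generation-number loader (assumed optional here)
	false,      // retentionEnabled
)
res, err := cachedClient.FilterChunkRefs(ctx, req)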

func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
grpcCallOptions: opts,
}
res, err := c.cache.Do(ctx, cacheReq)
if err != nil {
return nil, err
}

return res.(*logproto.FilterChunkRefResponse), nil
}

type requestWithGrpcCallOptions struct {
*logproto.FilterChunkRefRequest
grpcCallOptions []grpc.CallOption
}

func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request {
return requestWithGrpcCallOptions{
FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest),
grpcCallOptions: r.grpcCallOptions,
}
}