Bloom-Gateway cache #11380

Merged · 11 commits · Dec 5, 2023
20 changes: 20 additions & 0 deletions docs/sources/configure/_index.md
@@ -1838,6 +1838,21 @@ client:
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]

results_cache:
  # The cache block configures the cache backend.
  # The CLI flags prefix for this block configuration is:
  # bloom-gateway-client.cache
  [cache: <cache_config>]

  # Use compression in cache. The default is an empty value '', which disables
  # compression. Supported values are: 'snappy' and ''.
  # CLI flag: -bloom-gateway-client.cache.compression
  [compression: <string> | default = ""]

# Flag to control whether to cache bloom gateway client requests/responses.
# CLI flag: -bloom-gateway-client.cache_results
[cache_results: <boolean> | default = false]

# Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
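Taken together, the new options nest under the bloom gateway client configuration. A hedged sketch of how they might look in a Loki config file (the embedded_cache backend and its values are illustrative, not defaults; cache_results sits on the client block, matching its -bloom-gateway-client.cache_results flag):

bloom_gateway:
  client:
    results_cache:
      compression: snappy
      cache:
        embedded_cache:
          enabled: true
          max_size_mb: 100
    cache_results: true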
@@ -3028,6 +3043,10 @@ shard_streams:
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]

# Interval for computing the cache key in the Bloom Gateway.
# CLI flag: -bloom-gateway.cache-key-interval
[bloom_gateway_cache_key_interval: <duration> | default = 15m]

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
@@ -4233,6 +4252,7 @@ The TLS configuration.

The cache block configures the cache backend. The supported CLI flags `<prefix>` used to reference this configuration block are:

- `bloom-gateway-client.cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.volume-results-cache`
217 changes: 217 additions & 0 deletions pkg/bloomgateway/cache.go
@@ -0,0 +1,217 @@
package bloomgateway

import (
	"context"
	"flag"
	"sort"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/common/model"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/storage/chunk/cache"
	"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
)

const (
	cacheParallelism = 1
)

type CacheConfig struct {
	resultscache.Config `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("bloom-gateway-client.cache.", f)
}

// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.Config.RegisterFlagsWithPrefix(f, prefix)
}

type CacheLimits interface {
	resultscache.Limits
	BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}

type keyGen struct {
	CacheLimits
}

func newCacheKeyGen(limits CacheLimits) keyGen {
	return keyGen{limits}
}

// GenerateCacheKey buckets the request into a fixed, per-tenant time interval
// so that requests within the same window share a cache key.
func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
	return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}

type extractor struct{}

func newExtractor() extractor {
	return extractor{}
}

// Extract extracts the subset of a response between the `start` and `end`
// timestamps (in milliseconds), removing chunks that fall outside that range.
func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response {
	res := r.(*logproto.FilterChunkRefResponse)

	chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs))
	for _, chunkRef := range res.ChunkRefs {
		refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs))
		for _, ref := range chunkRef.Refs {
			if model.Time(end) < ref.From || ref.Through <= model.Time(start) {
				continue
			}
			refs = append(refs, ref)
		}
Comment on lines +67 to +72

Contributor: I wonder if we can reduce the number of iterations by sorting chunkRef.Refs by From date and binary-searching for the first item where From > end, then iterating only over chunkRef.Refs[idx:].

Contributor: But sorting also requires iterating over each item to ensure it is in order, and after that we would still do a binary search. So I believe it's better to leave it as is. WDYT?

Contributor (Author): I don't think that will save us any iterations: sorting plus binary search is O(n log n + log n), whereas here we iterate only once, so O(n).

Contributor: Right. Ideally, chunkRef.Refs would already be sorted, but I cannot guarantee that in the bloom gateway.
		if len(refs) > 0 {
			chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
				Fingerprint: chunkRef.Fingerprint,
				Tenant:      chunkRef.Tenant,
				Refs:        refs,
			})
		}
	}

	return &logproto.FilterChunkRefResponse{
		ChunkRefs: chunkRefs,
	}
}
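The filter above is the negation of a standard interval-overlap test: a ref survives extraction exactly when its span intersects the query range. As a standalone sketch (plain int64 milliseconds; keep is a hypothetical helper, not part of this PR):

func keep(from, through, start, end int64) bool {
	// Inverse of the skip condition in Extract: the ref starts no later
	// than the range ends, and ends strictly after the range starts.
	return from <= end && through > start
}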

type merger struct{}

func newMerger() merger {
	return merger{}
}

// MergeResponse merges responses from multiple requests into a single
// response, with all chunks grouped by their stream fingerprint.
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
	var size int
	for _, r := range responses {
		res := r.(*logproto.FilterChunkRefResponse)
		size += len(res.ChunkRefs)
	}

	chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
	for _, r := range responses {
		res := r.(*logproto.FilterChunkRefResponse)
		chunkRefs = append(chunkRefs, res.ChunkRefs...)
	}

	return &logproto.FilterChunkRefResponse{
		ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
	}, nil
}

// mergeGroupedChunkRefs merges groups with duplicated fingerprints by:
// 1. sorting the chunkRefs by their stream fingerprint, and
// 2. removing duplicated fingerprints, appending all chunks to the first
//    group with that fingerprint.
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
	if len(chunkRefs) <= 1 {
		return chunkRefs
	}

	sort.Slice(chunkRefs, func(i, j int) bool {
		return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
	})

	var lastDiffFP int
	for i := 1; i < len(chunkRefs); i++ {
		if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
			chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
		} else {
			lastDiffFP++
			chunkRefs[lastDiffFP] = chunkRefs[i]
		}
	}
	return chunkRefs[:lastDiffFP+1]
}
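The deduplication is a single in-place pass over the sorted slice, the same shape as the classic "unique" algorithm. A worked trace on fingerprints alone (values are hypothetical):

// input after sort:  fingerprints [7, 7, 9, 9, 12]
// i=1: 7 == 7  -> merge refs into the first 7
// i=2: 7 != 9  -> advance write index, keep 9
// i=3: 9 == 9  -> merge refs into the kept 9
// i=4: 9 != 12 -> advance write index, keep 12
// result: fingerprints [7, 9, 12], each holding its merged, deduplicated refs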

// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
	if len(refs) <= 1 {
		return refs
	}

	sort.Slice(refs, func(i, j int) bool {
		return refs[i].Checksum < refs[j].Checksum
	})
	return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
		return a.Checksum == b.Checksum
	})
}
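Note that slices.CompactFunc only collapses runs of adjacent equal elements, which is why the sort by checksum has to come first. A minimal illustration with plain integers (hypothetical values):

checksums := []uint32{3, 1, 3, 2, 1}
slices.Sort(checksums)                // [1 1 2 3 3]
checksums = slices.Compact(checksums) // [1 2 3]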

// ClientCache wraps a bloom gateway client and caches its
// FilterChunkRefs responses.
type ClientCache struct {
	cache  *resultscache.ResultsCache
	limits CacheLimits
	logger log.Logger
}

func NewBloomGatewayClientCacheMiddleware(
	logger log.Logger,
	next logproto.BloomGatewayClient,
	c cache.Cache,
	limits CacheLimits,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) *ClientCache {
	nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) {
		req := cacheReq.(requestWithGrpcCallOptions)
		return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...)
	})

	resultsCache := resultscache.NewResultsCache(
		logger,
		c,
		nextAsHandler,
		newCacheKeyGen(limits),
		limits,
		newMerger(),
		newExtractor(),
		nil,
		nil,
		func(_ context.Context, _ []string, _ resultscache.Request) int {
			return cacheParallelism
		},
		cacheGen,
		retentionEnabled,
	)

	return &ClientCache{
		cache:  resultsCache,
		limits: limits,
		logger: logger,
	}
}

func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
	cacheReq := requestWithGrpcCallOptions{
		FilterChunkRefRequest: req,
		grpcCallOptions:       opts,
	}
	res, err := c.cache.Do(ctx, cacheReq)
	if err != nil {
		return nil, err
	}

	return res.(*logproto.FilterChunkRefResponse), nil
}

// requestWithGrpcCallOptions bundles a FilterChunkRefRequest with the gRPC
// call options it was issued with, so they survive the cache round trip.
type requestWithGrpcCallOptions struct {
	*logproto.FilterChunkRefRequest
	grpcCallOptions []grpc.CallOption
}

func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request {
	return requestWithGrpcCallOptions{
		FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest),
		grpcCallOptions:       r.grpcCallOptions,
	}
}
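Taken together, the middleware is transparent to callers: construct it once around a real gRPC client and use it anywhere a BloomGatewayClient is expected. A hedged wiring sketch (client, c, limits, and cacheGen are assumed to exist elsewhere; the names are illustrative):

cached := NewBloomGatewayClientCacheMiddleware(
	log.NewNopLogger(), // or the service logger
	client,             // underlying logproto.BloomGatewayClient
	c,                  // cache.Cache backend
	limits,             // CacheLimits providing the per-tenant key interval
	cacheGen,           // resultscache.CacheGenNumberLoader
	false,              // retentionEnabled
)
resp, err := cached.FilterChunkRefs(ctx, req)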