Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): Prefetch bloom blocks as soon as they are built #15050

Merged
merged 13 commits into from
Nov 22, 2024
53 changes: 45 additions & 8 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,35 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

func (g *Gateway) PrefetchBloomBlocks(ctx context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
refs, err := decodeBlockKeys(req.Blocks)
if err != nil {
return nil, err
}
bqs, err := g.bloomStore.FetchBlocks(
ctx,
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithIgnoreNotFound(true),
)
if err != nil {
return nil, err
}

for _, bq := range bqs {
if bq == nil {
// This is the expected case: the blocks is not yet downloaded and the block querier is nil
continue
}

// Close any block querier that were already downloaded
if err := bq.Close(); err != nil {
level.Warn(g.logger).Log("msg", "failed to close block querier", "err", err)
}
}
return &logproto.PrefetchBloomBlocksResponse{}, err
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
Expand Down Expand Up @@ -204,14 +233,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
blocks, err := decodeBlockKeys(req.Blocks)
if err != nil {
stats.Status = labelFailure
return nil, err
}

// Shortcut if request does not contain blocks
Expand Down Expand Up @@ -470,3 +495,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR

cur.Refs = cur.Refs[:len(res)]
}

func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
for _, key := range keys {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}
return blocks, nil
}
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.

type ClientCache struct {
cache *resultscache.ResultsCache
next logproto.BloomGatewayClient
limits CacheLimits
logger log.Logger
}
Expand Down Expand Up @@ -149,12 +150,19 @@ func NewBloomGatewayClientCacheMiddleware(
)

return &ClientCache{
next: next,
cache: resultsCache,
limits: limits,
logger: logger,
}
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
return c.next.PrefetchBloomBlocks(ctx, in, opts...)
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
Expand Down
8 changes: 8 additions & 0 deletions pkg/bloomgateway/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ type mockServer struct {
res *logproto.FilterChunkRefResponse
}

var _ logproto.BloomGatewayClient = &mockServer{}

func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
var calls int
return &mockServer{
Expand All @@ -480,11 +482,17 @@ func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
s.res = res
}

// FilterChunkRefs implements logproto.BloomGatewayClient.
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
*s.calls++
return s.res, nil
}

// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
panic("unimplemented")
}

type mockLimits struct {
cacheFreshness time.Duration
cacheInterval time.Duration
Expand Down
Loading
Loading