Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): Prefetch bloom blocks as soon as they are built #15050

Merged
merged 13 commits into from
Nov 22, 2024
39 changes: 28 additions & 11 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in pkg/bloomgateway/bloomgateway.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

: # github.com/grafana/loki/v3/pkg/bloomgateway [github.com/grafana/loki/v3/pkg/bloomgateway.test]
The bloom gateway is a component that can be run as a standalone microserivce
target and provides capabilities for filtering ChunkRefs based on a given list
of line filter expressions.
Expand Down Expand Up @@ -161,9 +161,18 @@
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

func (g *Gateway) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
// TODO: Implement prefetching of bloom blocks
return &logproto.PrefetchBloomBlocksResponse{}, nil
func (g *Gateway) PrefetchBloomBlocks(ctx context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
refs, err := decodeBlockKeys(req.Blocks)
if err != nil {
return nil, err
}
_, err = g.bloomStore.FetchBlocks(
chaudum marked this conversation as resolved.
Show resolved Hide resolved
ctx,
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithIgnoreNotFound(true),
)
return &logproto.PrefetchBloomBlocksResponse{}, err
}

// FilterChunkRefs implements BloomGatewayServer
Expand Down Expand Up @@ -209,14 +218,10 @@
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
blocks, err := decodeBlockKeys(req.Blocks)
if err != nil {
stats.Status = labelFailure
return nil, err
}

// Shortcut if request does not contain blocks
Expand Down Expand Up @@ -475,3 +480,15 @@

cur.Refs = cur.Refs[:len(res)]
}

func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
for _, key := range keys {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}
return blocks, nil
}
Loading