From d84026cab6a2de6bccbb65d25c970969dbd8afbb Mon Sep 17 00:00:00 2001
From: Christian Haudum <christian.haudum@gmail.com>
Date: Wed, 18 Oct 2023 17:28:04 +0200
Subject: [PATCH] Adjust bloom gateway request structure (#10926)

**What this PR does / why we need it**:

Since we already group chunkRefs by fingerprint on the client side to
determine the correct bloom gateway to send the request to, we can keep
the grouping for the gRPC request.
This will not only optimize the request size, but also helps to reduce
complexity of querying blooms on the bloom gateway itself.

There is still conversion between `[]*logproto.ShortRef` and
`v1.ChunkRefs` required, which could be removed by sending
`v1.ChunkRefs` over the wire.

**Special notes for your reviewer**:

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
---
 pkg/bloomgateway/bloomgateway.go              | 23 +-----
 pkg/bloomgateway/bloomgateway_test.go         | 10 ++-
 pkg/bloomgateway/client.go                    | 66 ++++++----------
 pkg/bloomgateway/client_test.go               | 47 +++++-------
 pkg/bloomgateway/querier.go                   | 60 +++++++++------
 pkg/bloomgateway/querier_test.go              |  4 +-
 pkg/logproto/bloomgateway.pb.go               | 68 ++++++++---------
 pkg/logproto/bloomgateway.proto               |  2 +-
 .../stores/shipper/bloomshipper/shipper.go    |  2 +-
 .../stores/shipper/bloomshipper/store.go      | 75 ++++++++++++++++---
 10 files changed, 193 insertions(+), 164 deletions(-)

diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index 39937fda67db5..44eae46e4b84f 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -121,8 +121,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	}
 
 	for _, ref := range req.Refs {
-		if ref.UserID != tenantID {
-			return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.UserID)
+		if ref.Tenant != tenantID {
+			return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant)
 		}
 	}
 
@@ -141,22 +141,5 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		}
 	}
 
-	// TODO(chaudum): Re-use buffers for response.
-	resp := make([]*logproto.GroupedChunkRefs, 0)
-	for idx, chunkRef := range chunkRefs {
-		fp := chunkRef.Fingerprint
-		shortRef := &logproto.ShortRef{From: chunkRef.From, Through: chunkRef.Through, Checksum: chunkRef.Checksum}
-		if idx == 0 || fp > resp[len(resp)-1].Fingerprint {
-			r := &logproto.GroupedChunkRefs{
-				Fingerprint: fp,
-				Tenant:      tenantID,
-				Refs:        []*logproto.ShortRef{shortRef},
-			}
-			resp = append(resp, r)
-			continue
-		}
-		resp[len(resp)-1].Refs = append(resp[len(resp)-1].Refs, shortRef)
-	}
-
-	return &logproto.FilterChunkRefResponse{ChunkRefs: resp}, nil
+	return &logproto.FilterChunkRefResponse{ChunkRefs: chunkRefs}, nil
 }
diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go
index 58d178b21c0c0..cc492dc4fd678 100644
--- a/pkg/bloomgateway/bloomgateway_test.go
+++ b/pkg/bloomgateway/bloomgateway_test.go
@@ -32,6 +32,12 @@ func parseDayTime(s string) config.DayTime {
 	}
 }
 
+func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
+	t.Helper()
+	grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
+	return groupChunkRefs(chunkRefs, grouped)
+}
+
 func TestBloomGateway_StartStopService(t *testing.T) {
 
 	ss := NewNoopStrategy()
@@ -145,7 +151,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 		req := &logproto.FilterChunkRefRequest{
 			From:    now.Add(-24 * time.Hour),
 			Through: now,
-			Refs:    chunkRefs,
+			Refs:    groupRefs(t, chunkRefs),
 		}
 
 		ctx := user.InjectOrgID(context.Background(), tenantID)
@@ -181,7 +187,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 		req := &logproto.FilterChunkRefRequest{
 			From:    now.Add(-24 * time.Hour),
 			Through: now,
-			Refs:    chunkRefs,
+			Refs:    groupRefs(t, chunkRefs),
 		}
 
 		ctx := user.InjectOrgID(context.Background(), tenantID)
diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go
index 5ee8c910e7da4..af9440ff18cea 100644
--- a/pkg/bloomgateway/client.go
+++ b/pkg/bloomgateway/client.go
@@ -81,7 +81,7 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 }
 
 type Client interface {
-	FilterChunks(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error)
+	FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error)
 }
 
 type GatewayClient struct {
@@ -132,20 +132,19 @@ func shuffleAddrs(addrs []string) []string {
 }
 
 // FilterChunkRefs implements Client
-func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error) {
+func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
 	// Get the addresses of corresponding bloom gateways for each series.
-	_, addrs, err := c.serverAddrsForFingerprints(tenant, fingerprints)
+	fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	// Group chunk refs by addresses of one or more bloom gateways.
 	// All chunk refs of series that belong to one and the same bloom gateway are set in one batch.
-	streamsByAddr := c.groupStreamsByAddr(fingerprints, chunkRefs, addrs)
+	streamsByAddr := c.groupStreamsByAddr(groups, addrs)
 
 	// TODO(chaudum): We might over-allocate for the filtered responses here?
-	filteredChunkRefs := make([][]*logproto.ChunkRef, 0, len(fingerprints))
-	filteredFingerprints := make([]uint64, 0, len(fingerprints))
+	filteredChunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(fingerprints))
 
 	for _, item := range streamsByAddr {
 		// randomize order of addresses so we don't hotspot the first server in the list
@@ -161,29 +160,14 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
 			if err != nil {
 				return err
 			}
-			for _, refGroup := range resp.ChunkRefs {
-				chunkRefs := make([]*logproto.ChunkRef, 0, len(refGroup.Refs))
-				for _, shortRef := range refGroup.Refs {
-					chunkRefs = append(chunkRefs,
-						&logproto.ChunkRef{
-							Fingerprint: refGroup.Fingerprint,
-							UserID:      refGroup.Tenant,
-							From:        shortRef.From,
-							Through:     shortRef.Through,
-							Checksum:    shortRef.Checksum,
-						},
-					)
-				}
-				filteredFingerprints = append(filteredFingerprints, refGroup.Fingerprint)
-				filteredChunkRefs = append(filteredChunkRefs, chunkRefs)
-			}
+			filteredChunkRefs = append(filteredChunkRefs, resp.ChunkRefs...)
 			return nil
 		})
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 	}
-	return fingerprints, filteredChunkRefs, nil
+	return filteredChunkRefs, nil
 }
 
 // isEqualStringElements checks if two string slices contain the same elements.
@@ -215,29 +199,19 @@ func listContainsAddrs(list []chunkRefsByAddrs, addrs []string) (int, bool) {
 }
 
 type chunkRefsByAddrs struct {
-	addrs   []string
-	refs    []*logproto.ChunkRef
-	streams []uint64
+	addrs []string
+	refs  []*logproto.GroupedChunkRefs
 }
 
-// groupStreamsByAddr takes a slice of stream fingerprints, a slices of chunkRef slices, and a slice of address slices
-// and groups them into a slice of chunkRefsByAddrs.
-// streams is a slice of uint64 stream fingerprints
-// chunks is a slice of chunk ref slices
-// addresses is a slice of string slices containing server addresses
-// It is necessary that len(streams) == len(chunks) == len(addresses), but the
-// function implementation does not validate the precondition and would fail silently.
-func (c *GatewayClient) groupStreamsByAddr(streams []uint64, chunks [][]*logproto.ChunkRef, addresses [][]string) []chunkRefsByAddrs {
+func (c *GatewayClient) groupStreamsByAddr(groups []*logproto.GroupedChunkRefs, addresses [][]string) []chunkRefsByAddrs {
 	res := make([]chunkRefsByAddrs, 0, len(addresses))
 	for i := 0; i < len(addresses); i++ {
 		addrs := addresses[i]
-		refs := chunks[i]
-		fp := streams[i]
+		refs := groups[i]
 		if idx, ok := listContainsAddrs(res, addrs); ok {
-			res[idx].refs = append(res[idx].refs, refs...)
-			res[idx].streams = append(res[idx].streams, fp)
+			res[idx].refs = append(res[idx].refs, refs)
 		} else {
-			res = append(res, chunkRefsByAddrs{addrs: addrs, refs: refs, streams: []uint64{fp}})
+			res = append(res, chunkRefsByAddrs{addrs: addrs, refs: []*logproto.GroupedChunkRefs{refs}})
 		}
 	}
 	return res
@@ -272,7 +246,7 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
 // Returns an error in case the bloom gateway ring could not get the
 // corresponding replica set for a given fingerprint.
 // Warning: This function becomes inefficient when the number of fingerprints is very large.
-func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints []uint64) ([]uint64, [][]string, error) {
+func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, groups []*logproto.GroupedChunkRefs) ([]uint64, [][]string, error) {
 	subRing := GetShuffleShardingSubring(c.ring, tenantID, c.limits)
 
 	rs, err := subRing.GetAllHealthy(BlocksRead)
@@ -285,7 +259,7 @@ func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints
 		numTokens += len(instanceDesc.Tokens)
 	}
 
-	numFingerprints := len(fingerprints)
+	numFingerprints := len(groups)
 	if numFingerprints > int(float64(numTokens)*math.Log2(float64(numFingerprints))) {
 		// TODO(chaudum): Implement algorithm in O(n * m * log(k) + n) instead of O(k) by iterating over ring tokens
 		// and finding corresponding fingerprint ranges using binary search.
@@ -295,14 +269,16 @@ func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints
 		level.Warn(c.logger).Log("msg", "using an inefficient algorithm to determin server addresses for fingerprints", "fingerprints", numFingerprints, "tokens", numTokens)
 	}
 
+	fingerprints := make([]uint64, numFingerprints)
 	addresses := make([][]string, numFingerprints)
 	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
 
-	for idx, key := range fingerprints {
-		rs, err = subRing.Get(uint32(key), BlocksRead, bufDescs, bufHosts, bufZones)
+	for idx, key := range groups {
+		rs, err = subRing.Get(uint32(key.Fingerprint), BlocksRead, bufDescs, bufHosts, bufZones)
 		if err != nil {
 			return nil, nil, errors.Wrap(err, "bloom gateway get ring")
 		}
+		fingerprints[idx] = key.Fingerprint
 		addresses[idx] = rs.GetAddresses()
 	}
 
diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go
index 3d56eb367f6bf..d1e31643e4b0f 100644
--- a/pkg/bloomgateway/client_test.go
+++ b/pkg/bloomgateway/client_test.go
@@ -45,25 +45,22 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
 
 	testCases := []struct {
 		name      string
-		streams   []uint64
-		chunks    [][]*logproto.ChunkRef
+		chunks    []*logproto.GroupedChunkRefs
 		addresses [][]string
 		expected  []chunkRefsByAddrs
 	}{
 		{
 			name:      "empty input yields empty result",
-			streams:   []uint64{},
-			chunks:    [][]*logproto.ChunkRef{},
+			chunks:    []*logproto.GroupedChunkRefs{},
 			addresses: [][]string{},
 			expected:  []chunkRefsByAddrs{},
 		},
 		{
-			name:    "addresses with same elements are grouped into single item",
-			streams: []uint64{1, 2, 3},
-			chunks: [][]*logproto.ChunkRef{
-				{{Fingerprint: 1, Checksum: 1}},
-				{{Fingerprint: 2, Checksum: 2}},
-				{{Fingerprint: 3, Checksum: 3}},
+			name: "addresses with same elements are grouped into single item",
+			chunks: []*logproto.GroupedChunkRefs{
+				{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
+				{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
+				{Fingerprint: 3, Refs: []*logproto.ShortRef{{Checksum: 3}}},
 			},
 			addresses: [][]string{
 				{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
@@ -73,21 +70,19 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
 			expected: []chunkRefsByAddrs{
 				{
 					addrs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
-					refs: []*logproto.ChunkRef{
-						{Fingerprint: 1, Checksum: 1},
-						{Fingerprint: 2, Checksum: 2},
-						{Fingerprint: 3, Checksum: 3},
+					refs: []*logproto.GroupedChunkRefs{
+						{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
+						{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
+						{Fingerprint: 3, Refs: []*logproto.ShortRef{{Checksum: 3}}},
 					},
-					streams: []uint64{1, 2, 3},
 				},
 			},
 		},
 		{
-			name:    "partially overlapping addresses are not grouped together",
-			streams: []uint64{1, 2},
-			chunks: [][]*logproto.ChunkRef{
-				{{Fingerprint: 1, Checksum: 1}},
-				{{Fingerprint: 2, Checksum: 2}},
+			name: "partially overlapping addresses are not grouped together",
+			chunks: []*logproto.GroupedChunkRefs{
+				{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
+				{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
 			},
 			addresses: [][]string{
 				{"10.0.0.1", "10.0.0.2"},
@@ -96,17 +91,15 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
 			expected: []chunkRefsByAddrs{
 				{
 					addrs: []string{"10.0.0.1", "10.0.0.2"},
-					refs: []*logproto.ChunkRef{
-						{Fingerprint: 1, Checksum: 1},
+					refs: []*logproto.GroupedChunkRefs{
+						{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
 					},
-					streams: []uint64{1},
 				},
 				{
 					addrs: []string{"10.0.0.2", "10.0.0.3"},
-					refs: []*logproto.ChunkRef{
-						{Fingerprint: 2, Checksum: 2},
+					refs: []*logproto.GroupedChunkRefs{
+						{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
 					},
-					streams: []uint64{2},
 				},
 			},
 		},
@@ -114,7 +107,7 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			res := c.groupStreamsByAddr(tc.streams, tc.chunks, tc.addresses)
+			res := c.groupStreamsByAddr(tc.chunks, tc.addresses)
 			require.Equal(t, tc.expected, res)
 		})
 	}
diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go
index 6ae19aeeb72d9..ab7b9eb40500e 100644
--- a/pkg/bloomgateway/querier.go
+++ b/pkg/bloomgateway/querier.go
@@ -21,34 +21,22 @@ func NewBloomQuerier(c Client, logger log.Logger) *BloomQuerier {
 	return &BloomQuerier{c: c, logger: logger}
 }
 
+func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
+	return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
+}
+
 func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]*logproto.ChunkRef, error) {
 	// Shortcut that does not require any filtering
 	if len(chunkRefs) == 0 || len(filters) == 0 {
 		return chunkRefs, nil
 	}
+
 	// TODO(chaudum): Make buffer pool to reduce allocations.
 	// The indexes of the chunks slice correspond to the indexes of the fingerprint slice.
-	fingerprints := make([]uint64, 0, len(chunkRefs))
-	chunks := make([][]*logproto.ChunkRef, 0, len(chunkRefs))
-
-	// Sort the chunkRefs by their stream fingerprint
-	// so we can easily append them to the target slice by iterating over them.
-	sort.Slice(chunkRefs, func(i, j int) bool {
-		return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
-	})
-
-	for _, chunkRef := range chunkRefs {
-		idx := len(fingerprints) - 1
-		if idx == -1 || fingerprints[idx] < chunkRef.Fingerprint {
-			fingerprints = append(fingerprints, chunkRef.Fingerprint)
-			chunks = append(chunks, []*logproto.ChunkRef{chunkRef})
-			continue
-		}
-		chunks[idx] = append(chunks[idx], chunkRef)
-	}
+	grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
+	grouped = groupChunkRefs(chunkRefs, grouped)
 
-	// Drop series fingerprints, because they are not used (yet).
-	_, refs, err := bq.c.FilterChunks(ctx, tenant, from, through, fingerprints, chunks, filters...)
+	refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...)
 	if err != nil {
 		return nil, err
 	}
@@ -58,7 +46,37 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
 	// Flatten response from client and return
 	result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
 	for i := range refs {
-		result = append(result, refs[i]...)
+		for _, ref := range refs[i].Refs {
+			result = append(result, &logproto.ChunkRef{
+				Fingerprint: refs[i].Fingerprint,
+				UserID:      tenant,
+				From:        ref.From,
+				Through:     ref.Through,
+				Checksum:    ref.Checksum,
+			})
+		}
 	}
 	return result, nil
 }
+
+func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
+	// Sort the chunkRefs by their stream fingerprint
+	// so we can easily append them to the target slice by iterating over them.
+	sort.Slice(chunkRefs, func(i, j int) bool {
+		return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
+	})
+
+	for _, chunkRef := range chunkRefs {
+		idx := len(grouped) - 1
+		if idx == -1 || grouped[idx].Fingerprint < chunkRef.Fingerprint {
+			grouped = append(grouped, &logproto.GroupedChunkRefs{
+				Fingerprint: chunkRef.Fingerprint,
+				Tenant:      chunkRef.UserID,
+				Refs:        []*logproto.ShortRef{convertToShortRef(chunkRef)},
+			})
+			continue
+		}
+		grouped[idx].Refs = append(grouped[idx].Refs, convertToShortRef(chunkRef))
+	}
+	return grouped
+}
diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go
index e1aaeb449ca71..1e40eb2994c04 100644
--- a/pkg/bloomgateway/querier_test.go
+++ b/pkg/bloomgateway/querier_test.go
@@ -19,9 +19,9 @@ type noopClient struct {
 }
 
 // FilterChunks implements Client.
-func (c *noopClient) FilterChunks(ctx context.Context, tenant string, from model.Time, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error) {
+func (c *noopClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
 	c.callCount++
-	return fingerprints, chunkRefs, c.err
+	return groups, c.err
 }
 
 func TestBloomQuerier(t *testing.T) {
diff --git a/pkg/logproto/bloomgateway.pb.go b/pkg/logproto/bloomgateway.pb.go
index 0739d1e556000..1c7fb9fc2da29 100644
--- a/pkg/logproto/bloomgateway.pb.go
+++ b/pkg/logproto/bloomgateway.pb.go
@@ -33,7 +33,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
 type FilterChunkRefRequest struct {
 	From    github_com_prometheus_common_model.Time `protobuf:"varint,1,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"`
 	Through github_com_prometheus_common_model.Time `protobuf:"varint,2,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"`
-	Refs    []*ChunkRef                             `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"`
+	Refs    []*GroupedChunkRefs                     `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"`
 	Filters []*LineFilterExpression                 `protobuf:"bytes,4,rep,name=filters,proto3" json:"filters,omitempty"`
 }
 
@@ -69,7 +69,7 @@ func (m *FilterChunkRefRequest) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_FilterChunkRefRequest proto.InternalMessageInfo
 
-func (m *FilterChunkRefRequest) GetRefs() []*ChunkRef {
+func (m *FilterChunkRefRequest) GetRefs() []*GroupedChunkRefs {
 	if m != nil {
 		return m.Refs
 	}
@@ -241,35 +241,35 @@ func init() { proto.RegisterFile("pkg/logproto/bloomgateway.proto", fileDescript
 
 var fileDescriptor_a50b5dd1dbcd1415 = []byte{
 	// 467 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0x3d, 0x6f, 0xd4, 0x40,
-	0x10, 0xf5, 0xe6, 0x4e, 0xc9, 0x65, 0x03, 0x02, 0xad, 0x20, 0xb2, 0x8c, 0xb4, 0x67, 0x59, 0x08,
-	0xae, 0x3a, 0x4b, 0xa1, 0x49, 0x7d, 0x11, 0x44, 0x48, 0x54, 0x0b, 0xa2, 0x48, 0xe7, 0x73, 0xc6,
-	0x1f, 0x3a, 0x7b, 0xc7, 0xec, 0xae, 0x05, 0x74, 0xb4, 0x74, 0xfc, 0x0c, 0x7e, 0x01, 0xbf, 0x21,
-	0xe5, 0x95, 0x11, 0x45, 0xc4, 0xf9, 0x1a, 0xca, 0xfc, 0x04, 0x94, 0x75, 0x7c, 0x77, 0x89, 0x48,
-	0x43, 0x45, 0xe5, 0xdd, 0x99, 0x37, 0x4f, 0xfb, 0xde, 0x1b, 0xd3, 0x61, 0x35, 0x4b, 0xc3, 0x02,
-	0xd3, 0x4a, 0xa1, 0xc1, 0x70, 0x5a, 0x20, 0x96, 0x69, 0x64, 0xe0, 0x63, 0xf4, 0x79, 0x6c, 0x4b,
-	0x6c, 0xd0, 0x35, 0xbd, 0x47, 0x29, 0xa6, 0xd8, 0xe2, 0xae, 0x4e, 0x6d, 0xdf, 0x7b, 0x72, 0x83,
-	0xa0, 0x3b, 0xb4, 0xcd, 0xe0, 0xeb, 0x16, 0x7d, 0xfc, 0x2a, 0x2f, 0x0c, 0xa8, 0xa3, 0xac, 0x96,
-	0x33, 0x01, 0x89, 0x80, 0x0f, 0x35, 0x68, 0xc3, 0x8e, 0x68, 0x3f, 0x51, 0x58, 0xba, 0xc4, 0x27,
-	0xa3, 0xde, 0x24, 0x3c, 0xbb, 0x18, 0x3a, 0x3f, 0x2f, 0x86, 0xcf, 0xd3, 0xdc, 0x64, 0xf5, 0x74,
-	0x1c, 0x63, 0x19, 0x56, 0x0a, 0x4b, 0x30, 0x19, 0xd4, 0x3a, 0x8c, 0xb1, 0x2c, 0x51, 0x86, 0x25,
-	0x9e, 0x42, 0x31, 0x7e, 0x97, 0x97, 0x20, 0xec, 0x30, 0x7b, 0x4d, 0x77, 0x4c, 0xa6, 0xb0, 0x4e,
-	0x33, 0x77, 0xeb, 0xdf, 0x78, 0xba, 0x79, 0xf6, 0x8c, 0xf6, 0x15, 0x24, 0xda, 0xed, 0xf9, 0xbd,
-	0xd1, 0xde, 0x01, 0x1b, 0xaf, 0x84, 0xac, 0x1e, 0x6e, 0xfb, 0xec, 0x90, 0xee, 0x24, 0x56, 0x90,
-	0x76, 0xfb, 0x16, 0xca, 0xd7, 0xd0, 0x37, 0xb9, 0x84, 0x56, 0xed, 0xcb, 0x4f, 0x95, 0x02, 0xad,
-	0x73, 0x94, 0xa2, 0x83, 0x07, 0x82, 0xee, 0xdf, 0xb6, 0x42, 0x57, 0x28, 0x35, 0xb0, 0x43, 0xba,
-	0x1b, 0x5f, 0xd7, 0xb4, 0x4b, 0x2c, 0xab, 0xb7, 0x66, 0x3d, 0x56, 0x58, 0x57, 0x70, 0xda, 0x4d,
-	0x69, 0xb1, 0x06, 0x07, 0x3f, 0x08, 0x1d, 0xbc, 0xcd, 0x50, 0x19, 0x01, 0xc9, 0x7f, 0x67, 0xa9,
-	0x47, 0x07, 0x71, 0x06, 0xf1, 0x4c, 0xd7, 0xa5, 0xdb, 0xf3, 0xc9, 0xe8, 0xbe, 0x58, 0xdd, 0x03,
-	0x43, 0x1f, 0xde, 0xd6, 0xc5, 0x7c, 0xba, 0x97, 0xe4, 0x32, 0x05, 0x55, 0xa9, 0x5c, 0x1a, 0x2b,
-	0xa3, 0x2f, 0x36, 0x4b, 0x6c, 0x9f, 0x6e, 0x1b, 0x90, 0x91, 0x34, 0xf6, 0x6d, 0xbb, 0xe2, 0xfa,
-	0x76, 0x77, 0x78, 0x9d, 0x37, 0x6d, 0x78, 0x07, 0x09, 0xbd, 0x37, 0xb9, 0xda, 0xf0, 0xe3, 0x76,
-	0xc3, 0xd9, 0x7b, 0xfa, 0xe0, 0x66, 0x24, 0x9a, 0x0d, 0xd7, 0xc3, 0x7f, 0x5d, 0x5c, 0xcf, 0xbf,
-	0x1b, 0xd0, 0xc6, 0x19, 0x38, 0x93, 0x93, 0xf9, 0x82, 0x3b, 0xe7, 0x0b, 0xee, 0x5c, 0x2e, 0x38,
-	0xf9, 0xd2, 0x70, 0xf2, 0xbd, 0xe1, 0xe4, 0xac, 0xe1, 0x64, 0xde, 0x70, 0xf2, 0xab, 0xe1, 0xe4,
-	0x77, 0xc3, 0x9d, 0xcb, 0x86, 0x93, 0x6f, 0x4b, 0xee, 0xcc, 0x97, 0xdc, 0x39, 0x5f, 0x72, 0xe7,
-	0xe4, 0xe9, 0x86, 0xc3, 0xa9, 0x8a, 0x92, 0x48, 0x46, 0x61, 0x81, 0xb3, 0x3c, 0xdc, 0xfc, 0xc3,
-	0xa6, 0xdb, 0xf6, 0xf3, 0xe2, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe6, 0xf6, 0xab, 0x52, 0xb9,
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xcb, 0x6e, 0xd3, 0x40,
+	0x14, 0xf5, 0x34, 0x51, 0x9b, 0x4e, 0x41, 0xa0, 0x11, 0x54, 0x56, 0x90, 0x26, 0x96, 0x85, 0x20,
+	0x2b, 0x5b, 0x2a, 0x9b, 0xae, 0x53, 0x41, 0x85, 0xc4, 0x6a, 0x40, 0x2c, 0xba, 0x73, 0xdc, 0xeb,
+	0x87, 0xe2, 0x99, 0x6b, 0x66, 0xc6, 0x02, 0x76, 0x7c, 0x02, 0xfc, 0x05, 0x5f, 0xc0, 0x37, 0x74,
+	0x99, 0x65, 0xc5, 0xa2, 0x22, 0xce, 0x86, 0x65, 0x3f, 0x01, 0xd5, 0xc6, 0x4d, 0x52, 0x81, 0x90,
+	0x58, 0x75, 0xe5, 0x99, 0xb9, 0xe7, 0x8c, 0xef, 0x39, 0xe7, 0x0e, 0x1d, 0x95, 0xb3, 0x34, 0x2c,
+	0x30, 0x2d, 0x35, 0x5a, 0x0c, 0xa7, 0x05, 0xa2, 0x4c, 0x23, 0x0b, 0xef, 0xa3, 0x8f, 0x41, 0x73,
+	0xc4, 0x06, 0x5d, 0x71, 0xf8, 0x20, 0xc5, 0x14, 0x5b, 0xdc, 0xd5, 0xaa, 0xad, 0x0f, 0x1f, 0x6d,
+	0x5c, 0xd0, 0x2d, 0xda, 0xa2, 0xff, 0x65, 0x8b, 0x3e, 0x7c, 0x91, 0x17, 0x16, 0xf4, 0x51, 0x56,
+	0xa9, 0x99, 0x80, 0x44, 0xc0, 0xbb, 0x0a, 0x8c, 0x65, 0x47, 0xb4, 0x9f, 0x68, 0x94, 0x2e, 0xf1,
+	0xc8, 0xb8, 0x37, 0x09, 0xcf, 0x2e, 0x46, 0xce, 0xf7, 0x8b, 0xd1, 0xd3, 0x34, 0xb7, 0x59, 0x35,
+	0x0d, 0x62, 0x94, 0x61, 0xa9, 0x51, 0x82, 0xcd, 0xa0, 0x32, 0x61, 0x8c, 0x52, 0xa2, 0x0a, 0x25,
+	0x9e, 0x42, 0x11, 0xbc, 0xc9, 0x25, 0x88, 0x86, 0xcc, 0x5e, 0xd2, 0x1d, 0x9b, 0x69, 0xac, 0xd2,
+	0xcc, 0xdd, 0xfa, 0xbf, 0x7b, 0x3a, 0x3e, 0x0b, 0x68, 0x5f, 0x43, 0x62, 0xdc, 0x9e, 0xd7, 0x1b,
+	0xef, 0x1d, 0x0c, 0x83, 0x6b, 0x21, 0xc7, 0x1a, 0xab, 0x12, 0x4e, 0xbb, 0xfe, 0x8d, 0x68, 0x70,
+	0xec, 0x90, 0xee, 0x24, 0x8d, 0x30, 0xe3, 0xf6, 0x1b, 0x0a, 0x5f, 0x51, 0x5e, 0xe5, 0x0a, 0x5a,
+	0xd5, 0xcf, 0x3f, 0x94, 0x1a, 0x8c, 0xc9, 0x51, 0x89, 0x0e, 0xee, 0x0b, 0xba, 0x7f, 0xd3, 0x12,
+	0x53, 0xa2, 0x32, 0xc0, 0x0e, 0xe9, 0x6e, 0xdc, 0xfd, 0xc6, 0x25, 0xff, 0x6c, 0x64, 0x05, 0xf6,
+	0xbf, 0x11, 0x3a, 0x78, 0x9d, 0xa1, 0xb6, 0x02, 0x92, 0x5b, 0x67, 0xed, 0x90, 0x0e, 0xe2, 0x0c,
+	0xe2, 0x99, 0xa9, 0xa4, 0xdb, 0xf3, 0xc8, 0xf8, 0xae, 0xb8, 0xde, 0xfb, 0x96, 0xde, 0xbf, 0xa9,
+	0x8b, 0x79, 0x74, 0x2f, 0xc9, 0x55, 0x0a, 0xba, 0xd4, 0xb9, 0xb2, 0x8d, 0x8c, 0xbe, 0x58, 0x3f,
+	0x62, 0xfb, 0x74, 0xdb, 0x82, 0x8a, 0x94, 0x6d, 0x7a, 0xdb, 0x15, 0xbf, 0x77, 0xec, 0xc9, 0x46,
+	0x88, 0x6c, 0xe5, 0x5d, 0xe7, 0x4d, 0x1b, 0xde, 0x41, 0x42, 0xef, 0x4c, 0xae, 0x26, 0xfd, 0xb8,
+	0x9d, 0x74, 0xf6, 0x96, 0xde, 0xdb, 0x8c, 0xc4, 0xb0, 0xd1, 0x8a, 0xfc, 0xc7, 0x01, 0x1e, 0x7a,
+	0x7f, 0x07, 0xb4, 0x71, 0xfa, 0xce, 0xe4, 0x64, 0xbe, 0xe0, 0xce, 0xf9, 0x82, 0x3b, 0x97, 0x0b,
+	0x4e, 0x3e, 0xd5, 0x9c, 0x7c, 0xad, 0x39, 0x39, 0xab, 0x39, 0x99, 0xd7, 0x9c, 0xfc, 0xa8, 0x39,
+	0xf9, 0x59, 0x73, 0xe7, 0xb2, 0xe6, 0xe4, 0xf3, 0x92, 0x3b, 0xf3, 0x25, 0x77, 0xce, 0x97, 0xdc,
+	0x39, 0x79, 0xbc, 0xe6, 0x70, 0xaa, 0xa3, 0x24, 0x52, 0x51, 0x58, 0xe0, 0x2c, 0x0f, 0xd7, 0x5f,
+	0xda, 0x74, 0xbb, 0xf9, 0x3c, 0xfb, 0x15, 0x00, 0x00, 0xff, 0xff, 0x55, 0xf6, 0x2b, 0x1c, 0xc1,
 	0x03, 0x00, 0x00,
 }
 
@@ -842,9 +842,9 @@ func (this *FilterChunkRefRequest) String() string {
 	if this == nil {
 		return "nil"
 	}
-	repeatedStringForRefs := "[]*ChunkRef{"
+	repeatedStringForRefs := "[]*GroupedChunkRefs{"
 	for _, f := range this.Refs {
-		repeatedStringForRefs += strings.Replace(fmt.Sprintf("%v", f), "ChunkRef", "ChunkRef", 1) + ","
+		repeatedStringForRefs += strings.Replace(f.String(), "GroupedChunkRefs", "GroupedChunkRefs", 1) + ","
 	}
 	repeatedStringForRefs += "}"
 	repeatedStringForFilters := "[]*LineFilterExpression{"
@@ -1009,7 +1009,7 @@ func (m *FilterChunkRefRequest) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			m.Refs = append(m.Refs, &ChunkRef{})
+			m.Refs = append(m.Refs, &GroupedChunkRefs{})
 			if err := m.Refs[len(m.Refs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
diff --git a/pkg/logproto/bloomgateway.proto b/pkg/logproto/bloomgateway.proto
index 8dd79e15a46c9..71e762cc61459 100644
--- a/pkg/logproto/bloomgateway.proto
+++ b/pkg/logproto/bloomgateway.proto
@@ -16,7 +16,7 @@ message FilterChunkRefRequest {
     (gogoproto.customtype) = "github.com/prometheus/common/model.Time",
     (gogoproto.nullable) = false
   ];
-  repeated logproto.ChunkRef refs = 3;
+  repeated GroupedChunkRefs refs = 3;
   repeated logproto.LineFilterExpression filters = 4;
 }
 
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go
index 9d1679d6ca3b6..2df1f41cd4a25 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go
@@ -40,7 +40,7 @@ func (s *Shipper) ForEachBlock(
 	fingerprints []uint64,
 	callback ForEachBlockCallback) error {
 
-	level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", fingerprints)
+	level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints))
 
 	blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), fingerprints)
 	if err != nil {
diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go
index e174fbd1696de..80f2c352d5326 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"time"
 
+	"github.com/prometheus/common/model"
+
 	"github.com/grafana/loki/pkg/logproto"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 )
@@ -20,7 +22,7 @@ type Interface interface {
 }
 
 type Store interface {
-	FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]*logproto.ChunkRef, error)
+	FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error)
 	Stop()
 }
 
@@ -38,33 +40,84 @@ func (bs *BloomStore) Stop() {
 	bs.shipper.Stop()
 }
 
-func (bs *BloomStore) FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]*logproto.ChunkRef, error) {
+func (bs *BloomStore) FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
 	fingerprints := make([]uint64, 0, len(chunkRefs))
 	for _, ref := range chunkRefs {
 		fingerprints = append(fingerprints, ref.Fingerprint)
 	}
-	blooms, err := bs.blooms(ctx, tenant, from, through, fingerprints)
+
+	blooms, err := bs.queriers(ctx, tenant, from, through, fingerprints)
 	if err != nil {
 		return nil, err
 	}
-	return blooms.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, filters...)
+
+	searches := convertLineFilterExpressions(filters)
+
+	for _, ref := range chunkRefs {
+		refs, err := blooms.Filter(ctx, model.Fingerprint(ref.Fingerprint), convertToChunkRefs(ref.Refs), searches)
+		if err != nil {
+			return nil, err
+		}
+		ref.Refs = convertToShortRefs(refs)
+	}
+	return chunkRefs, nil
 }
 
-func (bs *BloomStore) blooms(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) (*bloomFilters, error) {
-	bf := &bloomFilters{}
+func (bs *BloomStore) queriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) (*bloomQueriers, error) {
+	bf := newBloomFilters(1024)
 	err := bs.shipper.ForEachBlock(ctx, tenant, from, through, fingerprints, func(bq *v1.BlockQuerier) error {
+		bf.queriers = append(bf.queriers, bq)
 		return nil
 	})
 	return bf, err
 }
 
-type bloomFilters struct {
+func convertLineFilterExpressions(filters []*logproto.LineFilterExpression) [][]byte {
+	searches := make([][]byte, len(filters))
+	for _, f := range filters {
+		searches = append(searches, []byte(f.Match))
+	}
+	return searches
 }
 
-func newBloomFilters(size int) *bloomFilters {
-	return &bloomFilters{}
+// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef
+// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request.
+func convertToShortRefs(refs v1.ChunkRefs) []*logproto.ShortRef {
+	result := make([]*logproto.ShortRef, len(refs))
+	for _, ref := range refs {
+		result = append(result, &logproto.ShortRef{From: ref.Start, Through: ref.End, Checksum: ref.Checksum})
+	}
+	return result
 }
 
-func (bf *bloomFilters) FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]*logproto.ChunkRef, error) {
-	return nil, nil
+// convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs
+// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request.
+func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
+	result := make(v1.ChunkRefs, len(refs))
+	for _, ref := range refs {
+		result = append(result, v1.ChunkRef{Start: ref.From, End: ref.Through, Checksum: ref.Checksum})
+	}
+	return result
+}
+
+type bloomQueriers struct {
+	queriers []*v1.BlockQuerier
+}
+
+func newBloomFilters(size int) *bloomQueriers {
+	return &bloomQueriers{
+		queriers: make([]*v1.BlockQuerier, size),
+	}
+}
+
+func (bf *bloomQueriers) Filter(_ context.Context, fp model.Fingerprint, chunkRefs v1.ChunkRefs, filters [][]byte) (v1.ChunkRefs, error) {
+	result := make(v1.ChunkRefs, len(chunkRefs))
+	for _, bq := range bf.queriers {
+		refs, err := bq.CheckChunksForSeries(fp, chunkRefs, filters)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, refs...)
+	}
+	return result, nil
 }