Adjust bloom gateway request structure (grafana#10926)
**What this PR does / why we need it**:

Since we already group chunkRefs by fingerprint on the client side to
determine the correct bloom gateway to send the request to, we can keep
the grouping for the gRPC request.
This not only reduces the request size, but also reduces the complexity of
querying blooms on the bloom gateway itself.
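For illustration, a minimal sketch of the grouped request shape (the field names come from the `logproto.FilterChunkRefRequest`, `GroupedChunkRefs`, and `ShortRef` types touched in this diff; the concrete fingerprint, tenant, and checksum values are made up):

```go
// Sketch only: all chunk refs of a series sit under a single
// GroupedChunkRefs entry keyed by fingerprint, instead of repeating
// fingerprint and tenant on every chunk ref.
from, through := model.Now().Add(-1*time.Hour), model.Now()

req := &logproto.FilterChunkRefRequest{
	From:    from,
	Through: through,
	Refs: []*logproto.GroupedChunkRefs{
		{
			// One entry per series.
			Fingerprint: 0xaffe,
			Tenant:      "tenant-a",
			Refs: []*logproto.ShortRef{
				{From: from, Through: through, Checksum: 1},
				{From: from, Through: through, Checksum: 2},
			},
		},
	},
}
```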

A conversion between `[]*logproto.ShortRef` and `v1.ChunkRefs` is still
required; it could be removed by sending `v1.ChunkRefs` over the wire.
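Roughly, the two directions of that conversion look like this (`convertToShortRef` is taken from the diff below; `expandGroup` is a hypothetical name for the flattening that `querier.go` does inline when rebuilding chunk refs from the filtered response):

```go
// convertToShortRef is the helper added in pkg/bloomgateway/querier.go:
// fingerprint and tenant move to the enclosing group, so only the chunk
// bounds and checksum remain per ref.
func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
	return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}

// expandGroup (hypothetical name) restores Fingerprint and UserID from the
// group when flattening a GroupedChunkRefs back into plain chunk refs.
func expandGroup(tenant string, group *logproto.GroupedChunkRefs) []*logproto.ChunkRef {
	refs := make([]*logproto.ChunkRef, 0, len(group.Refs))
	for _, r := range group.Refs {
		refs = append(refs, &logproto.ChunkRef{
			Fingerprint: group.Fingerprint,
			UserID:      tenant,
			From:        r.From,
			Through:     r.Through,
			Checksum:    r.Checksum,
		})
	}
	return refs
}
```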

**Special notes for your reviewer**:

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Oct 18, 2023
1 parent 7a69ed2 commit d84026c
Showing 10 changed files with 193 additions and 164 deletions.
23 changes: 3 additions & 20 deletions pkg/bloomgateway/bloomgateway.go
@@ -121,8 +121,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}

for _, ref := range req.Refs {
if ref.UserID != tenantID {
return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.UserID)
if ref.Tenant != tenantID {
return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant)
}
}

@@ -141,22 +141,5 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
}

// TODO(chaudum): Re-use buffers for response.
resp := make([]*logproto.GroupedChunkRefs, 0)
for idx, chunkRef := range chunkRefs {
fp := chunkRef.Fingerprint
shortRef := &logproto.ShortRef{From: chunkRef.From, Through: chunkRef.Through, Checksum: chunkRef.Checksum}
if idx == 0 || fp > resp[len(resp)-1].Fingerprint {
r := &logproto.GroupedChunkRefs{
Fingerprint: fp,
Tenant: tenantID,
Refs: []*logproto.ShortRef{shortRef},
}
resp = append(resp, r)
continue
}
resp[len(resp)-1].Refs = append(resp[len(resp)-1].Refs, shortRef)
}

return &logproto.FilterChunkRefResponse{ChunkRefs: resp}, nil
return &logproto.FilterChunkRefResponse{ChunkRefs: chunkRefs}, nil
}
10 changes: 8 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -32,6 +32,12 @@ func parseDayTime(s string) config.DayTime {
}
}

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
return groupChunkRefs(chunkRefs, grouped)
}

func TestBloomGateway_StartStopService(t *testing.T) {

ss := NewNoopStrategy()
@@ -145,7 +151,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: chunkRefs,
Refs: groupRefs(t, chunkRefs),
}

ctx := user.InjectOrgID(context.Background(), tenantID)
@@ -181,7 +187,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: chunkRefs,
Refs: groupRefs(t, chunkRefs),
}

ctx := user.InjectOrgID(context.Background(), tenantID)
66 changes: 21 additions & 45 deletions pkg/bloomgateway/client.go
@@ -81,7 +81,7 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}

type Client interface {
FilterChunks(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error)
FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error)
}

type GatewayClient struct {
@@ -132,20 +132,19 @@ func shuffleAddrs(addrs []string) []string {
}

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error) {
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
// Get the addresses of corresponding bloom gateways for each series.
_, addrs, err := c.serverAddrsForFingerprints(tenant, fingerprints)
fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
if err != nil {
return nil, nil, err
return nil, err
}

// Group chunk refs by addresses of one or more bloom gateways.
// All chunk refs of series that belong to one and the same bloom gateway are set in one batch.
streamsByAddr := c.groupStreamsByAddr(fingerprints, chunkRefs, addrs)
streamsByAddr := c.groupStreamsByAddr(groups, addrs)

// TODO(chaudum): We might over-allocate for the filtered responses here?
filteredChunkRefs := make([][]*logproto.ChunkRef, 0, len(fingerprints))
filteredFingerprints := make([]uint64, 0, len(fingerprints))
filteredChunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(fingerprints))

for _, item := range streamsByAddr {
// randomize order of addresses so we don't hotspot the first server in the list
@@ -161,29 +160,14 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
if err != nil {
return err
}
for _, refGroup := range resp.ChunkRefs {
chunkRefs := make([]*logproto.ChunkRef, 0, len(refGroup.Refs))
for _, shortRef := range refGroup.Refs {
chunkRefs = append(chunkRefs,
&logproto.ChunkRef{
Fingerprint: refGroup.Fingerprint,
UserID: refGroup.Tenant,
From: shortRef.From,
Through: shortRef.Through,
Checksum: shortRef.Checksum,
},
)
}
filteredFingerprints = append(filteredFingerprints, refGroup.Fingerprint)
filteredChunkRefs = append(filteredChunkRefs, chunkRefs)
}
filteredChunkRefs = append(filteredChunkRefs, resp.ChunkRefs...)
return nil
})
if err != nil {
return nil, nil, err
return nil, err
}
}
return fingerprints, filteredChunkRefs, nil
return filteredChunkRefs, nil
}

// isEqualStringElements checks if two string slices contain the same elements.
@@ -215,29 +199,19 @@ func listContainsAddrs(list []chunkRefsByAddrs, addrs []string) (int, bool) {
}

type chunkRefsByAddrs struct {
addrs []string
refs []*logproto.ChunkRef
streams []uint64
addrs []string
refs []*logproto.GroupedChunkRefs
}

// groupStreamsByAddr takes a slice of stream fingerprints, a slices of chunkRef slices, and a slice of address slices
// and groups them into a slice of chunkRefsByAddrs.
// streams is a slice of uint64 stream fingerprints
// chunks is a slice of chunk ref slices
// addresses is a slice of string slices containing server addresses
// It is necessary that len(streams) == len(chunks) == len(addresses), but the
// function implementation does not validate the precondition and would fail silently.
func (c *GatewayClient) groupStreamsByAddr(streams []uint64, chunks [][]*logproto.ChunkRef, addresses [][]string) []chunkRefsByAddrs {
func (c *GatewayClient) groupStreamsByAddr(groups []*logproto.GroupedChunkRefs, addresses [][]string) []chunkRefsByAddrs {
res := make([]chunkRefsByAddrs, 0, len(addresses))
for i := 0; i < len(addresses); i++ {
addrs := addresses[i]
refs := chunks[i]
fp := streams[i]
refs := groups[i]
if idx, ok := listContainsAddrs(res, addrs); ok {
res[idx].refs = append(res[idx].refs, refs...)
res[idx].streams = append(res[idx].streams, fp)
res[idx].refs = append(res[idx].refs, refs)
} else {
res = append(res, chunkRefsByAddrs{addrs: addrs, refs: refs, streams: []uint64{fp}})
res = append(res, chunkRefsByAddrs{addrs: addrs, refs: []*logproto.GroupedChunkRefs{refs}})
}
}
return res
@@ -272,7 +246,7 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
// Returns an error in case the bloom gateway ring could not get the
// corresponding replica set for a given fingerprint.
// Warning: This function becomes inefficient when the number of fingerprints is very large.
func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints []uint64) ([]uint64, [][]string, error) {
func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, groups []*logproto.GroupedChunkRefs) ([]uint64, [][]string, error) {
subRing := GetShuffleShardingSubring(c.ring, tenantID, c.limits)

rs, err := subRing.GetAllHealthy(BlocksRead)
@@ -285,7 +259,7 @@ func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints
numTokens += len(instanceDesc.Tokens)
}

numFingerprints := len(fingerprints)
numFingerprints := len(groups)
if numFingerprints > int(float64(numTokens)*math.Log2(float64(numFingerprints))) {
// TODO(chaudum): Implement algorithm in O(n * m * log(k) + n) instead of O(k) by iterating over ring tokens
// and finding corresponding fingerprint ranges using binary search.
@@ -295,14 +269,16 @@ func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints
level.Warn(c.logger).Log("msg", "using an inefficient algorithm to determin server addresses for fingerprints", "fingerprints", numFingerprints, "tokens", numTokens)
}

fingerprints := make([]uint64, numFingerprints)
addresses := make([][]string, numFingerprints)
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

for idx, key := range fingerprints {
rs, err = subRing.Get(uint32(key), BlocksRead, bufDescs, bufHosts, bufZones)
for idx, key := range groups {
rs, err = subRing.Get(uint32(key.Fingerprint), BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, nil, errors.Wrap(err, "bloom gateway get ring")
}
fingerprints[idx] = key.Fingerprint
addresses[idx] = rs.GetAddresses()
}

47 changes: 20 additions & 27 deletions pkg/bloomgateway/client_test.go
@@ -45,25 +45,22 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {

testCases := []struct {
name string
streams []uint64
chunks [][]*logproto.ChunkRef
chunks []*logproto.GroupedChunkRefs
addresses [][]string
expected []chunkRefsByAddrs
}{
{
name: "empty input yields empty result",
streams: []uint64{},
chunks: [][]*logproto.ChunkRef{},
chunks: []*logproto.GroupedChunkRefs{},
addresses: [][]string{},
expected: []chunkRefsByAddrs{},
},
{
name: "addresses with same elements are grouped into single item",
streams: []uint64{1, 2, 3},
chunks: [][]*logproto.ChunkRef{
{{Fingerprint: 1, Checksum: 1}},
{{Fingerprint: 2, Checksum: 2}},
{{Fingerprint: 3, Checksum: 3}},
name: "addresses with same elements are grouped into single item",
chunks: []*logproto.GroupedChunkRefs{
{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
{Fingerprint: 3, Refs: []*logproto.ShortRef{{Checksum: 3}}},
},
addresses: [][]string{
{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
@@ -73,21 +70,19 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
expected: []chunkRefsByAddrs{
{
addrs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
refs: []*logproto.ChunkRef{
{Fingerprint: 1, Checksum: 1},
{Fingerprint: 2, Checksum: 2},
{Fingerprint: 3, Checksum: 3},
refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
{Fingerprint: 3, Refs: []*logproto.ShortRef{{Checksum: 3}}},
},
streams: []uint64{1, 2, 3},
},
},
},
{
name: "partially overlapping addresses are not grouped together",
streams: []uint64{1, 2},
chunks: [][]*logproto.ChunkRef{
{{Fingerprint: 1, Checksum: 1}},
{{Fingerprint: 2, Checksum: 2}},
name: "partially overlapping addresses are not grouped together",
chunks: []*logproto.GroupedChunkRefs{
{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
},
addresses: [][]string{
{"10.0.0.1", "10.0.0.2"},
@@ -96,25 +91,23 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
expected: []chunkRefsByAddrs{
{
addrs: []string{"10.0.0.1", "10.0.0.2"},
refs: []*logproto.ChunkRef{
{Fingerprint: 1, Checksum: 1},
refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 1, Refs: []*logproto.ShortRef{{Checksum: 1}}},
},
streams: []uint64{1},
},
{
addrs: []string{"10.0.0.2", "10.0.0.3"},
refs: []*logproto.ChunkRef{
{Fingerprint: 2, Checksum: 2},
refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 2, Refs: []*logproto.ShortRef{{Checksum: 2}}},
},
streams: []uint64{2},
},
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := c.groupStreamsByAddr(tc.streams, tc.chunks, tc.addresses)
res := c.groupStreamsByAddr(tc.chunks, tc.addresses)
require.Equal(t, tc.expected, res)
})
}
60 changes: 39 additions & 21 deletions pkg/bloomgateway/querier.go
@@ -21,34 +21,22 @@ func NewBloomQuerier(c Client, logger log.Logger) *BloomQuerier {
return &BloomQuerier{c: c, logger: logger}
}

func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if len(chunkRefs) == 0 || len(filters) == 0 {
return chunkRefs, nil
}

// TODO(chaudum): Make buffer pool to reduce allocations.
// The indexes of the chunks slice correspond to the indexes of the fingerprint slice.
fingerprints := make([]uint64, 0, len(chunkRefs))
chunks := make([][]*logproto.ChunkRef, 0, len(chunkRefs))

// Sort the chunkRefs by their stream fingerprint
// so we can easily append them to the target slice by iterating over them.
sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})

for _, chunkRef := range chunkRefs {
idx := len(fingerprints) - 1
if idx == -1 || fingerprints[idx] < chunkRef.Fingerprint {
fingerprints = append(fingerprints, chunkRef.Fingerprint)
chunks = append(chunks, []*logproto.ChunkRef{chunkRef})
continue
}
chunks[idx] = append(chunks[idx], chunkRef)
}
grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
grouped = groupChunkRefs(chunkRefs, grouped)

// Drop series fingerprints, because they are not used (yet).
_, refs, err := bq.c.FilterChunks(ctx, tenant, from, through, fingerprints, chunks, filters...)
refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...)
if err != nil {
return nil, err
}
@@ -58,7 +46,37 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// Flatten response from client and return
result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
for i := range refs {
result = append(result, refs[i]...)
for _, ref := range refs[i].Refs {
result = append(result, &logproto.ChunkRef{
Fingerprint: refs[i].Fingerprint,
UserID: tenant,
From: ref.From,
Through: ref.Through,
Checksum: ref.Checksum,
})
}
}
return result, nil
}

func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
// Sort the chunkRefs by their stream fingerprint
// so we can easily append them to the target slice by iterating over them.
sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})

for _, chunkRef := range chunkRefs {
idx := len(grouped) - 1
if idx == -1 || grouped[idx].Fingerprint < chunkRef.Fingerprint {
grouped = append(grouped, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
})
continue
}
grouped[idx].Refs = append(grouped[idx].Refs, convertToShortRef(chunkRef))
}
return grouped
}
4 changes: 2 additions & 2 deletions pkg/bloomgateway/querier_test.go
@@ -19,9 +19,9 @@ type noopClient struct {
}

// FilterChunks implements Client.
func (c *noopClient) FilterChunks(ctx context.Context, tenant string, from model.Time, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error) {
func (c *noopClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
c.callCount++
return fingerprints, chunkRefs, c.err
return groups, c.err
}

func TestBloomQuerier(t *testing.T) {