Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(chore) Bloom gateway: Improve fingerprint partitioning in client #11971

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"math"
"math/rand"
"sort"
"sync"

"github.com/go-kit/log"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

Expand Down Expand Up @@ -326,11 +326,6 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
return servers, nil
}

// instanceWithToken pairs a ring instance descriptor with a single uint32
// token value.
type instanceWithToken struct {
// instance is the ring.InstanceDesc this entry refers to.
instance ring.InstanceDesc
// token is the uint32 token stored alongside the instance.
// NOTE(review): presumably one of the instance's ring tokens — confirm against callers.
token uint32
}

type addrsWithTokenRange struct {
id string
addrs []string
Expand All @@ -348,13 +343,22 @@ type instanceWithFingerprints struct {

func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithTokenRange) (result []instanceWithFingerprints) {
for _, instance := range addresses {

min := sort.Search(len(fingerprints), func(i int) bool {
return instance.cmp(uint32(fingerprints[i].Fingerprint)) > v1.Before
min, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int {
if uint32(g.Fingerprint) < r.Min {
return -1
} else if uint32(g.Fingerprint) > r.Min {
return 1
}
return 0
})

max := sort.Search(len(fingerprints), func(i int) bool {
return instance.cmp(uint32(fingerprints[i].Fingerprint)) == v1.After
max, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int {
if uint32(g.Fingerprint) <= r.Max {
return -1
} else if uint32(g.Fingerprint) > r.Max {
return 1
}
return 0
})

// fingerprint is out of boundaries
Expand Down
28 changes: 28 additions & 0 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"fmt"
"math"
"sort"
"testing"
Expand Down Expand Up @@ -165,6 +166,33 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
})
}

// BenchmarkPartitionFingerprintsByAddresses benchmarks
// partitionFingerprintsByAddresses against a fixed, synthetic input:
// fingerprints stepped evenly through the uint64 space and servers whose
// token ranges are stepped evenly through the uint32 space.
func BenchmarkPartitionFingerprintsByAddresses(b *testing.B) {
	const (
		fingerprintCount = 100000
		serverCount      = 100
	)

	// One group per stride, generated in ascending fingerprint order. The
	// loop bound stops one stride short of the maximum so the increment can
	// never wrap around.
	fpStride := math.MaxUint64 / uint64(fingerprintCount)
	fpLimit := math.MaxUint64 - fpStride
	refs := make([]*logproto.GroupedChunkRefs, 0, fingerprintCount)
	for fp := uint64(0); fp < fpLimit; fp += fpStride {
		refs = append(refs, &logproto.GroupedChunkRefs{Fingerprint: fp})
	}

	// One server per stride; each gets the token range built by
	// newTr(lo, lo+tokenStride).
	tokenStride := math.MaxUint32 / uint32(serverCount)
	tokenLimit := math.MaxUint32 - tokenStride
	targets := make([]addrsWithTokenRange, 0, serverCount)
	for lo := uint32(0); lo < tokenLimit; lo += tokenStride {
		srv := addrsWithTokenRange{
			id:         fmt.Sprintf("instance-%x", lo),
			addrs:      []string{fmt.Sprintf("%d", lo)},
			tokenRange: newTr(lo, lo+tokenStride),
		}
		targets = append(targets, srv)
	}

	// Exclude the fixture construction above from the measurement.
	b.ResetTimer()
	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		_ = partitionFingerprintsByAddresses(refs, targets)
	}
}

func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
testCases := map[string]struct {
instances []ring.InstanceDesc
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
Uint64Range = Range[uint64]{Min: 0, Max: math.MaxUint64}
)

type Range[T constraints.Integer] struct {
type Range[T constraints.Unsigned] struct {
Min, Max T
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func KeyRangeForInstance[T constraints.Integer](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {
func KeyRangeForInstance[T constraints.Unsigned](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
Expand Down
Loading