Skip to content

Commit

Permalink
Map ring token keyspace (uint32) into fingerprint keyspace (uint64) (#…
Browse files Browse the repository at this point in the history
…11975)

In order to compare the keyspace from the bloom gateway's ring with the fingerprints, they need to use the same keyspace.

The ring, however, uses tokens within a `uint32` keyspace, whereas the fingerprints use a `uint64` keyspace.
Therefore, the ranges from the ring tokens need to be mapped into the `uint64` keyspace. This is done by bit-shifting the Min value 32 bits to the left, and by bit-shifting the Max value 32 bits to the left and then adding `(1<<32)-1` to "fill" the remaining values up to the next Min value.

The structs used to combine ring instance information and token/fingerprint ranges are yet to be simplified. However, this is not a goal of this PR.

Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Owen Diehl <[email protected]>
  • Loading branch information
chaudum and owen-d authored Feb 20, 2024
1 parent 6204886 commit 71fa802
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 160 deletions.
72 changes: 42 additions & 30 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,71 +291,83 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}

func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithTokenRange) []instanceWithFingerprints {
func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithBounds) []instanceWithFingerprints {
boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints)
}

func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
// mapTokenRangeToFingerprintRange converts a ring token range, expressed in the
// uint32 keyspace, into fingerprint bounds expressed in the uint64 keyspace.
//
// The lower bound is the token Min shifted 32 bits to the left. The upper bound
// is the token Max shifted 32 bits to the left with all of its lower 32 bits
// set, so that the resulting range covers every fingerprint up to the mapped
// Min of the next token.
func mapTokenRangeToFingerprintRange(r bloomutils.Range[uint32]) v1.FingerprintBounds {
	lower := model.Fingerprint(uint64(r.Min) << 32)
	// Setting the low 32 bits "fills" the gap up to the next mapped Min value.
	upper := model.Fingerprint(uint64(r.Max)<<32 | math.MaxUint32)
	return v1.NewBounds(lower, upper)
}

func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithBounds, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

servers := make([]addrsWithTokenRange, 0, len(instances))
servers := make([]addrsWithBounds, 0, len(instances))
it := bloomutils.NewInstanceSortMergeIterator(instances)

for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().TokenRange.Max, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
tokenRange: it.At().TokenRange,

bounds := mapTokenRangeToFingerprintRange(it.At().TokenRange)
servers = append(servers, addrsWithBounds{
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
FingerprintBounds: bounds,
})
}

if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 {
// append the instance for the token range between the greates token and MaxUint32
servers = append(servers, addrsWithTokenRange{
id: servers[0].id,
addrs: servers[0].addrs,
tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32),
if len(servers) > 0 && servers[len(servers)-1].Max < math.MaxUint64 {
// append the instance for the range between the maxFp and MaxUint64
// TODO(owen-d): support wrapping around keyspace for token ranges
servers = append(servers, addrsWithBounds{
id: servers[0].id,
addrs: servers[0].addrs,
FingerprintBounds: v1.NewBounds(
servers[len(servers)-1].Max+1,
model.Fingerprint(math.MaxUint64),
),
})
}
return servers, nil
}

type addrsWithTokenRange struct {
id string
addrs []string
tokenRange bloomutils.Range[uint32]
}

func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck {
return s.tokenRange.Cmp(token)
// addrsWithBounds couples a ring instance with the fingerprint bounds derived
// from its token range.
type addrsWithBounds struct {
// Embedded fingerprint range (uint64 keyspace); its Min/Max are accessed
// directly on the addrsWithBounds value.
v1.FingerprintBounds
// id is the ring instance ID (taken from Instance.Id).
id string
// addrs holds the addresses returned by the ring lookup for this range.
addrs []string
}

type instanceWithFingerprints struct {
instance addrsWithTokenRange
instance addrsWithBounds
fingerprints []*logproto.GroupedChunkRefs
}

func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithTokenRange) (result []instanceWithFingerprints) {
func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithBounds) (result []instanceWithFingerprints) {
for _, instance := range addresses {
min, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int {
if uint32(g.Fingerprint) < r.Min {
min, _ := slices.BinarySearchFunc(fingerprints, instance.FingerprintBounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int {
if g.Fingerprint < uint64(b.Min) {
return -1
} else if uint32(g.Fingerprint) > r.Min {
} else if g.Fingerprint > uint64(b.Min) {
return 1
}
return 0
})

max, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int {
if uint32(g.Fingerprint) <= r.Max {
max, _ := slices.BinarySearchFunc(fingerprints, instance.FingerprintBounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int {
if g.Fingerprint <= uint64(b.Max) {
return -1
} else if uint32(g.Fingerprint) > r.Max {
} else if g.Fingerprint > uint64(b.Max) {
return 1
}
return 0
Expand Down Expand Up @@ -398,7 +410,7 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW

pos[cur.instance.id] = len(result)
result = append(result, instanceWithFingerprints{
instance: addrsWithTokenRange{
instance: addrsWithBounds{
id: cur.instance.id,
addrs: cur.instance.addrs,
},
Expand Down
Loading

0 comments on commit 71fa802

Please sign in to comment.