Skip to content

Commit

Permalink
Replace min/max token with TokenRange in bloom ring utilities (graf…
Browse files Browse the repository at this point in the history
…ana#11960)

This PR replaces min/max token fields with a `TokenRange` field that uses the `Range[uint32]` type.
The `Range[uint32]` uses similar semantics as the `FingerprintBounds` we use for fingerprint ranges.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and rhnasc committed Apr 12, 2024
1 parent 41628c4 commit 2bc53bc
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 140 deletions.
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error

}

ownershipBounds, err := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
}
return ownershipBounds, true, nil
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil
}

// runs a single round of compaction for all relevant tenants and tables
Expand Down
31 changes: 12 additions & 19 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,25 +304,23 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
for it.Next() {
// We can use on of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().TokenRange.Max, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
tokenRange: it.At().TokenRange,
})
}

if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 {
// append the instance for the token range between the greates token and MaxUint32
servers = append(servers, addrsWithTokenRange{
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
id: servers[0].id,
addrs: servers[0].addrs,
tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32),
})
}
return servers, nil
Expand All @@ -334,18 +332,13 @@ type instanceWithToken struct {
}

type addrsWithTokenRange struct {
id string
addrs []string
minToken, maxToken uint32
id string
addrs []string
tokenRange bloomutils.Range[uint32]
}

func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck {
if token < s.minToken {
return v1.Before
} else if token > s.maxToken {
return v1.After
}
return v1.Overlap
return s.tokenRange.Cmp(token)
}

type instanceWithFingerprints struct {
Expand Down
39 changes: 21 additions & 18 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/grafana/loki/pkg/validation"
)

// short constructor
var newTr = bloomutils.NewTokenRange

func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -53,10 +56,10 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 401}, // out of bounds, will be dismissed
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 100},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 101, maxToken: 200},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 201, maxToken: 300},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 301, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 100)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(101, 200)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(201, 300)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(301, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -135,9 +138,9 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 350},
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 200},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 100, maxToken: 300},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 200, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 200)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(100, 300)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(200, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -174,10 +177,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, math.MaxUint32/6*1)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/6*1+1, math.MaxUint32/6*3)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(math.MaxUint32/6*3+1, math.MaxUint32/6*5)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/6*5+1, math.MaxUint32)},
},
},
"MinUint32 and MaxUint32 are tokens in the ring": {
Expand All @@ -186,10 +189,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 0)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(1, math.MaxUint32/3)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/3*1+1, math.MaxUint32/3*2)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/3*2+1, math.MaxUint32)},
},
},
}
Expand All @@ -215,7 +218,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {

it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().MaxToken, it.At().Instance.Addr)
t.Log(it.At().TokenRange.Max, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -357,10 +360,10 @@ type mockRing struct {
// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].MaxToken < key {
if r.ranges[i].TokenRange.Max < key {
return 1
}
if r.ranges[i].MaxToken > key {
if r.ranges[i].TokenRange.Max > key {
return -1
}
return 0
Expand Down
106 changes: 54 additions & 52 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,62 @@
// This file contains a bunch of utility functions for bloom components.
// TODO: Find a better location for this package

package bloomutils

import (
"errors"
"fmt"
"math"
"sort"

"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

type InstanceWithTokenRange struct {
Instance ring.InstanceDesc
MinToken, MaxToken uint32
var (
Uint32Range = Range[uint32]{Min: 0, Max: math.MaxUint32}
Uint64Range = Range[uint64]{Min: 0, Max: math.MaxUint64}
)

type Range[T constraints.Integer] struct {
Min, Max T
}

func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
if token < i.MinToken {
func (r Range[T]) String() string {
return fmt.Sprintf("%016x-%016x", r.Min, r.Max)
}

func (r Range[T]) Less(other Range[T]) bool {
if r.Min != other.Min {
return r.Min < other.Min
}
return r.Max <= other.Max
}

func (r Range[T]) Cmp(t T) v1.BoundsCheck {
if t < r.Min {
return v1.Before
} else if token > i.MaxToken {
} else if t > r.Max {
return v1.After
}
return v1.Overlap
}

func NewTokenRange(min, max uint32) Range[uint32] {
return Range[uint32]{min, max}
}

type InstanceWithTokenRange struct {
Instance ring.InstanceDesc
TokenRange Range[uint32]
}

func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
return i.TokenRange.Cmp(token)
}

type InstancesWithTokenRange []InstanceWithTokenRange

func (i InstancesWithTokenRange) Contains(token uint32) bool {
Expand All @@ -39,11 +68,11 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
return false
}

// GetInstanceWithTokenRange calculates the token range for a specific instance
// KeyRangeForInstance calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.FingerprintBounds, error) {
func KeyRangeForInstance[T constraints.Integer](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
Expand All @@ -57,57 +86,31 @@ func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.Fin

// instance with Id == id not found
if idx == -1 {
return v1.FingerprintBounds{}, ring.ErrInstanceNotFound
return Range[T]{}, ring.ErrInstanceNotFound
}

i := uint64(idx)
n := uint64(len(instances))
step := math.MaxUint64 / n
diff := keyspace.Max - keyspace.Min
i := T(idx)
n := T(len(instances))

minToken := model.Fingerprint(step * i)
maxToken := model.Fingerprint(step*i + step - 1)
if diff < n {
return Range[T]{}, errors.New("keyspace is smaller than amount of instances")
}

step := diff / n
min := step * i
max := step*i + step - 1
if i == n-1 {
// extend the last token tange to MaxUint32
maxToken = math.MaxUint64
max = (keyspace.Max - keyspace.Min)
}

return v1.NewBounds(minToken, maxToken), nil
}

// GetInstancesWithTokenRanges calculates the token ranges for a specific
// instance with given id based on all tokens in the ring.
// If the instances in the ring are configured with a single token, such as the
// bloom compactor, use GetInstanceWithTokenRange() instead.
func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange {
servers := make([]InstanceWithTokenRange, 0, len(instances))
it := NewInstanceSortMergeIterator(instances)
var firstInst ring.InstanceDesc
var lastToken uint32
for it.Next() {
if firstInst.Id == "" {
firstInst = it.At().Instance
}
if it.At().Instance.Id == id {
servers = append(servers, it.At())
}
lastToken = it.At().MaxToken
}
// append token range from lastToken+1 to MaxUint32
// only if the instance with the first token is the current one
if len(servers) > 0 && firstInst.Id == id {
servers = append(servers, InstanceWithTokenRange{
MinToken: lastToken + 1,
MaxToken: math.MaxUint32,
Instance: servers[0].Instance,
})
}
return servers
return Range[T]{min, max}, nil
}

// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
// where the token of the elements are sorted in ascending order.
func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] {

tokenIters := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances))
for i, inst := range instances {
sort.Slice(inst.Tokens, func(a, b int) bool { return inst.Tokens[a] < inst.Tokens[b] })
Expand All @@ -131,9 +134,8 @@ func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[Ins
minToken, maxToken := uint32(prevToken+1), iv.Value()
prevToken = int(maxToken)
return InstanceWithTokenRange{
Instance: instances[iv.Index()],
MinToken: minToken,
MaxToken: maxToken,
Instance: instances[iv.Index()],
TokenRange: NewTokenRange(minToken, maxToken),
}
},
func(iv v1.IndexedValue[uint32], iwtr InstanceWithTokenRange) InstanceWithTokenRange {
Expand Down
Loading

0 comments on commit 2bc53bc

Please sign in to comment.