Skip to content

Commit

Permalink
Replace min/max token with TokenRange in ring utilities
Browse files Browse the repository at this point in the history
Similar to FingerprintBounds

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 15, 2024
1 parent d0fae5c commit f9399c8
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 59 deletions.
31 changes: 12 additions & 19 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,25 +304,23 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
for it.Next() {
// We can use on of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().TokenRange.Max, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
tokenRange: it.At().TokenRange,
})
}

if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 {
// append the instance for the token range between the greates token and MaxUint32
servers = append(servers, addrsWithTokenRange{
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
id: servers[0].id,
addrs: servers[0].addrs,
tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32),
})
}
return servers, nil
Expand All @@ -334,18 +332,13 @@ type instanceWithToken struct {
}

type addrsWithTokenRange struct {
id string
addrs []string
minToken, maxToken uint32
id string
addrs []string
tokenRange bloomutils.Range[uint32]
}

func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck {
if token < s.minToken {
return v1.Before
} else if token > s.maxToken {
return v1.After
}
return v1.Overlap
return s.tokenRange.Cmp(token)
}

type instanceWithFingerprints struct {
Expand Down
39 changes: 21 additions & 18 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/grafana/loki/pkg/validation"
)

// short constructor
var newTr = bloomutils.NewTokenRange

func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -53,10 +56,10 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 401}, // out of bounds, will be dismissed
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 100},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 101, maxToken: 200},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 201, maxToken: 300},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 301, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 100)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(101, 200)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(201, 300)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(301, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -135,9 +138,9 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 350},
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 200},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 100, maxToken: 300},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 200, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 200)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(100, 300)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(200, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -174,10 +177,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, math.MaxUint32/6*1)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/6*1+1, math.MaxUint32/6*3)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(math.MaxUint32/6*3+1, math.MaxUint32/6*5)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/6*5+1, math.MaxUint32)},
},
},
"MinUint32 and MaxUint32 are tokens in the ring": {
Expand All @@ -186,10 +189,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 0)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(1, math.MaxUint32/3)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/3*1+1, math.MaxUint32/3*2)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/3*2+1, math.MaxUint32)},
},
},
}
Expand All @@ -215,7 +218,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {

it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().MaxToken, it.At().Instance.Addr)
t.Log(it.At().TokenRange.Max, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -357,10 +360,10 @@ type mockRing struct {
// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].MaxToken < key {
if r.ranges[i].TokenRange.Max < key {
return 1
}
if r.ranges[i].MaxToken > key {
if r.ranges[i].TokenRange.Max > key {
return -1
}
return 0
Expand Down
53 changes: 40 additions & 13 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// This file contains a bunch of utility functions for bloom components.
// TODO: Find a better location for this package

package bloomutils

import (
"fmt"
"math"
"sort"

Expand All @@ -14,20 +14,47 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

type InstanceWithTokenRange struct {
Instance ring.InstanceDesc
MinToken, MaxToken uint32
type integer interface {
~uint | ~uint32 | ~uint64 | ~int | ~int32 | ~int64
}

func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
if token < i.MinToken {
type Range[T integer] struct {
Min, Max T
}

func (r Range[T]) String() string {
return fmt.Sprintf("%016x-%016x", r.Min, r.Max)
}

func (r Range[T]) Less(other Range[T]) bool {
if r.Min != other.Min {
return r.Min < other.Min
}
return r.Max <= other.Max
}

func (r Range[T]) Cmp(t T) v1.BoundsCheck {
if t < r.Min {
return v1.Before
} else if token > i.MaxToken {
} else if t > r.Max {
return v1.After
}
return v1.Overlap
}

func NewTokenRange(min, max uint32) Range[uint32] {
return Range[uint32]{min, max}
}

type InstanceWithTokenRange struct {
Instance ring.InstanceDesc
TokenRange Range[uint32]
}

func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
return i.TokenRange.Cmp(token)
}

type InstancesWithTokenRange []InstanceWithTokenRange

func (i InstancesWithTokenRange) Contains(token uint32) bool {
Expand Down Expand Up @@ -90,15 +117,14 @@ func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) Insta
if it.At().Instance.Id == id {
servers = append(servers, it.At())
}
lastToken = it.At().MaxToken
lastToken = it.At().TokenRange.Max
}
// append token range from lastToken+1 to MaxUint32
// only if the instance with the first token is the current one
if len(servers) > 0 && firstInst.Id == id {
servers = append(servers, InstanceWithTokenRange{
MinToken: lastToken + 1,
MaxToken: math.MaxUint32,
Instance: servers[0].Instance,
Instance: servers[0].Instance,
TokenRange: NewTokenRange(lastToken+1, math.MaxUint32),
})
}
return servers
Expand All @@ -107,14 +133,15 @@ func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) Insta
// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
// where the token of the elements are sorted in ascending order.
func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] {

it := &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{
items: instances,
transform: func(item ring.InstanceDesc, val uint32, prev *InstanceWithTokenRange) *InstanceWithTokenRange {
var prevToken uint32
if prev != nil {
prevToken = prev.MaxToken + 1
prevToken = prev.TokenRange.Max + 1
}
return &InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val}
return &InstanceWithTokenRange{Instance: item, TokenRange: NewTokenRange(prevToken, val)}
},
}
sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances))
Expand Down
18 changes: 9 additions & 9 deletions pkg/bloomutils/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
{Id: "3", Tokens: []uint32{1}},
}
expected := []InstanceWithTokenRange{
{Instance: input[2], MinToken: 0, MaxToken: 1},
{Instance: input[1], MinToken: 2, MaxToken: 3},
{Instance: input[0], MinToken: 4, MaxToken: 5},
{Instance: input[1], MinToken: 6, MaxToken: 7},
{Instance: input[0], MinToken: 8, MaxToken: 9},
{Instance: input[2], TokenRange: NewTokenRange(0, 1)},
{Instance: input[1], TokenRange: NewTokenRange(2, 3)},
{Instance: input[0], TokenRange: NewTokenRange(4, 5)},
{Instance: input[1], TokenRange: NewTokenRange(6, 7)},
{Instance: input[0], TokenRange: NewTokenRange(8, 9)},
}

var i int
Expand All @@ -41,8 +41,8 @@ func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) {
{Id: "3", Tokens: []uint32{1}},
}
expected := InstancesWithTokenRange{
{Instance: input[1], MinToken: 2, MaxToken: 3},
{Instance: input[1], MinToken: 6, MaxToken: 7},
{Instance: input[1], TokenRange: NewTokenRange(2, 3)},
{Instance: input[1], TokenRange: NewTokenRange(6, 7)},
}

result := GetInstancesWithTokenRanges("2", input)
Expand All @@ -56,8 +56,8 @@ func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) {
{Id: "3", Tokens: []uint32{1}},
}
expected := InstancesWithTokenRange{
{Instance: input[2], MinToken: 0, MaxToken: 1},
{Instance: input[2], MinToken: 10, MaxToken: math.MaxUint32},
{Instance: input[2], TokenRange: NewTokenRange(0, 1)},
{Instance: input[2], TokenRange: NewTokenRange(10, math.MaxUint32)},
}

result := GetInstancesWithTokenRanges("3", input)
Expand Down

0 comments on commit f9399c8

Please sign in to comment.