Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace min/max token with TokenRange in bloom ring utilities #11960

Merged
merged 7 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error

}

ownershipBounds, err := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
}
return ownershipBounds, true, nil
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil
}

// runs a single round of compaction for all relevant tenants and tables
Expand Down
31 changes: 12 additions & 19 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,25 +304,23 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().TokenRange.Max, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
tokenRange: it.At().TokenRange,
})
}

if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 {
// append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
id: servers[0].id,
addrs: servers[0].addrs,
tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32),
})
}
return servers, nil
Expand All @@ -334,18 +332,13 @@ type instanceWithToken struct {
}

type addrsWithTokenRange struct {
id string
addrs []string
minToken, maxToken uint32
id string
addrs []string
tokenRange bloomutils.Range[uint32]
}

func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck {
if token < s.minToken {
return v1.Before
} else if token > s.maxToken {
return v1.After
}
return v1.Overlap
return s.tokenRange.Cmp(token)
}

type instanceWithFingerprints struct {
Expand Down
39 changes: 21 additions & 18 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/grafana/loki/pkg/validation"
)

// newTr is a short alias for bloomutils.NewTokenRange that keeps the
// token-range literals in the test tables of this file compact.
var newTr = bloomutils.NewTokenRange

func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -53,10 +56,10 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 401}, // out of bounds, will be dismissed
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 100},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 101, maxToken: 200},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 201, maxToken: 300},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 301, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 100)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(101, 200)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(201, 300)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(301, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -135,9 +138,9 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
{Fingerprint: 350},
}
servers := []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 200},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 100, maxToken: 300},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 200, maxToken: 400},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 200)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(100, 300)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(200, 400)},
}

// partition fingerprints
Expand Down Expand Up @@ -174,10 +177,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3},
{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, math.MaxUint32/6*1)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/6*1+1, math.MaxUint32/6*3)},
{id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(math.MaxUint32/6*3+1, math.MaxUint32/6*5)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/6*5+1, math.MaxUint32)},
},
},
"MinUint32 and MaxUint32 are tokens in the ring": {
Expand All @@ -186,10 +189,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}},
},
expected: []addrsWithTokenRange{
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3},
{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2},
{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 0)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(1, math.MaxUint32/3)},
{id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/3*1+1, math.MaxUint32/3*2)},
{id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/3*2+1, math.MaxUint32)},
},
},
}
Expand All @@ -215,7 +218,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {

it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().MaxToken, it.At().Instance.Addr)
t.Log(it.At().TokenRange.Max, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -357,10 +360,10 @@ type mockRing struct {
// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].MaxToken < key {
if r.ranges[i].TokenRange.Max < key {
return 1
}
if r.ranges[i].MaxToken > key {
if r.ranges[i].TokenRange.Max > key {
return -1
}
return 0
Expand Down
106 changes: 54 additions & 52 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,62 @@
// This file contains a bunch of utility functions for bloom components.
// TODO: Find a better location for this package

package bloomutils

import (
"errors"
"fmt"
"math"
"sort"

"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

type InstanceWithTokenRange struct {
Instance ring.InstanceDesc
MinToken, MaxToken uint32
var (
// Uint32Range is the Range spanning every uint32 value, 0 through math.MaxUint32.
Uint32Range = Range[uint32]{Min: 0, Max: math.MaxUint32}
// Uint64Range is the Range spanning every uint64 value, 0 through math.MaxUint64.
Uint64Range = Range[uint64]{Min: 0, Max: math.MaxUint64}
)

// Range is a pair of integer bounds of type T. Both bounds are inclusive:
// Cmp treats a value equal to Min or to Max as lying inside the range.
type Range[T constraints.Integer] struct {
	// Min is the lower bound of the range.
	Min T
	// Max is the upper bound of the range.
	Max T
}

func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
if token < i.MinToken {
// String renders the range as "<min>-<max>", with each bound printed in
// lowercase hexadecimal and zero-padded to a width of 16 characters.
func (r Range[T]) String() string {
	lo, hi := r.Min, r.Max
	return fmt.Sprintf("%016x-%016x", lo, hi)
}

// Less reports whether r sorts strictly before other. Ranges are ordered by
// Min first; when both Min values are equal, the range with the smaller Max
// sorts first.
//
// The comparison is strict: for two identical ranges Less returns false in
// both directions. A less function that returns true for equal elements
// claims each one sorts before the other, which violates the ordering
// contract expected by the sort and slices packages.
func (r Range[T]) Less(other Range[T]) bool {
	if r.Min != other.Min {
		return r.Min < other.Min
	}
	return r.Max < other.Max
}

func (r Range[T]) Cmp(t T) v1.BoundsCheck {
if t < r.Min {
return v1.Before
} else if token > i.MaxToken {
} else if t > r.Max {
return v1.After
}
return v1.Overlap
}

// NewTokenRange builds a Range[uint32] with the given lower and upper bound.
// The arguments are stored as provided; no check that min <= max is made.
func NewTokenRange(min, max uint32) Range[uint32] {
	return Range[uint32]{
		Min: min,
		Max: max,
	}
}

// InstanceWithTokenRange pairs a ring instance with a range of uint32 tokens.
type InstanceWithTokenRange struct {
// Instance is the descriptor of the ring instance.
Instance ring.InstanceDesc
// TokenRange is the token range associated with Instance.
TokenRange Range[uint32]
}

// Cmp reports where token lies relative to the instance's TokenRange.
// The check is delegated to the Cmp method of the TokenRange field.
func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
	tokens := i.TokenRange
	return tokens.Cmp(token)
}

// InstancesWithTokenRange is a list of InstanceWithTokenRange values.
type InstancesWithTokenRange []InstanceWithTokenRange

func (i InstancesWithTokenRange) Contains(token uint32) bool {
Expand All @@ -39,11 +68,11 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
return false
}

// GetInstanceWithTokenRange calculates the token range for a specific instance
// KeyRangeForInstance calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.FingerprintBounds, error) {
func KeyRangeForInstance[T constraints.Integer](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
Expand All @@ -57,57 +86,31 @@ func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.Fin

// instance with Id == id not found
if idx == -1 {
return v1.FingerprintBounds{}, ring.ErrInstanceNotFound
return Range[T]{}, ring.ErrInstanceNotFound
}

i := uint64(idx)
n := uint64(len(instances))
step := math.MaxUint64 / n
diff := keyspace.Max - keyspace.Min
i := T(idx)
n := T(len(instances))

minToken := model.Fingerprint(step * i)
maxToken := model.Fingerprint(step*i + step - 1)
if diff < n {
return Range[T]{}, errors.New("keyspace is smaller than amount of instances")
}

step := diff / n
min := step * i
max := step*i + step - 1
if i == n-1 {
// extend the last token range to the upper end of the keyspace
maxToken = math.MaxUint64
max = (keyspace.Max - keyspace.Min)
}

return v1.NewBounds(minToken, maxToken), nil
}

// GetInstancesWithTokenRanges calculates the token ranges for a specific
// instance with given id based on all tokens in the ring.
// If the instances in the ring are configured with a single token, such as the
// bloom compactor, use GetInstanceWithTokenRange() instead.
func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange {
servers := make([]InstanceWithTokenRange, 0, len(instances))
it := NewInstanceSortMergeIterator(instances)
var firstInst ring.InstanceDesc
var lastToken uint32
for it.Next() {
if firstInst.Id == "" {
firstInst = it.At().Instance
}
if it.At().Instance.Id == id {
servers = append(servers, it.At())
}
lastToken = it.At().MaxToken
}
// append token range from lastToken+1 to MaxUint32
// only if the instance with the first token is the current one
if len(servers) > 0 && firstInst.Id == id {
servers = append(servers, InstanceWithTokenRange{
MinToken: lastToken + 1,
MaxToken: math.MaxUint32,
Instance: servers[0].Instance,
})
}
return servers
return Range[T]{min, max}, nil
}

// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
// where the token of the elements are sorted in ascending order.
func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] {

tokenIters := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances))
for i, inst := range instances {
sort.Slice(inst.Tokens, func(a, b int) bool { return inst.Tokens[a] < inst.Tokens[b] })
Expand All @@ -131,9 +134,8 @@ func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[Ins
minToken, maxToken := uint32(prevToken+1), iv.Value()
prevToken = int(maxToken)
return InstanceWithTokenRange{
Instance: instances[iv.Index()],
MinToken: minToken,
MaxToken: maxToken,
Instance: instances[iv.Index()],
TokenRange: NewTokenRange(minToken, maxToken),
}
},
func(iv v1.IndexedValue[uint32], iwtr InstanceWithTokenRange) InstanceWithTokenRange {
Expand Down
Loading
Loading