Skip to content

Commit

Permalink
Make KeyRangeForInstance generic
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 15, 2024
1 parent ddb53e9 commit e9a0f90
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomcompactor

import (
"context"
"math"
"sync"
"time"

Expand Down Expand Up @@ -201,11 +202,11 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error

}

ownershipBounds, err := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Range[uint64]{Min: 0, Max: math.MaxUint64})
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
}
return ownershipBounds, true, nil
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil
}

// runs a single round of compaction for all relevant tenants and tables
Expand Down
32 changes: 21 additions & 11 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
package bloomutils

import (
"errors"
"fmt"
"math"
"sort"

"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

var (
Uint32Range = Range[uint32]{Min: 0, Max: math.MaxUint32}
Uint64Range = Range[uint64]{Min: 0, Max: math.MaxUint64}
)

type Range[T constraints.Integer] struct {
Min, Max T
}
Expand Down Expand Up @@ -63,11 +68,11 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
return false
}

// GetInstanceWithTokenRange calculates the token range for a specific instance
// KeyRangeForInstance calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.FingerprintBounds, error) {
func KeyRangeForInstance[T constraints.Integer](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
Expand All @@ -81,21 +86,26 @@ func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.Fin

// instance with Id == id not found
if idx == -1 {
return v1.FingerprintBounds{}, ring.ErrInstanceNotFound
return Range[T]{}, ring.ErrInstanceNotFound
}

i := uint64(idx)
n := uint64(len(instances))
step := math.MaxUint64 / n
diff := keyspace.Max - keyspace.Min
i := T(idx)
n := T(len(instances))

if diff < n {
return Range[T]{}, errors.New("keyspace is smaller than amount of instances")
}

minToken := model.Fingerprint(step * i)
maxToken := model.Fingerprint(step*i + step - 1)
step := diff / n
min := step * i
max := step*i + step - 1
if i == n-1 {
// extend the last token tange to MaxUint32
maxToken = math.MaxUint64
max = (keyspace.Max - keyspace.Min)
}

return v1.NewBounds(minToken, maxToken), nil
return Range[T]{min, max}, nil
}

// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomutils/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
func TestBloomGatewayClient_InstanceSortMergeIterator(t *testing.T) {
input := []ring.InstanceDesc{
{Id: "1", Tokens: []uint32{5, 9}},
{Id: "2", Tokens: []uint32{3, 7}},
Expand All @@ -33,7 +33,7 @@ func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
}
}

func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) {
func TestBloomGatewayClient_KeyRangeForInstance(t *testing.T) {
for name, tc := range map[string]struct {
id string
input []ring.InstanceDesc
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) {
} {
tc := tc
t.Run(name, func(t *testing.T) {
result, err := GetInstanceWithTokenRange(tc.id, tc.input)
result, err := KeyRangeForInstance(tc.id, tc.input, Uint32Range)
require.NoError(t, err)
require.Equal(t, tc.expected, result)
})
Expand Down

0 comments on commit e9a0f90

Please sign in to comment.