From d5d0706b60156ab9acc4df962de53f73b3624b0e Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 4 Dec 2023 11:15:47 +0100 Subject: [PATCH 1/8] Make compactTenant function more readable Signed-off-by: Christian Haudum --- pkg/bloomcompactor/bloomcompactor.go | 41 ++++++++++++++++++---------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index bead6f79c320a..1e16d937cfc68 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -343,14 +343,27 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip) errs := multierror.New() - if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { - if isMultiTenantIndex { // TODO: handle multitenant tables - return fmt.Errorf("unexpected multi-tenant") + _ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { + if isMultiTenantIndex { + // Skip multi-tenant indexes + return nil + } + + tsdbFile, ok := idx.(*tsdb.TSDBFile) + if !ok { + errs.Add(fmt.Errorf("failed to cast to TSDBFile")) + return nil + } + + tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex) + if !ok { + errs.Add(fmt.Errorf("failed to cast to TSDBIndex")) + return nil } var seriesMetas []seriesMeta - // TODO: Make these casts safely - if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( + + err := tsdbIndex.ForSeries( ctx, nil, 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { @@ -371,26 +384,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto //All seriesMetas given a table within fp of this compactor shard seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp}) }, - ); err != nil { + ) + + if err != nil { errs.Add(err) + return nil } job := NewJob(tenant, tableName, idx.Path(), seriesMetas) jobLogger := log.With(logger, "job", job.String()) c.metrics.compactionRunJobStarted.Inc() - if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil { + err = c.runCompact(ctx, jobLogger, job, bt, sc) + if err != nil { c.metrics.compactionRunJobFailed.Inc() errs.Add(errors.Wrap(err, "runBloomCompact failed")) - return errs.Err() + } else { + c.metrics.compactionRunJobSuceeded.Inc() } - - c.metrics.compactionRunJobSuceeded.Inc() - return nil - }); err != nil { - errs.Add(err) - } + }) return errs.Err() } From ce4867e19fe0571533595d465c9a07cb630c561a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 4 Dec 2023 16:54:56 +0100 Subject: [PATCH 2/8] Move common code from bloomgateway into shared packages Signed-off-by: Christian Haudum --- pkg/bloomgateway/client.go | 77 ++++---------------------- pkg/bloomgateway/client_test.go | 45 ++++------------ pkg/bloomgateway/multiplexing.go | 16 +++--- pkg/bloomgateway/util.go | 61 --------------------- pkg/bloomgateway/util_test.go | 28 ---------- pkg/bloomutils/iter.go | 37 +++++++++++++ pkg/bloomutils/ring.go | 89 +++++++++++++++++++++++++++++++ pkg/bloomutils/ring_test.go | 31 +++++++++++ pkg/storage/bloom/v1/iter.go | 70 ++++++++++++++++++++++++ pkg/storage/bloom/v1/iter_test.go | 35 ++++++++++++ 10 files changed, 288 insertions(+), 201 deletions(-) create mode 100644 pkg/bloomutils/iter.go create mode 100644 pkg/bloomutils/ring.go create mode 100644 pkg/bloomutils/ring_test.go create mode 100644 pkg/storage/bloom/v1/iter.go create mode 100644 pkg/storage/bloom/v1/iter_test.go diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index cfbb6c60284ec..2d4fd98a9aa01 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -286,31 +287,29 @@ func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChun bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() servers := make([]addrsWithTokenRange, 0, len(instances)) - prev := -1 - it := newInstanceSortMergeIterator(instances) + it := bloomutils.NewInstanceSortMergeIterator(instances) for it.Next() { // We can use on of the tokens from the token range // to obtain all addresses for that token. - rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones) + rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones) if err != nil { return nil, errors.Wrap(err, "bloom gateway get ring") } servers = append(servers, addrsWithTokenRange{ - minToken: uint32(prev + 1), - maxToken: it.At().token, - id: it.At().instance.Id, + id: it.At().Instance.Id, addrs: rs.GetAddresses(), + minToken: it.At().MinToken, + maxToken: it.At().MaxToken, }) - prev = int(it.At().token) } if len(servers) > 0 { // append the instance for the token range between the greates token and MaxUint32 servers = append(servers, addrsWithTokenRange{ - minToken: uint32(prev), - maxToken: math.MaxUint32, - addrs: servers[0].addrs, id: servers[0].id, + addrs: servers[0].addrs, + minToken: servers[len(servers)-1].maxToken + 1, + maxToken: math.MaxUint32, }) } @@ -401,61 +400,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW return result } - -// newInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements -// where the token of the elements are sorted in ascending order. -func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] { - it := &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{ - items: instances, - transform: func(item ring.InstanceDesc, val uint32) instanceWithToken { - return instanceWithToken{instance: item, token: val} - }, - } - sequences := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances)) - for i := range instances { - sort.Slice(instances[i].Tokens, func(a, b int) bool { - return instances[i].Tokens[a] < instances[i].Tokens[b] - }) - iter := NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i) - sequences = append(sequences, v1.NewPeekingIter[IndexedValue[uint32]](iter)) - } - it.heap = v1.NewHeapIterator( - func(i, j IndexedValue[uint32]) bool { - return i.val < j.val - }, - sequences..., - ) - it.err = nil - - return it -} - -// sortMergeIterator implements v1.Iterator -type sortMergeIterator[T any, C comparable, R any] struct { - curr R - heap *v1.HeapIterator[IndexedValue[C]] - items []T - transform func(T, C) R - err error -} - -func (it *sortMergeIterator[T, C, R]) Next() bool { - ok := it.heap.Next() - if !ok { - it.err = io.EOF - return false - } - - group := it.heap.At() - it.curr = it.transform(it.items[group.idx], group.val) - - return true -} - -func (it *sortMergeIterator[T, C, R]) At() R { - return it.curr -} - -func (it *sortMergeIterator[T, C, R]) Err() error { - return it.err -} diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index f0d5b2edf5c07..62a41033ce530 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -11,12 +11,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/validation" ) func TestBloomGatewayClient(t *testing.T) { - logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -32,33 +32,6 @@ func TestBloomGatewayClient(t *testing.T) { }) } -func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { - input := []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{6, 5, 2, 9}}, - {Id: "2", Tokens: []uint32{3, 4, 7}}, - {Id: "3", Tokens: []uint32{1, 8, 0}}, - } - expected := []instanceWithToken{ - {instance: input[2], token: 0}, - {instance: input[2], token: 1}, - {instance: input[0], token: 2}, - {instance: input[1], token: 3}, - {instance: input[1], token: 4}, - {instance: input[0], token: 5}, - {instance: input[0], token: 6}, - {instance: input[1], token: 7}, - {instance: input[2], token: 8}, - {instance: input[0], token: 9}, - } - - var i int - it := newInstanceSortMergeIterator(input) - for it.Next() { - require.Equal(t, expected[i], it.At()) - i++ - } -} - func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { // instance token ranges do not overlap t.Run("non-overlapping", func(t *testing.T) { @@ -203,9 +176,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}}, } - it := newInstanceSortMergeIterator(instances) + it := bloomutils.NewInstanceSortMergeIterator(instances) for it.Next() { - t.Log(it.At().token, it.At().instance.Addr) + t.Log(it.At().MaxToken, it.At().Instance.Addr) } testCases := []struct { @@ -327,8 +300,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { var _ ring.ReadRing = &mockRing{} func newMockRing(instances []ring.InstanceDesc) *mockRing { - it := newInstanceSortMergeIterator(instances) - ranges := make([]instanceWithToken, 0) + it := bloomutils.NewInstanceSortMergeIterator(instances) + ranges := make([]bloomutils.InstanceWithTokenRange, 0) for it.Next() { ranges = append(ranges, it.At()) } @@ -340,21 +313,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing { type mockRing struct { instances []ring.InstanceDesc - ranges []instanceWithToken + ranges []bloomutils.InstanceWithTokenRange } // Get implements ring.ReadRing. func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { idx, _ := sort.Find(len(r.ranges), func(i int) int { - if r.ranges[i].token < key { + if r.ranges[i].MaxToken < key { return 1 } - if r.ranges[i].token > key { + if r.ranges[i].MaxToken > key { return -1 } return 0 }) - return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil + return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil } // GetAllHealthy implements ring.ReadRing. diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 17063a4903d23..c5c6964038931 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -164,7 +164,7 @@ type FilterRequest struct { // taskMergeIterator implements v1.Iterator type taskMergeIterator struct { curr FilterRequest - heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]] + heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]] tasks []Task day time.Time err error @@ -181,14 +181,14 @@ func newTaskMergeIterator(day time.Time, tasks ...Task) v1.PeekingIterator[v1.Re } func (it *taskMergeIterator) init() { - sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) + sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) for i := range it.tasks { - iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) + iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) sequences = append(sequences, v1.NewPeekingIter(iter)) } it.heap = v1.NewHeapIterator( - func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool { - return i.val.Fingerprint < j.val.Fingerprint + func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool { + return i.Value().Fingerprint < j.Value().Fingerprint }, sequences..., ) @@ -202,10 +202,10 @@ func (it *taskMergeIterator) Next() bool { } group := it.heap.At() - task := it.tasks[group.idx] + task := it.tasks[group.Index()] - it.curr.Fp = model.Fingerprint(group.val.Fingerprint) - it.curr.Chks = convertToChunkRefs(group.val.Refs) + it.curr.Fp = model.Fingerprint(group.Value().Fingerprint) + it.curr.Chks = convertToChunkRefs(group.Value().Refs) it.curr.Searches = convertToSearches(task.Request.Filters) it.curr.Response = task.ResCh it.curr.Error = task.ErrCh diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index dc95da534d0a6..33477e9052fb0 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -12,67 +12,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -type IndexedValue[T any] struct { - idx int - val T -} - -type IterWithIndex[T any] struct { - v1.Iterator[T] - zero T // zero value of T - cache IndexedValue[T] -} - -func (it *IterWithIndex[T]) At() IndexedValue[T] { - it.cache.val = it.Iterator.At() - return it.cache -} - -func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] { - return &IterWithIndex[T]{ - Iterator: iter, - cache: IndexedValue[T]{idx: idx}, - } -} - -type SliceIterWithIndex[T any] struct { - xs []T // source slice - pos int // position within the slice - zero T // zero value of T - cache IndexedValue[T] -} - -func (it *SliceIterWithIndex[T]) Next() bool { - it.pos++ - return it.pos < len(it.xs) -} - -func (it *SliceIterWithIndex[T]) Err() error { - return nil -} - -func (it *SliceIterWithIndex[T]) At() IndexedValue[T] { - it.cache.val = it.xs[it.pos] - return it.cache -} - -func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) { - if it.pos+1 >= len(it.xs) { - it.cache.val = it.zero - return it.cache, false - } - it.cache.val = it.xs[it.pos+1] - return it.cache, true -} - -func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] { - return &SliceIterWithIndex[T]{ - xs: xs, - pos: -1, - cache: IndexedValue[T]{idx: idx}, - } -} - func getDayTime(ts model.Time) time.Time { return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC) } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 70e3d89eb2143..08c6d2a1306a4 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -10,34 +10,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -func TestSliceIterWithIndex(t *testing.T) { - t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) { - xs := []string{"a", "b", "c"} - it := NewSliceIterWithIndex(xs, 123) - - // peek at first item - p, ok := it.Peek() - require.True(t, ok) - require.Equal(t, "a", p.val) - require.Equal(t, 123, p.idx) - - // proceed to first item - require.True(t, it.Next()) - require.Equal(t, "a", it.At().val) - require.Equal(t, 123, it.At().idx) - - // proceed to second and third item - require.True(t, it.Next()) - require.True(t, it.Next()) - - // peek at non-existing fourth item - p, ok = it.Peek() - require.False(t, ok) - require.Equal(t, "", p.val) // "" is zero value for type string - require.Equal(t, 123, p.idx) - }) -} - func TestGetFromThrough(t *testing.T) { chunks := []*logproto.ShortRef{ {From: 0, Through: 6}, diff --git a/pkg/bloomutils/iter.go b/pkg/bloomutils/iter.go new file mode 100644 index 0000000000000..5be7d4b74cece --- /dev/null +++ b/pkg/bloomutils/iter.go @@ -0,0 +1,37 @@ +package bloomutils + +import ( + "io" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" +) + +// sortMergeIterator implements v1.Iterator +type sortMergeIterator[T any, C comparable, R any] struct { + curr R + heap *v1.HeapIterator[v1.IndexedValue[C]] + items []T + transform func(T, C, R) R + err error +} + +func (it *sortMergeIterator[T, C, R]) Next() bool { + ok := it.heap.Next() + if !ok { + it.err = io.EOF + return false + } + + group := it.heap.At() + it.curr = it.transform(it.items[group.Index()], group.Value(), it.curr) + + return true +} + +func (it *sortMergeIterator[T, C, R]) At() R { + return it.curr +} + +func (it *sortMergeIterator[T, C, R]) Err() error { + return it.err +} diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go new file mode 100644 index 0000000000000..99932b157a061 --- /dev/null +++ b/pkg/bloomutils/ring.go @@ -0,0 +1,89 @@ +// This file contains a bunch of utility functions for bloom components. +// TODO: Find a better location for this package + +package bloomutils + +import ( + "math" + "sort" + + "github.com/grafana/dskit/ring" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" +) + +type InstanceWithTokenRange struct { + Instance ring.InstanceDesc + MinToken, MaxToken uint32 +} + +func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck { + if token < i.MinToken { + return v1.Before + } else if token > i.MaxToken { + return v1.After + } + return v1.Overlap +} + +type InstancesWithTokenRange []InstanceWithTokenRange + +func (i InstancesWithTokenRange) Contains(token uint32) bool { + for _, instance := range i { + if instance.Cmp(token) == v1.Overlap { + return true + } + } + return false +} + +func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { + servers := make([]InstanceWithTokenRange, 0, len(instances)) + it := NewInstanceSortMergeIterator(instances) + for it.Next() { + if it.At().Instance.Id == id { + servers = append(servers, it.At()) + } + } + // wrap around ring + if len(servers) > 0 && servers[0].Instance.Id == id { + servers = append(servers, InstanceWithTokenRange{ + MinToken: servers[len(servers)-1].MaxToken + 1, + MaxToken: math.MaxUint32, + Instance: servers[0].Instance, + }) + } + return servers +} + +// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements +// where the token of the elements are sorted in ascending order. +func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] { + it := &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{ + items: instances, + transform: func(item ring.InstanceDesc, val uint32, prev InstanceWithTokenRange) InstanceWithTokenRange { + prevToken := prev.MaxToken + 1 + if prev.MaxToken == 0 { + prevToken = 0 + } + return InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val} + }, + } + sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) + for i := range instances { + sort.Slice(instances[i].Tokens, func(a, b int) bool { + return instances[i].Tokens[a] < instances[i].Tokens[b] + }) + iter := v1.NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i) + sequences = append(sequences, v1.NewPeekingIter[v1.IndexedValue[uint32]](iter)) + } + it.heap = v1.NewHeapIterator( + func(i, j v1.IndexedValue[uint32]) bool { + return i.Value() < j.Value() + }, + sequences..., + ) + it.err = nil + + return it +} diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go new file mode 100644 index 0000000000000..16a10cf45b5c4 --- /dev/null +++ b/pkg/bloomutils/ring_test.go @@ -0,0 +1,31 @@ +package bloomutils + +import ( + "testing" + + "github.com/grafana/dskit/ring" + "github.com/stretchr/testify/require" +) + +func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := []InstanceWithTokenRange{ + {Instance: input[2], MinToken: 0, MaxToken: 1}, + {Instance: input[1], MinToken: 2, MaxToken: 3}, + {Instance: input[0], MinToken: 4, MaxToken: 5}, + {Instance: input[1], MinToken: 6, MaxToken: 7}, + {Instance: input[0], MinToken: 8, MaxToken: 9}, + } + + var i int + it := NewInstanceSortMergeIterator(input) + for it.Next() { + t.Log(expected[i], it.At()) + require.Equal(t, expected[i], it.At()) + i++ + } +} diff --git a/pkg/storage/bloom/v1/iter.go b/pkg/storage/bloom/v1/iter.go new file mode 100644 index 0000000000000..b1b460fb64207 --- /dev/null +++ b/pkg/storage/bloom/v1/iter.go @@ -0,0 +1,70 @@ +package v1 + +type IndexedValue[T any] struct { + idx int + val T +} + +func (iv IndexedValue[T]) Value() T { + return iv.val +} + +func (iv IndexedValue[T]) Index() int { + return iv.idx +} + +type IterWithIndex[T any] struct { + Iterator[T] + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *IterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.Iterator.At() + return it.cache +} + +func NewIterWithIndex[T any](iter Iterator[T], idx int) Iterator[IndexedValue[T]] { + return &IterWithIndex[T]{ + Iterator: iter, + cache: IndexedValue[T]{idx: idx}, + } +} + +type SliceIterWithIndex[T any] struct { + xs []T // source slice + pos int // position within the slice + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *SliceIterWithIndex[T]) Next() bool { + it.pos++ + return it.pos < len(it.xs) +} + +func (it *SliceIterWithIndex[T]) Err() error { + return nil +} + +func (it *SliceIterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.xs[it.pos] + return it.cache +} + +func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) { + if it.pos+1 >= len(it.xs) { + it.cache.val = it.zero + return it.cache, false + } + it.cache.val = it.xs[it.pos+1] + return it.cache, true +} + +func NewSliceIterWithIndex[T any](xs []T, idx int) PeekingIterator[IndexedValue[T]] { + return &SliceIterWithIndex[T]{ + xs: xs, + pos: -1, + cache: IndexedValue[T]{idx: idx}, + } +} diff --git a/pkg/storage/bloom/v1/iter_test.go b/pkg/storage/bloom/v1/iter_test.go new file mode 100644 index 0000000000000..3ec8ead536e75 --- /dev/null +++ b/pkg/storage/bloom/v1/iter_test.go @@ -0,0 +1,35 @@ +package v1 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSliceIterWithIndex(t *testing.T) { + t.Run("SliceIterWithIndex implements PeekingIterator interface", func(t *testing.T) { + xs := []string{"a", "b", "c"} + it := NewSliceIterWithIndex(xs, 123) + + // peek at first item + p, ok := it.Peek() + require.True(t, ok) + require.Equal(t, "a", p.val) + require.Equal(t, 123, p.idx) + + // proceed to first item + require.True(t, it.Next()) + require.Equal(t, "a", it.At().val) + require.Equal(t, 123, it.At().idx) + + // proceed to second and third item + require.True(t, it.Next()) + require.True(t, it.Next()) + + // peek at non-existing fourth item + p, ok = it.Peek() + require.False(t, ok) + require.Equal(t, "", p.val) // "" is zero value for type string + require.Equal(t, 123, p.idx) + }) +} From 318e3af792a61d1dfe6973f1d9709ec1fbbf3b45 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 4 Dec 2023 16:58:08 +0100 Subject: [PATCH 3/8] Bloom Compactor: Make fingerprint ownership check more efficient Signed-off-by: Christian Haudum --- pkg/bloomcompactor/bloomcompactor.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 1e16d937cfc68..e07ce4a9ac3f3 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" @@ -343,6 +344,12 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip) errs := multierror.New() + rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp) + if err != nil { + return err + } + tokenRanges := bloomutils.GetInstancesWithTokenRanges(c.cfg.Ring.InstanceID, rs.Instances) + _ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { if isMultiTenantIndex { // Skip multi-tenant indexes @@ -367,15 +374,7 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto ctx, nil, 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - // TODO: Inefficient as is, calls the ring per fingerprint. Refactor to make the call once per compaction fingerprint bounds. - ownsFingerprint, err := c.sharding.OwnsFingerprint(tenant, uint64(fingerprint)) - - if err != nil { - level.Error(logger).Log("msg", "failed to check if compactor owns fp", "err", err) - errs.Add(err) - return - } - if !ownsFingerprint { + if !tokenRanges.Contains(uint32(fingerprint)) { return } From a0e6ed9be8559009e58ff918b3afc6c2745d8a72 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 7 Dec 2023 10:17:01 +0100 Subject: [PATCH 4/8] fixup! Merge branch 'main' into chaudum/bloomcompactor-fingerprint-ownership Signed-off-by: Christian Haudum --- pkg/bloomutils/ring.go | 13 ++++++++++--- pkg/bloomutils/ring_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index 99932b157a061..3c362c8954ed9 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -40,15 +40,22 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool { func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { servers := make([]InstanceWithTokenRange, 0, len(instances)) it := NewInstanceSortMergeIterator(instances) + var firstInst ring.InstanceDesc + var lastToken uint32 for it.Next() { + if firstInst.Id == "" { + firstInst = it.At().Instance + } if it.At().Instance.Id == id { servers = append(servers, it.At()) } + lastToken = it.At().MaxToken } - // wrap around ring - if len(servers) > 0 && servers[0].Instance.Id == id { + // append token range from lastToken+1 to MaxUint32 + // only if the instance with the first token is the current one + if len(servers) > 0 && firstInst.Id == id { servers = append(servers, InstanceWithTokenRange{ - MinToken: servers[len(servers)-1].MaxToken + 1, + MinToken: lastToken + 1, MaxToken: math.MaxUint32, Instance: servers[0].Instance, }) diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go index 16a10cf45b5c4..8077544589e35 100644 --- a/pkg/bloomutils/ring_test.go +++ b/pkg/bloomutils/ring_test.go @@ -1,6 +1,7 @@ package bloomutils import ( + "math" "testing" "github.com/grafana/dskit/ring" @@ -29,3 +30,35 @@ func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { i++ } } + +func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) { + t.Run("instance does not own first token in the ring", func(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := InstancesWithTokenRange{ + {Instance: input[1], MinToken: 2, MaxToken: 3}, + {Instance: input[1], MinToken: 6, MaxToken: 7}, + } + + result := GetInstancesWithTokenRanges("2", input) + require.Equal(t, expected, result) + }) + + t.Run("instance owns first token in the ring", func(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := InstancesWithTokenRange{ + {Instance: input[2], MinToken: 0, MaxToken: 1}, + {Instance: input[2], MinToken: 10, MaxToken: math.MaxUint32}, + } + + result := GetInstancesWithTokenRanges("3", input) + require.Equal(t, expected, result) + }) +} From 3b41d191321515c4ade147a61185394c352114ba Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 7 Dec 2023 14:37:25 +0100 Subject: [PATCH 5/8] Fix `serverAddressesWithTokenRanges` function Handle ranges correctly when token is 0 or MaxUint32 Signed-off-by: Christian Haudum --- pkg/bloomgateway/client.go | 15 +++++++++++---- pkg/bloomgateway/client_test.go | 34 ++++++++++++++++++++++++++++++++- pkg/bloomutils/iter.go | 6 +++--- pkg/bloomutils/ring.go | 10 +++++----- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 2d4fd98a9aa01..e1bd59a0e8e57 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -284,6 +284,15 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway } func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) { + servers, err := serverAddressesWithTokenRanges(subRing, instances) + if err != nil { + return nil, err + } + boundedFingerprints := partitionFingerprintsByAddresses(groups, servers) + return groupByInstance(boundedFingerprints), nil +} + +func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) { bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() servers := make([]addrsWithTokenRange, 0, len(instances)) @@ -303,7 +312,7 @@ func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChun }) } - if len(servers) > 0 { + if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 { // append the instance for the token range between the greates token and MaxUint32 servers = append(servers, addrsWithTokenRange{ id: servers[0].id, @@ -312,9 +321,7 @@ func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChun maxToken: math.MaxUint32, }) } - - boundedFingerprints := partitionFingerprintsByAddresses(groups, servers) - return groupByInstance(boundedFingerprints), nil + return servers, nil } type instanceWithToken struct { diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 62a41033ce530..ddb8d2fb529b1 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -1,6 +1,7 @@ package bloomgateway import ( + "math" "sort" "testing" "time" @@ -156,6 +157,37 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { }) } +func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { + testCases := map[string]struct { + instances []ring.InstanceDesc + expected []addrsWithTokenRange + }{ + "MinUint32 and MaxUint32 are actual tokens in the ring": { + instances: []ring.InstanceDesc{ + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, math.MaxUint32 / 3 * 2}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}}, + }, + expected: []addrsWithTokenRange{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32}, + }, + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + subRing := newMockRing(tc.instances) + res, err := serverAddressesWithTokenRanges(subRing, tc.instances) + require.NoError(t, err) + require.Equal(t, tc.expected, res) + }) + } + +} + func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { logger := log.NewNopLogger() @@ -230,7 +262,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { }, }, { - name: "fingerprints with token ranges of a multiple instance are grouped", + name: "fingerprints with token ranges of multiple instances are grouped", chunks: []*logproto.GroupedChunkRefs{ // instance 1 {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, diff --git a/pkg/bloomutils/iter.go b/pkg/bloomutils/iter.go index 5be7d4b74cece..fdbe4a5e62587 100644 --- a/pkg/bloomutils/iter.go +++ b/pkg/bloomutils/iter.go @@ -8,10 +8,10 @@ import ( // sortMergeIterator implements v1.Iterator type sortMergeIterator[T any, C comparable, R any] struct { - curr R + curr *R heap *v1.HeapIterator[v1.IndexedValue[C]] items []T - transform func(T, C, R) R + transform func(T, C, *R) *R err error } @@ -29,7 +29,7 @@ func (it *sortMergeIterator[T, C, R]) Next() bool { } func (it *sortMergeIterator[T, C, R]) At() R { - return it.curr + return *it.curr } func (it *sortMergeIterator[T, C, R]) Err() error { diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index 3c362c8954ed9..d7af09039e9a0 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -68,12 +68,12 @@ func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) Insta func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] { it := &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{ items: instances, - transform: func(item ring.InstanceDesc, val uint32, prev InstanceWithTokenRange) InstanceWithTokenRange { - prevToken := prev.MaxToken + 1 - if prev.MaxToken == 0 { - prevToken = 0 + transform: func(item ring.InstanceDesc, val uint32, prev *InstanceWithTokenRange) *InstanceWithTokenRange { + var prevToken uint32 + if prev != nil { + prevToken = prev.MaxToken + 1 } - return InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val} + return &InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val} }, } sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) From f2b443e9ed655f3ef2d9d67aa57fc2af6ae1a979 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 7 Dec 2023 18:28:47 +0100 Subject: [PATCH 6/8] fixup! Fix `serverAddressesWithTokenRanges` function Signed-off-by: Christian Haudum --- pkg/bloomgateway/client_test.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index ddb8d2fb529b1..6edd8fcb406ea 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -162,7 +162,20 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { instances []ring.InstanceDesc expected []addrsWithTokenRange }{ - "MinUint32 and MaxUint32 are actual tokens in the ring": { + "one token per instance": { + instances: []ring.InstanceDesc{ + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{math.MaxUint32 / 6 * 1}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 6 * 3}}, + {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}}, + }, + expected: []addrsWithTokenRange{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32}, + }, + }, + "MinUint32 and MaxUint32 are tokens in the ring": { instances: []ring.InstanceDesc{ {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, math.MaxUint32 / 3 * 2}}, {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}}, From d6a94fbc3ea34afbdea8743f88fe15f87e7a3479 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 11 Dec 2023 09:26:22 +0100 Subject: [PATCH 7/8] Calculate token range base off first instance token Divide full token range by the amount of instances to get equal token ranges. Assign ranges based on the sort order of the first token of an instance in the ring. Signed-off-by: Christian Haudum --- pkg/bloomcompactor/bloomcompactor.go | 2 +- pkg/bloomutils/ring.go | 49 ++++++++++++++++++++++++++++ pkg/bloomutils/ring_test.go | 48 +++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index e07ce4a9ac3f3..13fc0d9bf3e3e 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -348,7 +348,7 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto if err != nil { return err } - tokenRanges := bloomutils.GetInstancesWithTokenRanges(c.cfg.Ring.InstanceID, rs.Instances) + tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances) _ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { if isMultiTenantIndex { diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index d7af09039e9a0..ab6f9d22404b3 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -8,6 +8,7 @@ import ( "sort" "github.com/grafana/dskit/ring" + "golang.org/x/exp/slices" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) @@ -37,6 +38,54 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool { return false } +// GetInstanceTokenRange calculates the token range for a specific instance +// with given id based on the first token in the ring. +// This assumes that each instance in the ring is configured with only a single +// token. +func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { + + // Sorting the tokens of the instances would not be necessary if there is + // only a single token per instances, however, since we only assume one + // token, but don't enforce one token, we keep the sorting. + for _, inst := range instances { + sort.Slice(inst.Tokens, func(i, j int) bool { + return inst.Tokens[i] < inst.Tokens[j] + }) + } + + // Sort instances + sort.Slice(instances, func(i, j int) bool { + return instances[i].Tokens[0] < instances[j].Tokens[0] + }) + + idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool { + return inst.Id == id + }) + + // instance with Id == id not found + if idx == -1 { + return InstancesWithTokenRange{} + } + + n := len(instances) + step := math.MaxUint32 / n + + minToken := uint32(step * idx) + maxToken := uint32(step*idx + step - 1) + if idx == n-1 { + // extend the last token tange to MaxUint32 + maxToken = math.MaxUint32 + } + + return InstancesWithTokenRange{ + {MinToken: minToken, MaxToken: maxToken, Instance: instances[idx]}, + } +} + +// GetInstancesWithTokenRanges calculates the token ranges for a specific +// instance with given id based on all tokens in the ring. +// If the instances in the ring are configured with a single token, such as the +// bloom compactor, use GetInstanceWithTokenRange() instead. func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { servers := make([]InstanceWithTokenRange, 0, len(instances)) it := NewInstanceSortMergeIterator(instances) diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go index 8077544589e35..30da072021edf 100644 --- a/pkg/bloomutils/ring_test.go +++ b/pkg/bloomutils/ring_test.go @@ -62,3 +62,51 @@ func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) { require.Equal(t, expected, result) }) } + +func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { + for name, tc := range map[string]struct { + id string + input []ring.InstanceDesc + expected InstancesWithTokenRange + }{ + "first instance includes 0 token": { + id: "3", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "3", Tokens: []uint32{1}}, MinToken: 0, MaxToken: math.MaxUint32/3 - 1}, + }, + }, + "middle instance": { + id: "1", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "1", Tokens: []uint32{3}}, MinToken: math.MaxUint32 / 3, MaxToken: math.MaxUint32/3*2 - 1}, + }, + }, + "last instance includes MaxUint32 token": { + id: "2", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "2", Tokens: []uint32{5}}, MinToken: math.MaxUint32 / 3 * 2, MaxToken: math.MaxUint32}, + }, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + result := GetInstanceWithTokenRange(tc.id, tc.input) + require.Equal(t, tc.expected, result) + }) + } +} From 585c34214b9822bcfeb690af9cf1e431ae6044ec Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 12 Dec 2023 08:55:23 +0100 Subject: [PATCH 8/8] Fix constant int overflow for ARM Signed-off-by: Christian Haudum --- pkg/bloomutils/ring.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index ab6f9d22404b3..08e62a13acb71 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -67,18 +67,19 @@ func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) Instanc return InstancesWithTokenRange{} } - n := len(instances) + i := uint32(idx) + n := uint32(len(instances)) step := math.MaxUint32 / n - minToken := uint32(step * idx) - maxToken := uint32(step*idx + step - 1) - if idx == n-1 { + minToken := step * i + maxToken := step*i + step - 1 + if i == n-1 { // extend the last token tange to MaxUint32 maxToken = math.MaxUint32 } return InstancesWithTokenRange{ - {MinToken: minToken, MaxToken: maxToken, Instance: instances[idx]}, + {MinToken: minToken, MaxToken: maxToken, Instance: instances[i]}, } }