diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index bead6f79c320a..13fc0d9bf3e3e 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" @@ -343,26 +344,37 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip) errs := multierror.New() - if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { - if isMultiTenantIndex { // TODO: handle multitenant tables - return fmt.Errorf("unexpected multi-tenant") + rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp) + if err != nil { + return err + } + tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances) + + _ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { + if isMultiTenantIndex { + // Skip multi-tenant indexes + return nil + } + + tsdbFile, ok := idx.(*tsdb.TSDBFile) + if !ok { + errs.Add(fmt.Errorf("failed to cast to TSDBFile")) + return nil + } + + tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex) + if !ok { + errs.Add(fmt.Errorf("failed to cast to TSDBIndex")) + return nil } var seriesMetas []seriesMeta - // TODO: Make these casts safely - if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( + + err := tsdbIndex.ForSeries( ctx, nil, 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - // TODO: Inefficient as is, calls the ring per fingerprint. Refactor to make the call once per compaction fingerprint bounds. - ownsFingerprint, err := c.sharding.OwnsFingerprint(tenant, uint64(fingerprint)) - - if err != nil { - level.Error(logger).Log("msg", "failed to check if compactor owns fp", "err", err) - errs.Add(err) - return - } - if !ownsFingerprint { + if !tokenRanges.Contains(uint32(fingerprint)) { return } @@ -371,26 +383,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto //All seriesMetas given a table within fp of this compactor shard seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp}) }, - ); err != nil { + ) + + if err != nil { errs.Add(err) + return nil } job := NewJob(tenant, tableName, idx.Path(), seriesMetas) jobLogger := log.With(logger, "job", job.String()) c.metrics.compactionRunJobStarted.Inc() - if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil { + err = c.runCompact(ctx, jobLogger, job, bt, sc) + if err != nil { c.metrics.compactionRunJobFailed.Inc() errs.Add(errors.Wrap(err, "runBloomCompact failed")) - return errs.Err() + } else { + c.metrics.compactionRunJobSuceeded.Inc() } - - c.metrics.compactionRunJobSuceeded.Inc() - return nil - }); err != nil { - errs.Add(err) - } + }) return errs.Err() } diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index cfbb6c60284ec..e1bd59a0e8e57 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -283,39 +284,44 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway } func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) { + servers, err := serverAddressesWithTokenRanges(subRing, instances) + if err != nil { + return nil, err + } + boundedFingerprints := partitionFingerprintsByAddresses(groups, servers) + return groupByInstance(boundedFingerprints), nil +} + +func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) { bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() servers := make([]addrsWithTokenRange, 0, len(instances)) - prev := -1 - it := newInstanceSortMergeIterator(instances) + it := bloomutils.NewInstanceSortMergeIterator(instances) for it.Next() { // We can use on of the tokens from the token range // to obtain all addresses for that token. - rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones) + rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones) if err != nil { return nil, errors.Wrap(err, "bloom gateway get ring") } servers = append(servers, addrsWithTokenRange{ - minToken: uint32(prev + 1), - maxToken: it.At().token, - id: it.At().instance.Id, + id: it.At().Instance.Id, addrs: rs.GetAddresses(), + minToken: it.At().MinToken, + maxToken: it.At().MaxToken, }) - prev = int(it.At().token) } - if len(servers) > 0 { + if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 { // append the instance for the token range between the greates token and MaxUint32 servers = append(servers, addrsWithTokenRange{ - minToken: uint32(prev), - maxToken: math.MaxUint32, - addrs: servers[0].addrs, id: servers[0].id, + addrs: servers[0].addrs, + minToken: servers[len(servers)-1].maxToken + 1, + maxToken: math.MaxUint32, }) } - - boundedFingerprints := partitionFingerprintsByAddresses(groups, servers) - return groupByInstance(boundedFingerprints), nil + return servers, nil } type instanceWithToken struct { @@ -401,61 +407,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW return result } - -// newInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements -// where the token of the elements are sorted in ascending order. -func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] { - it := &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{ - items: instances, - transform: func(item ring.InstanceDesc, val uint32) instanceWithToken { - return instanceWithToken{instance: item, token: val} - }, - } - sequences := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances)) - for i := range instances { - sort.Slice(instances[i].Tokens, func(a, b int) bool { - return instances[i].Tokens[a] < instances[i].Tokens[b] - }) - iter := NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i) - sequences = append(sequences, v1.NewPeekingIter[IndexedValue[uint32]](iter)) - } - it.heap = v1.NewHeapIterator( - func(i, j IndexedValue[uint32]) bool { - return i.val < j.val - }, - sequences..., - ) - it.err = nil - - return it -} - -// sortMergeIterator implements v1.Iterator -type sortMergeIterator[T any, C comparable, R any] struct { - curr R - heap *v1.HeapIterator[IndexedValue[C]] - items []T - transform func(T, C) R - err error -} - -func (it *sortMergeIterator[T, C, R]) Next() bool { - ok := it.heap.Next() - if !ok { - it.err = io.EOF - return false - } - - group := it.heap.At() - it.curr = it.transform(it.items[group.idx], group.val) - - return true -} - -func (it *sortMergeIterator[T, C, R]) At() R { - return it.curr -} - -func (it *sortMergeIterator[T, C, R]) Err() error { - return it.err -} diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index f0d5b2edf5c07..6edd8fcb406ea 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -1,6 +1,7 @@ package bloomgateway import ( + "math" "sort" "testing" "time" @@ -11,12 +12,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/validation" ) func TestBloomGatewayClient(t *testing.T) { - logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -32,33 +33,6 @@ func TestBloomGatewayClient(t *testing.T) { }) } -func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { - input := []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{6, 5, 2, 9}}, - {Id: "2", Tokens: []uint32{3, 4, 7}}, - {Id: "3", Tokens: []uint32{1, 8, 0}}, - } - expected := []instanceWithToken{ - {instance: input[2], token: 0}, - {instance: input[2], token: 1}, - {instance: input[0], token: 2}, - {instance: input[1], token: 3}, - {instance: input[1], token: 4}, - {instance: input[0], token: 5}, - {instance: input[0], token: 6}, - {instance: input[1], token: 7}, - {instance: input[2], token: 8}, - {instance: input[0], token: 9}, - } - - var i int - it := newInstanceSortMergeIterator(input) - for it.Next() { - require.Equal(t, expected[i], it.At()) - i++ - } -} - func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { // instance token ranges do not overlap t.Run("non-overlapping", func(t *testing.T) { @@ -183,6 +157,50 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { }) } +func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { + testCases := map[string]struct { + instances []ring.InstanceDesc + expected []addrsWithTokenRange + }{ + "one token per instance": { + instances: []ring.InstanceDesc{ + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{math.MaxUint32 / 6 * 1}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 6 * 3}}, + {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}}, + }, + expected: []addrsWithTokenRange{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32}, + }, + }, + "MinUint32 and MaxUint32 are tokens in the ring": { + instances: []ring.InstanceDesc{ + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, math.MaxUint32 / 3 * 2}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}}, + }, + expected: []addrsWithTokenRange{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32}, + }, + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + subRing := newMockRing(tc.instances) + res, err := serverAddressesWithTokenRanges(subRing, tc.instances) + require.NoError(t, err) + require.Equal(t, tc.expected, res) + }) + } + +} + func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { logger := log.NewNopLogger() @@ -203,9 +221,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}}, } - it := newInstanceSortMergeIterator(instances) + it := bloomutils.NewInstanceSortMergeIterator(instances) for it.Next() { - t.Log(it.At().token, it.At().instance.Addr) + t.Log(it.At().MaxToken, it.At().Instance.Addr) } testCases := []struct { @@ -257,7 +275,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { }, }, { - name: "fingerprints with token ranges of a multiple instance are grouped", + name: "fingerprints with token ranges of multiple instances are grouped", chunks: []*logproto.GroupedChunkRefs{ // instance 1 {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, @@ -327,8 +345,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { var _ ring.ReadRing = &mockRing{} func newMockRing(instances []ring.InstanceDesc) *mockRing { - it := newInstanceSortMergeIterator(instances) - ranges := make([]instanceWithToken, 0) + it := bloomutils.NewInstanceSortMergeIterator(instances) + ranges := make([]bloomutils.InstanceWithTokenRange, 0) for it.Next() { ranges = append(ranges, it.At()) } @@ -340,21 +358,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing { type mockRing struct { instances []ring.InstanceDesc - ranges []instanceWithToken + ranges []bloomutils.InstanceWithTokenRange } // Get implements ring.ReadRing. func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { idx, _ := sort.Find(len(r.ranges), func(i int) int { - if r.ranges[i].token < key { + if r.ranges[i].MaxToken < key { return 1 } - if r.ranges[i].token > key { + if r.ranges[i].MaxToken > key { return -1 } return 0 }) - return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil + return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil } // GetAllHealthy implements ring.ReadRing. diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 17063a4903d23..c5c6964038931 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -164,7 +164,7 @@ type FilterRequest struct { // taskMergeIterator implements v1.Iterator type taskMergeIterator struct { curr FilterRequest - heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]] + heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]] tasks []Task day time.Time err error @@ -181,14 +181,14 @@ func newTaskMergeIterator(day time.Time, tasks ...Task) v1.PeekingIterator[v1.Re } func (it *taskMergeIterator) init() { - sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) + sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) for i := range it.tasks { - iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) + iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) sequences = append(sequences, v1.NewPeekingIter(iter)) } it.heap = v1.NewHeapIterator( - func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool { - return i.val.Fingerprint < j.val.Fingerprint + func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool { + return i.Value().Fingerprint < j.Value().Fingerprint }, sequences..., ) @@ -202,10 +202,10 @@ func (it *taskMergeIterator) Next() bool { } group := it.heap.At() - task := it.tasks[group.idx] + task := it.tasks[group.Index()] - it.curr.Fp = model.Fingerprint(group.val.Fingerprint) - it.curr.Chks = convertToChunkRefs(group.val.Refs) + it.curr.Fp = model.Fingerprint(group.Value().Fingerprint) + it.curr.Chks = convertToChunkRefs(group.Value().Refs) it.curr.Searches = convertToSearches(task.Request.Filters) it.curr.Response = task.ResCh it.curr.Error = task.ErrCh diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index dc95da534d0a6..33477e9052fb0 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -12,67 +12,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -type IndexedValue[T any] struct { - idx int - val T -} - -type IterWithIndex[T any] struct { - v1.Iterator[T] - zero T // zero value of T - cache IndexedValue[T] -} - -func (it *IterWithIndex[T]) At() IndexedValue[T] { - it.cache.val = it.Iterator.At() - return it.cache -} - -func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] { - return &IterWithIndex[T]{ - Iterator: iter, - cache: IndexedValue[T]{idx: idx}, - } -} - -type SliceIterWithIndex[T any] struct { - xs []T // source slice - pos int // position within the slice - zero T // zero value of T - cache IndexedValue[T] -} - -func (it *SliceIterWithIndex[T]) Next() bool { - it.pos++ - return it.pos < len(it.xs) -} - -func (it *SliceIterWithIndex[T]) Err() error { - return nil -} - -func (it *SliceIterWithIndex[T]) At() IndexedValue[T] { - it.cache.val = it.xs[it.pos] - return it.cache -} - -func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) { - if it.pos+1 >= len(it.xs) { - it.cache.val = it.zero - return it.cache, false - } - it.cache.val = it.xs[it.pos+1] - return it.cache, true -} - -func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] { - return &SliceIterWithIndex[T]{ - xs: xs, - pos: -1, - cache: IndexedValue[T]{idx: idx}, - } -} - func getDayTime(ts model.Time) time.Time { return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC) } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 70e3d89eb2143..08c6d2a1306a4 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -10,34 +10,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -func TestSliceIterWithIndex(t *testing.T) { - t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) { - xs := []string{"a", "b", "c"} - it := NewSliceIterWithIndex(xs, 123) - - // peek at first item - p, ok := it.Peek() - require.True(t, ok) - require.Equal(t, "a", p.val) - require.Equal(t, 123, p.idx) - - // proceed to first item - require.True(t, it.Next()) - require.Equal(t, "a", it.At().val) - require.Equal(t, 123, it.At().idx) - - // proceed to second and third item - require.True(t, it.Next()) - require.True(t, it.Next()) - - // peek at non-existing fourth item - p, ok = it.Peek() - require.False(t, ok) - require.Equal(t, "", p.val) // "" is zero value for type string - require.Equal(t, 123, p.idx) - }) -} - func TestGetFromThrough(t *testing.T) { chunks := []*logproto.ShortRef{ {From: 0, Through: 6}, diff --git a/pkg/bloomutils/iter.go b/pkg/bloomutils/iter.go new file mode 100644 index 0000000000000..fdbe4a5e62587 --- /dev/null +++ b/pkg/bloomutils/iter.go @@ -0,0 +1,37 @@ +package bloomutils + +import ( + "io" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" +) + +// sortMergeIterator implements v1.Iterator +type sortMergeIterator[T any, C comparable, R any] struct { + curr *R + heap *v1.HeapIterator[v1.IndexedValue[C]] + items []T + transform func(T, C, *R) *R + err error +} + +func (it *sortMergeIterator[T, C, R]) Next() bool { + ok := it.heap.Next() + if !ok { + it.err = io.EOF + return false + } + + group := it.heap.At() + it.curr = it.transform(it.items[group.Index()], group.Value(), it.curr) + + return true +} + +func (it *sortMergeIterator[T, C, R]) At() R { + return *it.curr +} + +func (it *sortMergeIterator[T, C, R]) Err() error { + return it.err +} diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go new file mode 100644 index 0000000000000..08e62a13acb71 --- /dev/null +++ b/pkg/bloomutils/ring.go @@ -0,0 +1,146 @@ +// This file contains a bunch of utility functions for bloom components. +// TODO: Find a better location for this package + +package bloomutils + +import ( + "math" + "sort" + + "github.com/grafana/dskit/ring" + "golang.org/x/exp/slices" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" +) + +type InstanceWithTokenRange struct { + Instance ring.InstanceDesc + MinToken, MaxToken uint32 +} + +func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck { + if token < i.MinToken { + return v1.Before + } else if token > i.MaxToken { + return v1.After + } + return v1.Overlap +} + +type InstancesWithTokenRange []InstanceWithTokenRange + +func (i InstancesWithTokenRange) Contains(token uint32) bool { + for _, instance := range i { + if instance.Cmp(token) == v1.Overlap { + return true + } + } + return false +} + +// GetInstanceTokenRange calculates the token range for a specific instance +// with given id based on the first token in the ring. +// This assumes that each instance in the ring is configured with only a single +// token. +func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { + + // Sorting the tokens of the instances would not be necessary if there is + // only a single token per instances, however, since we only assume one + // token, but don't enforce one token, we keep the sorting. + for _, inst := range instances { + sort.Slice(inst.Tokens, func(i, j int) bool { + return inst.Tokens[i] < inst.Tokens[j] + }) + } + + // Sort instances + sort.Slice(instances, func(i, j int) bool { + return instances[i].Tokens[0] < instances[j].Tokens[0] + }) + + idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool { + return inst.Id == id + }) + + // instance with Id == id not found + if idx == -1 { + return InstancesWithTokenRange{} + } + + i := uint32(idx) + n := uint32(len(instances)) + step := math.MaxUint32 / n + + minToken := step * i + maxToken := step*i + step - 1 + if i == n-1 { + // extend the last token tange to MaxUint32 + maxToken = math.MaxUint32 + } + + return InstancesWithTokenRange{ + {MinToken: minToken, MaxToken: maxToken, Instance: instances[i]}, + } +} + +// GetInstancesWithTokenRanges calculates the token ranges for a specific +// instance with given id based on all tokens in the ring. +// If the instances in the ring are configured with a single token, such as the +// bloom compactor, use GetInstanceWithTokenRange() instead. +func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { + servers := make([]InstanceWithTokenRange, 0, len(instances)) + it := NewInstanceSortMergeIterator(instances) + var firstInst ring.InstanceDesc + var lastToken uint32 + for it.Next() { + if firstInst.Id == "" { + firstInst = it.At().Instance + } + if it.At().Instance.Id == id { + servers = append(servers, it.At()) + } + lastToken = it.At().MaxToken + } + // append token range from lastToken+1 to MaxUint32 + // only if the instance with the first token is the current one + if len(servers) > 0 && firstInst.Id == id { + servers = append(servers, InstanceWithTokenRange{ + MinToken: lastToken + 1, + MaxToken: math.MaxUint32, + Instance: servers[0].Instance, + }) + } + return servers +} + +// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements +// where the token of the elements are sorted in ascending order. +func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] { + it := &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{ + items: instances, + transform: func(item ring.InstanceDesc, val uint32, prev *InstanceWithTokenRange) *InstanceWithTokenRange { + var prevToken uint32 + if prev != nil { + prevToken = prev.MaxToken + 1 + } + return &InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val} + }, + } + sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) + for i := range instances { + sort.Slice(instances[i].Tokens, func(a, b int) bool { + return instances[i].Tokens[a] < instances[i].Tokens[b] + }) + iter := v1.NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i) + sequences = append(sequences, v1.NewPeekingIter[v1.IndexedValue[uint32]](iter)) + } + it.heap = v1.NewHeapIterator( + func(i, j v1.IndexedValue[uint32]) bool { + return i.Value() < j.Value() + }, + sequences..., + ) + it.err = nil + + return it +} diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go new file mode 100644 index 0000000000000..30da072021edf --- /dev/null +++ b/pkg/bloomutils/ring_test.go @@ -0,0 +1,112 @@ +package bloomutils + +import ( + "math" + "testing" + + "github.com/grafana/dskit/ring" + "github.com/stretchr/testify/require" +) + +func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := []InstanceWithTokenRange{ + {Instance: input[2], MinToken: 0, MaxToken: 1}, + {Instance: input[1], MinToken: 2, MaxToken: 3}, + {Instance: input[0], MinToken: 4, MaxToken: 5}, + {Instance: input[1], MinToken: 6, MaxToken: 7}, + {Instance: input[0], MinToken: 8, MaxToken: 9}, + } + + var i int + it := NewInstanceSortMergeIterator(input) + for it.Next() { + t.Log(expected[i], it.At()) + require.Equal(t, expected[i], it.At()) + i++ + } +} + +func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) { + t.Run("instance does not own first token in the ring", func(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := InstancesWithTokenRange{ + {Instance: input[1], MinToken: 2, MaxToken: 3}, + {Instance: input[1], MinToken: 6, MaxToken: 7}, + } + + result := GetInstancesWithTokenRanges("2", input) + require.Equal(t, expected, result) + }) + + t.Run("instance owns first token in the ring", func(t *testing.T) { + input := []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{5, 9}}, + {Id: "2", Tokens: []uint32{3, 7}}, + {Id: "3", Tokens: []uint32{1}}, + } + expected := InstancesWithTokenRange{ + {Instance: input[2], MinToken: 0, MaxToken: 1}, + {Instance: input[2], MinToken: 10, MaxToken: math.MaxUint32}, + } + + result := GetInstancesWithTokenRanges("3", input) + require.Equal(t, expected, result) + }) +} + +func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { + for name, tc := range map[string]struct { + id string + input []ring.InstanceDesc + expected InstancesWithTokenRange + }{ + "first instance includes 0 token": { + id: "3", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "3", Tokens: []uint32{1}}, MinToken: 0, MaxToken: math.MaxUint32/3 - 1}, + }, + }, + "middle instance": { + id: "1", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "1", Tokens: []uint32{3}}, MinToken: math.MaxUint32 / 3, MaxToken: math.MaxUint32/3*2 - 1}, + }, + }, + "last instance includes MaxUint32 token": { + id: "2", + input: []ring.InstanceDesc{ + {Id: "1", Tokens: []uint32{3}}, + {Id: "2", Tokens: []uint32{5}}, + {Id: "3", Tokens: []uint32{1}}, + }, + expected: InstancesWithTokenRange{ + {Instance: ring.InstanceDesc{Id: "2", Tokens: []uint32{5}}, MinToken: math.MaxUint32 / 3 * 2, MaxToken: math.MaxUint32}, + }, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + result := GetInstanceWithTokenRange(tc.id, tc.input) + require.Equal(t, tc.expected, result) + }) + } +} diff --git a/pkg/storage/bloom/v1/iter.go b/pkg/storage/bloom/v1/iter.go new file mode 100644 index 0000000000000..b1b460fb64207 --- /dev/null +++ b/pkg/storage/bloom/v1/iter.go @@ -0,0 +1,70 @@ +package v1 + +type IndexedValue[T any] struct { + idx int + val T +} + +func (iv IndexedValue[T]) Value() T { + return iv.val +} + +func (iv IndexedValue[T]) Index() int { + return iv.idx +} + +type IterWithIndex[T any] struct { + Iterator[T] + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *IterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.Iterator.At() + return it.cache +} + +func NewIterWithIndex[T any](iter Iterator[T], idx int) Iterator[IndexedValue[T]] { + return &IterWithIndex[T]{ + Iterator: iter, + cache: IndexedValue[T]{idx: idx}, + } +} + +type SliceIterWithIndex[T any] struct { + xs []T // source slice + pos int // position within the slice + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *SliceIterWithIndex[T]) Next() bool { + it.pos++ + return it.pos < len(it.xs) +} + +func (it *SliceIterWithIndex[T]) Err() error { + return nil +} + +func (it *SliceIterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.xs[it.pos] + return it.cache +} + +func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) { + if it.pos+1 >= len(it.xs) { + it.cache.val = it.zero + return it.cache, false + } + it.cache.val = it.xs[it.pos+1] + return it.cache, true +} + +func NewSliceIterWithIndex[T any](xs []T, idx int) PeekingIterator[IndexedValue[T]] { + return &SliceIterWithIndex[T]{ + xs: xs, + pos: -1, + cache: IndexedValue[T]{idx: idx}, + } +} diff --git a/pkg/storage/bloom/v1/iter_test.go b/pkg/storage/bloom/v1/iter_test.go new file mode 100644 index 0000000000000..3ec8ead536e75 --- /dev/null +++ b/pkg/storage/bloom/v1/iter_test.go @@ -0,0 +1,35 @@ +package v1 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSliceIterWithIndex(t *testing.T) { + t.Run("SliceIterWithIndex implements PeekingIterator interface", func(t *testing.T) { + xs := []string{"a", "b", "c"} + it := NewSliceIterWithIndex(xs, 123) + + // peek at first item + p, ok := it.Peek() + require.True(t, ok) + require.Equal(t, "a", p.val) + require.Equal(t, 123, p.idx) + + // proceed to first item + require.True(t, it.Next()) + require.Equal(t, "a", it.At().val) + require.Equal(t, 123, it.At().idx) + + // proceed to second and third item + require.True(t, it.Next()) + require.True(t, it.Next()) + + // peek at non-existing fourth item + p, ok = it.Peek() + require.False(t, ok) + require.Equal(t, "", p.val) // "" is zero value for type string + require.Equal(t, 123, p.idx) + }) +}