From d2286d417e43c1166a4100b8a8b8217dbc1aa9f6 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 15 Feb 2024 09:13:13 +0100 Subject: [PATCH] Simplify implementation of instance sort iterator Signed-off-by: Christian Haudum --- pkg/bloomutils/iter.go | 37 -------------------------- pkg/bloomutils/ring.go | 53 +++++++++++++++++++++---------------- pkg/bloomutils/ring_test.go | 5 ++++ 3 files changed, 35 insertions(+), 60 deletions(-) delete mode 100644 pkg/bloomutils/iter.go diff --git a/pkg/bloomutils/iter.go b/pkg/bloomutils/iter.go deleted file mode 100644 index fdbe4a5e62587..0000000000000 --- a/pkg/bloomutils/iter.go +++ /dev/null @@ -1,37 +0,0 @@ -package bloomutils - -import ( - "io" - - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" -) - -// sortMergeIterator implements v1.Iterator -type sortMergeIterator[T any, C comparable, R any] struct { - curr *R - heap *v1.HeapIterator[v1.IndexedValue[C]] - items []T - transform func(T, C, *R) *R - err error -} - -func (it *sortMergeIterator[T, C, R]) Next() bool { - ok := it.heap.Next() - if !ok { - it.err = io.EOF - return false - } - - group := it.heap.At() - it.curr = it.transform(it.items[group.Index()], group.Value(), it.curr) - - return true -} - -func (it *sortMergeIterator[T, C, R]) At() R { - return *it.curr -} - -func (it *sortMergeIterator[T, C, R]) Err() error { - return it.err -} diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index 20bb446ba15d3..6da275f607c22 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -107,31 +107,38 @@ func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) Insta // NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements // where the token of the elements are sorted in ascending order. func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] { - it := &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{ - items: instances, - transform: func(item ring.InstanceDesc, val uint32, prev *InstanceWithTokenRange) *InstanceWithTokenRange { - var prevToken uint32 - if prev != nil { - prevToken = prev.MaxToken + 1 - } - return &InstanceWithTokenRange{Instance: item, MinToken: prevToken, MaxToken: val} - }, - } - sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) - for i := range instances { - sort.Slice(instances[i].Tokens, func(a, b int) bool { - return instances[i].Tokens[a] < instances[i].Tokens[b] - }) - iter := v1.NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i) - sequences = append(sequences, v1.NewPeekingIter[v1.IndexedValue[uint32]](iter)) + + tokenIters := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) + for i, inst := range instances { + sort.Slice(inst.Tokens, func(a, b int) bool { return inst.Tokens[a] < inst.Tokens[b] }) + itr := v1.NewIterWithIndex(v1.NewSliceIter[uint32](inst.Tokens), i) + tokenIters = append(tokenIters, v1.NewPeekingIter[v1.IndexedValue[uint32]](itr)) } - it.heap = v1.NewHeapIterator( - func(i, j v1.IndexedValue[uint32]) bool { - return i.Value() < j.Value() + + heapIter := v1.NewHeapIterator[v1.IndexedValue[uint32]]( + func(iv1, iv2 v1.IndexedValue[uint32]) bool { + return iv1.Value() < iv2.Value() }, - sequences..., + tokenIters..., ) - it.err = nil - return it + prevToken := -1 + return v1.NewDedupingIter[v1.IndexedValue[uint32], InstanceWithTokenRange]( + func(iv v1.IndexedValue[uint32], iwtr InstanceWithTokenRange) bool { + return false + }, + func(iv v1.IndexedValue[uint32]) InstanceWithTokenRange { + minToken, maxToken := uint32(prevToken+1), iv.Value() + prevToken = int(maxToken) + return InstanceWithTokenRange{ + Instance: instances[iv.Index()], + MinToken: minToken, + MaxToken: maxToken, + } + }, + func(iv v1.IndexedValue[uint32], iwtr InstanceWithTokenRange) InstanceWithTokenRange { + panic("must not be called, because Eq() is always false") + }, + v1.NewPeekingIter(heapIter), + ) } diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go index 1346559372c30..6cac31949eef3 100644 --- a/pkg/bloomutils/ring_test.go +++ b/pkg/bloomutils/ring_test.go @@ -11,6 +11,11 @@ import ( ) func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { + // | 1 2 3 4 5 6 7 8 9 | + // ---------+----------------------------+ + // ID 1 | * * | + // ID 2 | * * | + // ID 3 | * | input := []ring.InstanceDesc{ {Id: "1", Tokens: []uint32{5, 9}}, {Id: "2", Tokens: []uint32{3, 7}},