diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 36dcf36ed1fb7..5a579f95fdb77 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -201,11 +201,11 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error } - ownershipBounds, err := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances) + keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range) if err != nil { return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range") } - return ownershipBounds, true, nil + return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil } // runs a single round of compaction for all relevant tenants and tables diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 9a75e4e87c26b..28400749404ce 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -304,25 +304,23 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst for it.Next() { // We can use on of the tokens from the token range // to obtain all addresses for that token. - rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones) + rs, err := subRing.Get(it.At().TokenRange.Max, BlocksOwnerRead, bufDescs, bufHosts, bufZones) if err != nil { return nil, errors.Wrap(err, "bloom gateway get ring") } servers = append(servers, addrsWithTokenRange{ - id: it.At().Instance.Id, - addrs: rs.GetAddresses(), - minToken: it.At().MinToken, - maxToken: it.At().MaxToken, + id: it.At().Instance.Id, + addrs: rs.GetAddresses(), + tokenRange: it.At().TokenRange, }) } - if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 { + if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 { // append the instance for the token range between the greates token and MaxUint32 servers = append(servers, addrsWithTokenRange{ - id: servers[0].id, - addrs: servers[0].addrs, - minToken: servers[len(servers)-1].maxToken + 1, - maxToken: math.MaxUint32, + id: servers[0].id, + addrs: servers[0].addrs, + tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32), }) } return servers, nil @@ -334,18 +332,13 @@ type instanceWithToken struct { } type addrsWithTokenRange struct { - id string - addrs []string - minToken, maxToken uint32 + id string + addrs []string + tokenRange bloomutils.Range[uint32] } func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck { - if token < s.minToken { - return v1.Before - } else if token > s.maxToken { - return v1.After - } - return v1.Overlap + return s.tokenRange.Cmp(token) } type instanceWithFingerprints struct { diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index b1716de8150ea..440347d1b2487 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -19,6 +19,9 @@ import ( "github.com/grafana/loki/pkg/validation" ) +// short constructor +var newTr = bloomutils.NewTokenRange + func TestBloomGatewayClient(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -53,10 +56,10 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { {Fingerprint: 401}, // out of bounds, will be dismissed } servers := []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 100}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 101, maxToken: 200}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 201, maxToken: 300}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 301, maxToken: 400}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 100)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(101, 200)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(201, 300)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(301, 400)}, } // partition fingerprints @@ -135,9 +138,9 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { {Fingerprint: 350}, } servers := []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 200}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 100, maxToken: 300}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: 200, maxToken: 400}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 200)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(100, 300)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(200, 400)}, } // partition fingerprints @@ -174,10 +177,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}}, }, expected: []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: math.MaxUint32 / 6 * 1}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/6*1 + 1, maxToken: math.MaxUint32 / 6 * 3}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: math.MaxUint32/6*3 + 1, maxToken: math.MaxUint32 / 6 * 5}, - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/6*5 + 1, maxToken: math.MaxUint32}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, math.MaxUint32/6*1)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/6*1+1, math.MaxUint32/6*3)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(math.MaxUint32/6*3+1, math.MaxUint32/6*5)}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/6*5+1, math.MaxUint32)}, }, }, "MinUint32 and MaxUint32 are tokens in the ring": { @@ -186,10 +189,10 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}}, }, expected: []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: math.MaxUint32 / 3}, - {id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: math.MaxUint32/3*1 + 1, maxToken: math.MaxUint32 / 3 * 2}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: math.MaxUint32/3*2 + 1, maxToken: math.MaxUint32}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 0)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(1, math.MaxUint32/3)}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/3*1+1, math.MaxUint32/3*2)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/3*2+1, math.MaxUint32)}, }, }, } @@ -215,7 +218,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { it := bloomutils.NewInstanceSortMergeIterator(instances) for it.Next() { - t.Log(it.At().MaxToken, it.At().Instance.Addr) + t.Log(it.At().TokenRange.Max, it.At().Instance.Addr) } testCases := []struct { @@ -357,10 +360,10 @@ type mockRing struct { // Get implements ring.ReadRing. func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { idx, _ := sort.Find(len(r.ranges), func(i int) int { - if r.ranges[i].MaxToken < key { + if r.ranges[i].TokenRange.Max < key { return 1 } - if r.ranges[i].MaxToken > key { + if r.ranges[i].TokenRange.Max > key { return -1 } return 0 diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index 6da275f607c22..102d3ed5e9a5e 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -1,33 +1,62 @@ // This file contains a bunch of utility functions for bloom components. -// TODO: Find a better location for this package package bloomutils import ( + "errors" + "fmt" "math" "sort" "github.com/grafana/dskit/ring" - "github.com/prometheus/common/model" + "golang.org/x/exp/constraints" "golang.org/x/exp/slices" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) -type InstanceWithTokenRange struct { - Instance ring.InstanceDesc - MinToken, MaxToken uint32 +var ( + Uint32Range = Range[uint32]{Min: 0, Max: math.MaxUint32} + Uint64Range = Range[uint64]{Min: 0, Max: math.MaxUint64} +) + +type Range[T constraints.Integer] struct { + Min, Max T } -func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck { - if token < i.MinToken { +func (r Range[T]) String() string { + return fmt.Sprintf("%016x-%016x", r.Min, r.Max) +} + +func (r Range[T]) Less(other Range[T]) bool { + if r.Min != other.Min { + return r.Min < other.Min + } + return r.Max <= other.Max +} + +func (r Range[T]) Cmp(t T) v1.BoundsCheck { + if t < r.Min { return v1.Before - } else if token > i.MaxToken { + } else if t > r.Max { return v1.After } return v1.Overlap } +func NewTokenRange(min, max uint32) Range[uint32] { + return Range[uint32]{min, max} +} + +type InstanceWithTokenRange struct { + Instance ring.InstanceDesc + TokenRange Range[uint32] +} + +func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck { + return i.TokenRange.Cmp(token) +} + type InstancesWithTokenRange []InstanceWithTokenRange func (i InstancesWithTokenRange) Contains(token uint32) bool { @@ -39,11 +68,11 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool { return false } -// GetInstanceWithTokenRange calculates the token range for a specific instance +// KeyRangeForInstance calculates the token range for a specific instance // with given id based on the first token in the ring. // This assumes that each instance in the ring is configured with only a single // token. -func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.FingerprintBounds, error) { +func KeyRangeForInstance[T constraints.Integer](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) { // Sort instances -- they may not be sorted // because they're usually accessed by looking up the tokens (which are sorted) @@ -57,57 +86,31 @@ func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) (v1.Fin // instance with Id == id not found if idx == -1 { - return v1.FingerprintBounds{}, ring.ErrInstanceNotFound + return Range[T]{}, ring.ErrInstanceNotFound } - i := uint64(idx) - n := uint64(len(instances)) - step := math.MaxUint64 / n + diff := keyspace.Max - keyspace.Min + i := T(idx) + n := T(len(instances)) - minToken := model.Fingerprint(step * i) - maxToken := model.Fingerprint(step*i + step - 1) + if diff < n { + return Range[T]{}, errors.New("keyspace is smaller than amount of instances") + } + + step := diff / n + min := step * i + max := step*i + step - 1 if i == n-1 { // extend the last token tange to MaxUint32 - maxToken = math.MaxUint64 + max = (keyspace.Max - keyspace.Min) } - return v1.NewBounds(minToken, maxToken), nil -} - -// GetInstancesWithTokenRanges calculates the token ranges for a specific -// instance with given id based on all tokens in the ring. -// If the instances in the ring are configured with a single token, such as the -// bloom compactor, use GetInstanceWithTokenRange() instead. -func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange { - servers := make([]InstanceWithTokenRange, 0, len(instances)) - it := NewInstanceSortMergeIterator(instances) - var firstInst ring.InstanceDesc - var lastToken uint32 - for it.Next() { - if firstInst.Id == "" { - firstInst = it.At().Instance - } - if it.At().Instance.Id == id { - servers = append(servers, it.At()) - } - lastToken = it.At().MaxToken - } - // append token range from lastToken+1 to MaxUint32 - // only if the instance with the first token is the current one - if len(servers) > 0 && firstInst.Id == id { - servers = append(servers, InstanceWithTokenRange{ - MinToken: lastToken + 1, - MaxToken: math.MaxUint32, - Instance: servers[0].Instance, - }) - } - return servers + return Range[T]{min, max}, nil } // NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements // where the token of the elements are sorted in ascending order. func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] { - tokenIters := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances)) for i, inst := range instances { sort.Slice(inst.Tokens, func(a, b int) bool { return inst.Tokens[a] < inst.Tokens[b] }) @@ -131,9 +134,8 @@ func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[Ins minToken, maxToken := uint32(prevToken+1), iv.Value() prevToken = int(maxToken) return InstanceWithTokenRange{ - Instance: instances[iv.Index()], - MinToken: minToken, - MaxToken: maxToken, + Instance: instances[iv.Index()], + TokenRange: NewTokenRange(minToken, maxToken), } }, func(iv v1.IndexedValue[uint32], iwtr InstanceWithTokenRange) InstanceWithTokenRange { diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go index 6cac31949eef3..c9ff6cf5e1d60 100644 --- a/pkg/bloomutils/ring_test.go +++ b/pkg/bloomutils/ring_test.go @@ -6,27 +6,25 @@ import ( "github.com/grafana/dskit/ring" "github.com/stretchr/testify/require" - - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) -func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { - // | 1 2 3 4 5 6 7 8 9 | - // ---------+----------------------------+ - // ID 1 | * * | - // ID 2 | * * | - // ID 3 | * | +func TestBloomGatewayClient_InstanceSortMergeIterator(t *testing.T) { + // | 0 1 2 3 4 5 6 7 8 9 | + // ---------+---------------------+ + // ID 1 | ***o ***o | + // ID 2 | ***o ***o | + // ID 3 | **o | input := []ring.InstanceDesc{ {Id: "1", Tokens: []uint32{5, 9}}, {Id: "2", Tokens: []uint32{3, 7}}, {Id: "3", Tokens: []uint32{1}}, } expected := []InstanceWithTokenRange{ - {Instance: input[2], MinToken: 0, MaxToken: 1}, - {Instance: input[1], MinToken: 2, MaxToken: 3}, - {Instance: input[0], MinToken: 4, MaxToken: 5}, - {Instance: input[1], MinToken: 6, MaxToken: 7}, - {Instance: input[0], MinToken: 8, MaxToken: 9}, + {Instance: input[2], TokenRange: NewTokenRange(0, 1)}, + {Instance: input[1], TokenRange: NewTokenRange(2, 3)}, + {Instance: input[0], TokenRange: NewTokenRange(4, 5)}, + {Instance: input[1], TokenRange: NewTokenRange(6, 7)}, + {Instance: input[0], TokenRange: NewTokenRange(8, 9)}, } var i int @@ -38,43 +36,15 @@ func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) { } } -func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) { - t.Run("instance does not own first token in the ring", func(t *testing.T) { - input := []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{5, 9}}, - {Id: "2", Tokens: []uint32{3, 7}}, - {Id: "3", Tokens: []uint32{1}}, - } - expected := InstancesWithTokenRange{ - {Instance: input[1], MinToken: 2, MaxToken: 3}, - {Instance: input[1], MinToken: 6, MaxToken: 7}, - } - - result := GetInstancesWithTokenRanges("2", input) - require.Equal(t, expected, result) - }) - - t.Run("instance owns first token in the ring", func(t *testing.T) { - input := []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{5, 9}}, - {Id: "2", Tokens: []uint32{3, 7}}, - {Id: "3", Tokens: []uint32{1}}, - } - expected := InstancesWithTokenRange{ - {Instance: input[2], MinToken: 0, MaxToken: 1}, - {Instance: input[2], MinToken: 10, MaxToken: math.MaxUint32}, - } - - result := GetInstancesWithTokenRanges("3", input) - require.Equal(t, expected, result) - }) +func uint64Range(min, max uint64) Range[uint64] { + return Range[uint64]{min, max} } -func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { +func TestBloomGatewayClient_KeyRangeForInstance(t *testing.T) { for name, tc := range map[string]struct { id string input []ring.InstanceDesc - expected v1.FingerprintBounds + expected Range[uint64] }{ "first instance includes 0 token": { id: "3", @@ -83,7 +53,7 @@ func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { {Id: "2", Tokens: []uint32{5}}, {Id: "3", Tokens: []uint32{1}}, }, - expected: v1.NewBounds(0, math.MaxUint64/3-1), + expected: uint64Range(0, math.MaxUint64/3-1), }, "middle instance": { id: "1", @@ -92,7 +62,7 @@ func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { {Id: "2", Tokens: []uint32{5}}, {Id: "3", Tokens: []uint32{1}}, }, - expected: v1.NewBounds(math.MaxUint64/3, math.MaxUint64/3*2-1), + expected: uint64Range(math.MaxUint64/3, math.MaxUint64/3*2-1), }, "last instance includes MaxUint32 token": { id: "2", @@ -101,12 +71,12 @@ func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) { {Id: "2", Tokens: []uint32{5}}, {Id: "3", Tokens: []uint32{1}}, }, - expected: v1.NewBounds(math.MaxUint64/3*2, math.MaxUint64), + expected: uint64Range(math.MaxUint64/3*2, math.MaxUint64), }, } { tc := tc t.Run(name, func(t *testing.T) { - result, err := GetInstanceWithTokenRange(tc.id, tc.input) + result, err := KeyRangeForInstance(tc.id, tc.input, Uint64Range) require.NoError(t, err) require.Equal(t, tc.expected, result) })