diff --git a/pkg/bloomcompactor/batch.go b/pkg/bloomcompactor/batch.go index 920bff1decc8f..e9fae9f9df0f0 100644 --- a/pkg/bloomcompactor/batch.go +++ b/pkg/bloomcompactor/batch.go @@ -132,7 +132,7 @@ func newBatchedChunkLoader( time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, - logql_log.NewNoopPipeline().ForStream(c.Metric), + logql_log.NewNoopPipeline().ForStream(nil), ) if err != nil { diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index cc96cc7219e8d..dd5a9c96ca811 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -2,6 +2,10 @@ package bloomcompactor import ( "context" + "fmt" + "math" + "slices" + "sort" "sync" "time" @@ -193,23 +197,120 @@ func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iter } // ownsTenant returns the ownership range for the tenant, if the compactor owns the tenant, and an error. -func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error) { +func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, error) { tenantRing, owned := c.sharding.OwnsTenant(tenant) if !owned { - return v1.FingerprintBounds{}, false, nil + return nil, false, nil } + // TODO(owen-d): use .GetTokenRangesForInstance() + // when it's supported for non zone-aware rings + // instead of doing all this manually + rs, err := tenantRing.GetAllHealthy(RingOp) if err != nil { - return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting ring healthy instances") - + return nil, false, errors.Wrap(err, "getting ring healthy instances") } - keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range) + ranges, err := tokenRangesForInstance(c.cfg.Ring.InstanceID, rs.Instances) if err != nil { - return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range") + return nil, false, errors.Wrap(err, "getting token ranges for instance") } - 
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil + + keyspaces := bloomutils.KeyspacesFromTokenRanges(ranges) + return keyspaces, true, nil +} + +func tokenRangesForInstance(id string, instances []ring.InstanceDesc) (ranges ring.TokenRanges, err error) { + var ownedTokens map[uint32]struct{} + + // lifted from grafana/dskit/ring/model.go <*Desc>.GetTokens() + toks := make([][]uint32, 0, len(instances)) + for _, instance := range instances { + if instance.Id == id { + ranges = make(ring.TokenRanges, 0, 2*(len(instance.Tokens)+1)) + ownedTokens = make(map[uint32]struct{}, len(instance.Tokens)) + for _, tok := range instance.Tokens { + ownedTokens[tok] = struct{}{} + } + } + + // Tokens may not be sorted in older versions, so we enforce sorting here. + tokens := instance.Tokens + if !sort.IsSorted(ring.Tokens(tokens)) { + sort.Sort(ring.Tokens(tokens)) + } + + toks = append(toks, tokens) + } + + if cap(ranges) == 0 { + return nil, fmt.Errorf("instance %s not found", id) + } + + allTokens := ring.MergeTokens(toks) + if len(allTokens) == 0 { + return nil, errors.New("no tokens in the ring") + } + + // mostly lifted from grafana/dskit/ring/token_range.go <*Ring>.GetTokenRangesForInstance() + + // non-zero value means we're now looking for start of the range. Zero value means we're looking for next end of range (i.e. token owned by this instance). 
+ rangeEnd := uint32(0) + + // if this instance claimed the first token, it owns the wrap-around range, which we'll break into two separate ranges + firstToken := allTokens[0] + _, ownsFirstToken := ownedTokens[firstToken] + + if ownsFirstToken { + // we'll start by looking for the beginning of the range that ends with math.MaxUint32 + rangeEnd = math.MaxUint32 + } + + // walk the ring backwards, alternating looking for ends and starts of ranges + for i := len(allTokens) - 1; i > 0; i-- { + token := allTokens[i] + _, owned := ownedTokens[token] + + if rangeEnd == 0 { + // we're looking for the end of the next range + if owned { + rangeEnd = token - 1 + } + } else { + // we have a range end, and are looking for the start of the range + if !owned { + ranges = append(ranges, rangeEnd, token) + rangeEnd = 0 + } + } + } + + // finally look at the first token again + // - if we have a range end, check if we claimed token 0 + // - if we don't, we have our start + // - if we do, the start is 0 + // - if we don't have a range end, check if we claimed token 0 + // - if we don't, do nothing + // - if we do, add the range of [0, token-1] + // - BUT, if the token itself is 0, do nothing, because we don't own the tokens themselves (we should be covered by the already added range that ends with MaxUint32) + + if rangeEnd == 0 { + if ownsFirstToken && firstToken != 0 { + ranges = append(ranges, firstToken-1, 0) + } + } else { + if ownsFirstToken { + ranges = append(ranges, rangeEnd, 0) + } else { + ranges = append(ranges, rangeEnd, firstToken) + } + } + + // Ensure returned ranges are sorted. 
+ slices.Sort(ranges) + + return ranges, nil } // runs a single round of compaction for all relevant tenants and tables @@ -266,25 +367,28 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil { c.metrics.tenantsDiscovered.Inc() tenant := tenants.At() - ownershipRange, owns, err := c.ownsTenant(tenant) + ownershipRanges, owns, err := c.ownsTenant(tenant) if err != nil { return errors.Wrap(err, "checking tenant ownership") } - level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns) + level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ranges", len(ownershipRanges), "owns", owns) if !owns { c.metrics.tenantsSkipped.Inc() continue } c.metrics.tenantsOwned.Inc() - select { - case ch <- tenantTable{ - tenant: tenant, - table: table, - ownershipRange: ownershipRange, - }: - case <-ctx.Done(): - return ctx.Err() + for _, ownershipRange := range ownershipRanges { + + select { + case ch <- tenantTable{ + tenant: tenant, + table: table, + ownershipRange: ownershipRange, + }: + case <-ctx.Done(): + return ctx.Err() + } } } diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go index 475ba8ec0585d..097e04d2a39a6 100644 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -113,7 +114,7 @@ func TestCompactor_ownsTenant(t *testing.T) { require.NoError(t, err) if ownsTenant { compactorOwnsTenant++ - compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange) + compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange...) 
} } require.Equal(t, tc.expectedCompactorsOwningTenant, compactorOwnsTenant) @@ -135,12 +136,6 @@ func TestCompactor_ownsTenant(t *testing.T) { coveredKeySpace.Max = boundsA.Max } - // Assert that the fingerprint key-space is evenly distributed across the compactors - // We do some adjustments if the key-space is not evenly distributable, so we use a delta of 10 - // to account for that and check that the key-space is reasonably evenly distributed. - fpPerTenant := math.MaxUint64 / uint64(tc.expectedCompactorsOwningTenant) - boundsLen := uint64(boundsA.Max - boundsA.Min) - require.InDelta(t, fpPerTenant, boundsLen, 10) } // Assert that the fingerprint key-space is complete require.True(t, coveredKeySpace.Equal(v1.NewBounds(0, math.MaxUint64))) @@ -195,3 +190,75 @@ func (m mockLimits) BloomFalsePositiveRate(_ string) float64 { func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int { panic("implement me") } + +func TestTokenRangesForInstance(t *testing.T) { + desc := func(id int, tokens ...uint32) ring.InstanceDesc { + return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens} + } + + tests := map[string]struct { + input []ring.InstanceDesc + exp map[string]ring.TokenRanges + err bool + }{ + "no nodes": { + input: []ring.InstanceDesc{}, + exp: map[string]ring.TokenRanges{ + "0": {0, math.MaxUint32}, // have to put one in here to trigger test + }, + err: true, + }, + "one node": { + input: []ring.InstanceDesc{ + desc(0, 0, 100), + }, + exp: map[string]ring.TokenRanges{ + "0": {0, math.MaxUint32}, + }, + }, + "two nodes": { + input: []ring.InstanceDesc{ + desc(0, 25, 75), + desc(1, 10, 50, 100), + }, + exp: map[string]ring.TokenRanges{ + "0": {10, 24, 50, 74}, + "1": {0, 9, 25, 49, 75, math.MaxUint32}, + }, + }, + "consecutive tokens": { + input: []ring.InstanceDesc{ + desc(0, 99), + desc(1, 100), + }, + exp: map[string]ring.TokenRanges{ + "0": {0, 98, 100, math.MaxUint32}, + "1": {99, 99}, + }, + }, + "extremes": { + input: []ring.InstanceDesc{ + 
desc(0, 0), + desc(1, math.MaxUint32), + }, + exp: map[string]ring.TokenRanges{ + "0": {math.MaxUint32, math.MaxUint32}, + "1": {0, math.MaxUint32 - 1}, + }, + }, + } + + for desc, test := range tests { + t.Run(desc, func(t *testing.T) { + for id := range test.exp { + ranges, err := tokenRangesForInstance(id, test.input) + if test.err { + require.Error(t, err) + continue + } + require.NoError(t, err) + require.Equal(t, test.exp[id], ranges) + } + }) + } +} diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index b3246fd5876ab..bc58bf09c8865 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -3,14 +3,13 @@ package bloomutils import ( - "errors" "fmt" "math" "sort" "github.com/grafana/dskit/ring" + "github.com/prometheus/common/model" "golang.org/x/exp/constraints" - "golang.org/x/exp/slices" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) @@ -72,44 +71,16 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool { return false } -// KeyRangeForInstance calculates the token range for a specific instance -// with given id based on the first token in the ring. -// This assumes that each instance in the ring is configured with only a single -// token. 
-func KeyRangeForInstance[T constraints.Unsigned](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) { - - // Sort instances -- they may not be sorted - // because they're usually accessed by looking up the tokens (which are sorted) - sort.Slice(instances, func(i, j int) bool { - return instances[i].Tokens[0] < instances[j].Tokens[0] - }) - - idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool { - return inst.Id == id - }) - - // instance with Id == id not found - if idx == -1 { - return Range[T]{}, ring.ErrInstanceNotFound +// TODO(owen-d): use https://github.com/grafana/loki/pull/11975 after merge +func KeyspacesFromTokenRanges(tokenRanges ring.TokenRanges) []v1.FingerprintBounds { + keyspaces := make([]v1.FingerprintBounds, 0, len(tokenRanges)/2) + for i := 0; i < len(tokenRanges)-1; i += 2 { + keyspaces = append(keyspaces, v1.FingerprintBounds{ + Min: model.Fingerprint(tokenRanges[i]) << 32, + Max: model.Fingerprint(tokenRanges[i+1])<<32 | model.Fingerprint(math.MaxUint32), + }) } - - diff := keyspace.Max - keyspace.Min - i := T(idx) - n := T(len(instances)) - - if diff < n { - return Range[T]{}, errors.New("keyspace is smaller than amount of instances") - } - - step := diff / n - min := step * i - max := step*i + step - 1 - if i == n-1 { - // extend the last token tange to MaxUint32 - max = (keyspace.Max - keyspace.Min) - } - - return Range[T]{min, max}, nil + return keyspaces } // NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements diff --git a/pkg/bloomutils/ring_test.go b/pkg/bloomutils/ring_test.go index c9ff6cf5e1d60..47ebb4766490f 100644 --- a/pkg/bloomutils/ring_test.go +++ b/pkg/bloomutils/ring_test.go @@ -1,11 +1,14 @@ package bloomutils import ( + "fmt" "math" "testing" "github.com/grafana/dskit/ring" "github.com/stretchr/testify/require" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) func TestBloomGatewayClient_InstanceSortMergeIterator(t *testing.T) { @@ 
-40,45 +43,34 @@ func uint64Range(min, max uint64) Range[uint64] { return Range[uint64]{min, max} } -func TestBloomGatewayClient_KeyRangeForInstance(t *testing.T) { - for name, tc := range map[string]struct { - id string - input []ring.InstanceDesc - expected Range[uint64] +func TestKeyspacesFromTokenRanges(t *testing.T) { + for i, tc := range []struct { + tokenRanges ring.TokenRanges + exp []v1.FingerprintBounds }{ - "first instance includes 0 token": { - id: "3", - input: []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{3}}, - {Id: "2", Tokens: []uint32{5}}, - {Id: "3", Tokens: []uint32{1}}, + { + tokenRanges: ring.TokenRanges{ + 0, math.MaxUint32 / 2, + math.MaxUint32/2 + 1, math.MaxUint32, }, - expected: uint64Range(0, math.MaxUint64/3-1), - }, - "middle instance": { - id: "1", - input: []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{3}}, - {Id: "2", Tokens: []uint32{5}}, - {Id: "3", Tokens: []uint32{1}}, + exp: []v1.FingerprintBounds{ + v1.NewBounds(0, math.MaxUint64/2), + v1.NewBounds(math.MaxUint64/2+1, math.MaxUint64), }, - expected: uint64Range(math.MaxUint64/3, math.MaxUint64/3*2-1), }, - "last instance includes MaxUint32 token": { - id: "2", - input: []ring.InstanceDesc{ - {Id: "1", Tokens: []uint32{3}}, - {Id: "2", Tokens: []uint32{5}}, - {Id: "3", Tokens: []uint32{1}}, + { + tokenRanges: ring.TokenRanges{ + 0, math.MaxUint8, + math.MaxUint16, math.MaxUint16 << 1, + }, + exp: []v1.FingerprintBounds{ + v1.NewBounds(0, 0xff00000000|math.MaxUint32), + v1.NewBounds(math.MaxUint16<<32, math.MaxUint16<<33|math.MaxUint32), }, - expected: uint64Range(math.MaxUint64/3*2, math.MaxUint64), }, } { - tc := tc - t.Run(name, func(t *testing.T) { - result, err := KeyRangeForInstance(tc.id, tc.input, Uint64Range) - require.NoError(t, err) - require.Equal(t, tc.expected, result) + t.Run(fmt.Sprint(i), func(t *testing.T) { + require.Equal(t, tc.exp, KeyspacesFromTokenRanges(tc.tokenRanges)) }) } } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 
15ee955355a62..57c6e96a2b3d6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1446,7 +1446,9 @@ func (t *Loki) initBloomCompactorRing() (services.Service, error) { // is LegacyMode needed? // legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read) - rm, err := lokiring.NewRingManager(bloomCompactorRingKey, lokiring.ServerMode, t.Cfg.BloomCompactor.Ring, 1, 1, util_log.Logger, prometheus.DefaultRegisterer) + // TODO(owen-d): configurable num tokens, just use lifecycler config? + numTokens := 10 + rm, err := lokiring.NewRingManager(bloomCompactorRingKey, lokiring.ServerMode, t.Cfg.BloomCompactor.Ring, 1, numTokens, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, gerrors.Wrap(err, "error initializing bloom-compactor ring manager")