Skip to content

Commit

Permalink
Extract shard annotation into new FingerprintFilter interface (grafana#11834)
Browse files Browse the repository at this point in the history

**What this PR does / why we need it**:
This PR extracts the `Match` and `Bounds` methods of TSDB's
`ShardAnnotation` into a new interface `FingerprintFilter`.
This will allow us to query the index for any fingerprint bounds, not
just power of 2 shard factors.

We now use this in the bloom compactor by calling the index `ForSeries`
([here][1]) passing a fingerprint filter with the bounds of the FP range
owned by the compactor.

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](grafana@0d4416a)


[1]:
https://github.com/grafana/loki/blob/de4f56e42d14eb25f22a249aca04dd0736e88d15/pkg/bloomcompactor/bloomcompactor.go#L408
  • Loading branch information
salvacorts authored Feb 1, 2024
1 parent 1191f88 commit c335cd2
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 95 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/index/bitprefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ii *BitPrefixInvertedIndex) getShards(shard *astmapper.ShardAnnotation) ([
}

requestedShard := shard.TSDB()
minFp, maxFp := requestedShard.Bounds()
minFp, maxFp := requestedShard.GetFromThrough()

// Determine how many bits we need to take from
// the requested shard's min/max fingerprint values
Expand Down Expand Up @@ -143,7 +143,7 @@ func (ii *BitPrefixInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astm
// Because bit prefix order is also ascending order,
// the merged fingerprints from ascending shards are also in order.
if filter {
minFP, maxFP := shard.TSDB().Bounds()
minFP, maxFP := shard.TSDB().GetFromThrough()
minIdx := sort.Search(len(result), func(i int) bool {
return result[i] >= minFP
})
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ func (b FingerprintBounds) Overlaps(target FingerprintBounds) bool {
return b.Cmp(target.Min) != After && b.Cmp(target.Max) != Before
}

// Match implements TSDBs FingerprintFilter interface
func (b FingerprintBounds) Match(fp model.Fingerprint) bool {
return b.Cmp(fp) == Overlap
}

// GetFromThrough implements TSDBs FingerprintFilter interface
func (b FingerprintBounds) GetFromThrough() (model.Fingerprint, model.Fingerprint) {
return b.Min, b.Max
}

// Slice returns a new fingerprint bounds clipped to the target bounds or nil if there is no overlap
func (b FingerprintBounds) Slice(min, max model.Fingerprint) *FingerprintBounds {
return b.Intersection(FingerprintBounds{Min: min, Max: max})
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,22 +747,22 @@ func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx

}

func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, _ []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, _ []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) {
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
return idx.GetChunkRefs(ctx, userID, from, through, nil, fpFilter, matchers...)

}

// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, _ []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, _ []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) {
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
return idx.Series(ctx, userID, from, through, nil, shard, matchers...)
return idx.Series(ctx, userID, from, through, nil, fpFilter, matchers...)

}

Expand All @@ -784,20 +784,20 @@ func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, thro

}

func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil
}
return idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...)
return idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...)
}

func (t *tenantHeads) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error {
func (t *tenantHeads) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error {
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil
}
return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...)
return idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...)
}

// helper only used in building TSDBs
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err
}

// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error) {
func (h *headIndexReader) Postings(name string, fpFilter index.FingerprintFilter, values ...string) (index.Postings, error) {
var p index.Postings
switch len(values) {
case 0:
Expand All @@ -115,8 +115,8 @@ func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, va
p = index.Merge(res...)
}

if shard != nil {
return index.NewShardedPostings(p, *shard, nil), nil
if fpFilter != nil {
return index.NewShardedPostings(p, fpFilter, nil), nil
}
return p, nil
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,25 @@ type Index interface {
// the requested shard. If it is nil, TSDB will return all results,
// regardless of shard.
// Note: any shard used must be a valid factor of two, meaning `0_of_2` and `3_of_4` are fine, but `0_of_3` is not.
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error)
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error)
// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)
Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error)
LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)
Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error
Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error
Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error
Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error
}

type NoopIndex struct{}

func (NoopIndex) Close() error { return nil }
func (NoopIndex) Bounds() (_, through model.Time) { return }
func (NoopIndex) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ []ChunkRef, _ *index.ShardAnnotation, _ ...*labels.Matcher) ([]ChunkRef, error) {
func (NoopIndex) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ []ChunkRef, _ index.FingerprintFilter, _ ...*labels.Matcher) ([]ChunkRef, error) {
return nil, nil
}

// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
func (NoopIndex) Series(_ context.Context, _ string, _, _ model.Time, _ []Series, _ *index.ShardAnnotation, _ ...*labels.Matcher) ([]Series, error) {
func (NoopIndex) Series(_ context.Context, _ string, _, _ model.Time, _ []Series, _ index.FingerprintFilter, _ ...*labels.Matcher) ([]Series, error) {
return nil, nil
}
func (NoopIndex) LabelNames(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([]string, error) {
Expand All @@ -75,12 +75,12 @@ func (NoopIndex) LabelValues(_ context.Context, _ string, _, _ model.Time, _ str
return nil, nil
}

func (NoopIndex) Stats(_ context.Context, _ string, _, _ model.Time, _ IndexStatsAccumulator, _ *index.ShardAnnotation, _ shouldIncludeChunk, _ ...*labels.Matcher) error {
func (NoopIndex) Stats(_ context.Context, _ string, _, _ model.Time, _ IndexStatsAccumulator, _ index.FingerprintFilter, _ shouldIncludeChunk, _ ...*labels.Matcher) error {
return nil
}

func (NoopIndex) SetChunkFilterer(_ chunk.RequestChunkFilterer) {}

func (NoopIndex) Volume(_ context.Context, _ string, _, _ model.Time, _ VolumeAccumulator, _ *index.ShardAnnotation, _ shouldIncludeChunk, _ []string, _ string, _ ...*labels.Matcher) error {
func (NoopIndex) Volume(_ context.Context, _ string, _, _ model.Time, _ VolumeAccumulator, _ index.FingerprintFilter, _ shouldIncludeChunk, _ []string, _ string, _ ...*labels.Matcher) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
// (SeriesRef, Fingerprint) tuples
type FingerprintOffsets [][2]uint64

func (xs FingerprintOffsets) Range(shard ShardAnnotation) (minOffset, maxOffset uint64) {
from, through := shard.Bounds()
func (xs FingerprintOffsets) Range(fpFilter FingerprintFilter) (minOffset, maxOffset uint64) {
from, through := fpFilter.GetFromThrough()
lower := sort.Search(len(xs), func(i int) bool {
return xs[i][1] >= uint64(from)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1849,7 +1849,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
}

func (r *Reader) Postings(name string, shard *ShardAnnotation, values ...string) (Postings, error) {
func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
if r.version == FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
Expand Down Expand Up @@ -1947,8 +1947,8 @@ func (r *Reader) Postings(name string, shard *ShardAnnotation, values ...string)
}

merged := Merge(res...)
if shard != nil {
return NewShardedPostings(merged, *shard, r.fingerprintOffsets), nil
if fpFilter != nil {
return NewShardedPostings(merged, fpFilter, r.fingerprintOffsets), nil
}

return merged, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,8 @@ type ShardedPostings struct {
// For example (below), given a shard, we'll likely return a slight superset of offsets surrounding the shard.
// ---[shard0]--- # Shard membership
// -[--shard0--]- # Series returned by shardedPostings
func NewShardedPostings(p Postings, shard ShardAnnotation, offsets FingerprintOffsets) *ShardedPostings {
min, max := offsets.Range(shard)
func NewShardedPostings(p Postings, fpFilter FingerprintFilter, offsets FingerprintOffsets) *ShardedPostings {
min, max := offsets.Range(fpFilter)
return &ShardedPostings{
p: p,
minOffset: min,
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ const (

var errDisallowedIdentityShard = errors.New("shard with factor of 1 is explicitly disallowed. It's equivalent to no sharding")

type FingerprintFilter interface {
Match(model.Fingerprint) bool
GetFromThrough() (model.Fingerprint, model.Fingerprint)
}

// ShardAnnotation is a convenience struct which holds data from a parsed shard label
// Of MUST be a power of 2 to ensure sharding logic works correctly.
type ShardAnnotation struct {
Expand Down Expand Up @@ -72,9 +77,9 @@ func (shard ShardAnnotation) Validate() error {
return nil
}

// Bounds shows the [minimum, maximum) fingerprints. If there is no maximum
// GetFromThrough shows the [minimum, maximum) fingerprints. If there is no maximum
// fingerprint (for example the last shard), math.MaxUint64 is used as the maximum.
func (shard ShardAnnotation) Bounds() (model.Fingerprint, model.Fingerprint) {
func (shard ShardAnnotation) GetFromThrough() (model.Fingerprint, model.Fingerprint) {
requiredBits := model.Fingerprint(shard.RequiredBits())
from := model.Fingerprint(shard.Shard) << (64 - requiredBits)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestShardBounds(t *testing.T) {
},
} {
t.Run(tc.shard.String(), func(t *testing.T) {
from, through := tc.shard.Bounds()
from, through := tc.shard.GetFromThrough()
require.Equal(t, model.Fingerprint(tc.from), from)
require.Equal(t, model.Fingerprint(tc.through), through)
})
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,22 @@ func NewIndexClient(idx Index, opts IndexClientOptions, l Limits) *IndexClient {
// In the future, we should use dynamic sharding in TSDB to determine the shard factors
// and we may no longer wish to send a shard label inside the queries,
// but rather expose it as part of the stores.Index interface
func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.ShardAnnotation, error) {
func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, index.FingerprintFilter, error) {
// first use withoutNameLabel to make a copy with the name label removed
matchers = withoutNameLabel(matchers)
s, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers)
if err != nil {
return nil, nil, err
}

var shard *index.ShardAnnotation
var fpFilter index.FingerprintFilter
if s != nil {
matchers = append(matchers[:shardLabelIndex], matchers[shardLabelIndex+1:]...)
shard = &index.ShardAnnotation{
shard := index.ShardAnnotation{
Shard: uint32(s.Shard),
Of: uint32(s.Of),
}
fpFilter = shard

if err := shard.Validate(); err != nil {
return nil, nil, err
Expand All @@ -97,7 +98,7 @@ func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.Shard
matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, "", ""))
}

return matchers, shard, err
return matchers, fpFilter, err

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ func (i *indexShipperQuerier) Close() error {
return nil
}

func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *tsdbindex.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter tsdbindex.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) {
idx, err := i.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
return idx.GetChunkRefs(ctx, userID, from, through, res, fpFilter, matchers...)
}

func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *tsdbindex.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter tsdbindex.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) {
idx, err := i.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
return idx.Series(ctx, userID, from, through, res, shard, matchers...)
return idx.Series(ctx, userID, from, through, res, fpFilter, matchers...)
}

func (i *indexShipperQuerier) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
Expand All @@ -116,22 +116,22 @@ func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, fr
return idx.LabelValues(ctx, userID, from, through, name, matchers...)
}

func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *tsdbindex.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter tsdbindex.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
idx, err := i.indices(ctx, from, through, userID)
if err != nil {
return err
}

return idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...)
return idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...)
}

func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *tsdbindex.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error {
func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter tsdbindex.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error {
idx, err := i.indices(ctx, from, through, userID)
if err != nil {
return err
}

return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...)
return idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...)
}

type resultAccumulator struct {
Expand Down
Loading

0 comments on commit c335cd2

Please sign in to comment.