From c335cd200e8403e547e867528bbb9df3c41dda1f Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 1 Feb 2024 19:10:01 +0100 Subject: [PATCH] Extract shard annotation into new FingerprintFilter interface (#11834) **What this PR does / why we need it**: This PR extracts the `Match` and `Bounds` methods of TSDB's `ShardAnnotation` into a new interface `FingerprintFilter`. This will allow us to query the index for any fingerprint bounds, not just power of 2 shard factors. We now use this in the bloom compactor by calling the index `ForSeries` ([here][1]) passing a fingerprint filter with the bounds of the FP range owned by the compactor. **Special notes for your reviewer**: **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) [1]: https://github.com/grafana/loki/blob/de4f56e42d14eb25f22a249aca04dd0736e88d15/pkg/bloomcompactor/bloomcompactor.go#L408 --- pkg/ingester/index/bitprefix.go | 4 +-- pkg/storage/bloom/v1/bounds.go | 10 +++++++ .../shipper/indexshipper/tsdb/head_manager.go | 16 +++++----- .../shipper/indexshipper/tsdb/head_read.go | 6 ++-- .../stores/shipper/indexshipper/tsdb/index.go | 16 +++++----- .../indexshipper/tsdb/index/fingerprint.go | 4 +-- .../shipper/indexshipper/tsdb/index/index.go | 6 ++-- .../indexshipper/tsdb/index/postings.go | 4 +-- .../shipper/indexshipper/tsdb/index/shard.go | 9 ++++-- .../indexshipper/tsdb/index/shard_test.go | 2 +- .../shipper/indexshipper/tsdb/index_client.go | 9 +++--- .../tsdb/index_shipper_querier.go | 16 +++++----- .../shipper/indexshipper/tsdb/lazy_index.go | 16 +++++----- .../indexshipper/tsdb/multi_file_index.go | 16 +++++----- .../shipper/indexshipper/tsdb/multitenant.go | 16 +++++----- .../shipper/indexshipper/tsdb/querier.go | 26 ++++++++-------- .../indexshipper/tsdb/single_file_index.go | 30 +++++++++---------- 17 files changed, 111 insertions(+), 95 deletions(-) diff --git a/pkg/ingester/index/bitprefix.go b/pkg/ingester/index/bitprefix.go index 025005618d8c5..8235c2821d6ca 100644 --- a/pkg/ingester/index/bitprefix.go +++ b/pkg/ingester/index/bitprefix.go @@ -69,7 +69,7 @@ func (ii *BitPrefixInvertedIndex) getShards(shard *astmapper.ShardAnnotation) ([ } requestedShard := shard.TSDB() - minFp, maxFp := requestedShard.Bounds() + minFp, maxFp := requestedShard.GetFromThrough() // Determine how many bits we need to take from // the requested shard's min/max fingerprint values @@ -143,7 +143,7 @@ func (ii *BitPrefixInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astm // Because bit prefix order is also ascending order, // the merged fingerprints from ascending shards are also in order. 
if filter { - minFP, maxFP := shard.TSDB().Bounds() + minFP, maxFP := shard.TSDB().GetFromThrough() minIdx := sort.Search(len(result), func(i int) bool { return result[i] >= minFP }) diff --git a/pkg/storage/bloom/v1/bounds.go b/pkg/storage/bloom/v1/bounds.go index 0e52554a393d3..961060198c393 100644 --- a/pkg/storage/bloom/v1/bounds.go +++ b/pkg/storage/bloom/v1/bounds.go @@ -59,6 +59,16 @@ func (b FingerprintBounds) Overlaps(target FingerprintBounds) bool { return b.Cmp(target.Min) != After && b.Cmp(target.Max) != Before } +// Match implements TSDBs FingerprintFilter interface +func (b FingerprintBounds) Match(fp model.Fingerprint) bool { + return b.Cmp(fp) == Overlap +} + +// GetFromThrough implements TSDBs FingerprintFilter interface +func (b FingerprintBounds) GetFromThrough() (model.Fingerprint, model.Fingerprint) { + return b.Min, b.Max +} + // Slice returns a new fingerprint bounds clipped to the target bounds or nil if there is no overlap func (b FingerprintBounds) Slice(min, max model.Fingerprint) *FingerprintBounds { return b.Intersection(FingerprintBounds{Min: min, Max: max}) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go index bae41255554db..7342fe851c577 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go @@ -747,22 +747,22 @@ func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx } -func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, _ []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { +func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, _ []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { idx, ok := t.tenantIndex(userID, from, through) if !ok { return nil, nil } - return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...) + return idx.GetChunkRefs(ctx, userID, from, through, nil, fpFilter, matchers...) } // Series follows the same semantics regarding the passed slice and shard as GetChunkRefs. -func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, _ []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { +func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, _ []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { idx, ok := t.tenantIndex(userID, from, through) if !ok { return nil, nil } - return idx.Series(ctx, userID, from, through, nil, shard, matchers...) + return idx.Series(ctx, userID, from, through, nil, fpFilter, matchers...) 
} @@ -784,20 +784,20 @@ func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, thro } -func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { +func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { idx, ok := t.tenantIndex(userID, from, through) if !ok { return nil } - return idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...) + return idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...) } -func (t *tenantHeads) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { +func (t *tenantHeads) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { idx, ok := t.tenantIndex(userID, from, through) if !ok { return nil } - return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) + return idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) } // helper only used in building TSDBs diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go index 4ffc8ae2e9a96..203e951a435d5 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go @@ -100,7 +100,7 @@ func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, err } // Postings returns the postings list iterator for the label pairs. -func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error) { +func (h *headIndexReader) Postings(name string, fpFilter index.FingerprintFilter, values ...string) (index.Postings, error) { var p index.Postings switch len(values) { case 0: @@ -115,8 +115,8 @@ func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, va p = index.Merge(res...) } - if shard != nil { - return index.NewShardedPostings(p, *shard, nil), nil + if fpFilter != nil { + return index.NewShardedPostings(p, fpFilter, nil), nil } return p, nil } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index.go index 69f4c26765883..bb294fb13f450 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index.go @@ -47,25 +47,25 @@ type Index interface { // the requested shard. If it is nil, TSDB will return all results, // regardless of shard. // Note: any shard used must be a valid factor of two, meaning `0_of_2` and `3_of_4` are fine, but `0_of_3` is not. 
- GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) + GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) // Series follows the same semantics regarding the passed slice and shard as GetChunkRefs. - Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) + Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) - Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error - Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error + Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error + Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error } type NoopIndex struct{} func (NoopIndex) Close() error { return nil } func (NoopIndex) Bounds() (_, through model.Time) { return } -func (NoopIndex) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ []ChunkRef, _ *index.ShardAnnotation, _ ...*labels.Matcher) ([]ChunkRef, error) { +func (NoopIndex) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ []ChunkRef, _ index.FingerprintFilter, _ ...*labels.Matcher) ([]ChunkRef, error) { return nil, nil } // Series follows the same semantics regarding the passed slice and shard as GetChunkRefs. 
-func (NoopIndex) Series(_ context.Context, _ string, _, _ model.Time, _ []Series, _ *index.ShardAnnotation, _ ...*labels.Matcher) ([]Series, error) { +func (NoopIndex) Series(_ context.Context, _ string, _, _ model.Time, _ []Series, _ index.FingerprintFilter, _ ...*labels.Matcher) ([]Series, error) { return nil, nil } func (NoopIndex) LabelNames(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([]string, error) { @@ -75,12 +75,12 @@ func (NoopIndex) LabelValues(_ context.Context, _ string, _, _ model.Time, _ str return nil, nil } -func (NoopIndex) Stats(_ context.Context, _ string, _, _ model.Time, _ IndexStatsAccumulator, _ *index.ShardAnnotation, _ shouldIncludeChunk, _ ...*labels.Matcher) error { +func (NoopIndex) Stats(_ context.Context, _ string, _, _ model.Time, _ IndexStatsAccumulator, _ index.FingerprintFilter, _ shouldIncludeChunk, _ ...*labels.Matcher) error { return nil } func (NoopIndex) SetChunkFilterer(_ chunk.RequestChunkFilterer) {} -func (NoopIndex) Volume(_ context.Context, _ string, _, _ model.Time, _ VolumeAccumulator, _ *index.ShardAnnotation, _ shouldIncludeChunk, _ []string, _ string, _ ...*labels.Matcher) error { +func (NoopIndex) Volume(_ context.Context, _ string, _, _ model.Time, _ VolumeAccumulator, _ index.FingerprintFilter, _ shouldIncludeChunk, _ []string, _ string, _ ...*labels.Matcher) error { return nil } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/fingerprint.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/fingerprint.go index 646e587f706ea..f8b45682b93d0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/fingerprint.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/fingerprint.go @@ -8,8 +8,8 @@ import ( // (SeriesRef, Fingerprint) tuples type FingerprintOffsets [][2]uint64 -func (xs FingerprintOffsets) Range(shard ShardAnnotation) (minOffset, maxOffset uint64) { - from, through := shard.Bounds() +func (xs FingerprintOffsets) Range(fpFilter FingerprintFilter) (minOffset, maxOffset uint64) { + from, through := fpFilter.GetFromThrough() lower := sort.Search(len(xs), func(i int) bool { return xs[i][1] >= uint64(from) }) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index eb9681160f4ef..7aa429d367718 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -1849,7 +1849,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls) } -func (r *Reader) Postings(name string, shard *ShardAnnotation, values ...string) (Postings, error) { +func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) { if r.version == FormatV1 { e, ok := r.postingsV1[name] if !ok { @@ -1947,8 +1947,8 @@ func (r *Reader) Postings(name string, shard *ShardAnnotation, values ...string) } merged := Merge(res...) 
- if shard != nil { - return NewShardedPostings(merged, *shard, r.fingerprintOffsets), nil + if fpFilter != nil { + return NewShardedPostings(merged, fpFilter, r.fingerprintOffsets), nil } return merged, nil diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go index 028da1bd06b57..0077f845df29a 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go @@ -845,8 +845,8 @@ type ShardedPostings struct { // For example (below), given a shard, we'll likely return a slight superset of offsets surrounding the shard. // ---[shard0]--- # Shard membership // -[--shard0--]- # Series returned by shardedPostings -func NewShardedPostings(p Postings, shard ShardAnnotation, offsets FingerprintOffsets) *ShardedPostings { - min, max := offsets.Range(shard) +func NewShardedPostings(p Postings, fpFilter FingerprintFilter, offsets FingerprintOffsets) *ShardedPostings { + min, max := offsets.Range(fpFilter) return &ShardedPostings{ p: p, minOffset: min, diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard.go index 12d75a06f743b..b188ebbcb24ed 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard.go @@ -17,6 +17,11 @@ const ( var errDisallowedIdentityShard = errors.New("shard with factor of 1 is explicitly disallowed. It's equivalent to no sharding") +type FingerprintFilter interface { + Match(model.Fingerprint) bool + GetFromThrough() (model.Fingerprint, model.Fingerprint) +} + // ShardAnnotation is a convenience struct which holds data from a parsed shard label // Of MUST be a power of 2 to ensure sharding logic works correctly. type ShardAnnotation struct { @@ -72,9 +77,9 @@ func (shard ShardAnnotation) Validate() error { return nil } -// Bounds shows the [minimum, maximum) fingerprints. If there is no maximum +// GetFromThrough shows the [minimum, maximum) fingerprints. If there is no maximum // fingerprint (for example the last shard), math.MaxUint64 is used as the maximum. 
-func (shard ShardAnnotation) Bounds() (model.Fingerprint, model.Fingerprint) { +func (shard ShardAnnotation) GetFromThrough() (model.Fingerprint, model.Fingerprint) { requiredBits := model.Fingerprint(shard.RequiredBits()) from := model.Fingerprint(shard.Shard) << (64 - requiredBits) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard_test.go index 167e9f4baec81..f7613389c32c7 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/shard_test.go @@ -104,7 +104,7 @@ func TestShardBounds(t *testing.T) { }, } { t.Run(tc.shard.String(), func(t *testing.T) { - from, through := tc.shard.Bounds() + from, through := tc.shard.GetFromThrough() require.Equal(t, model.Fingerprint(tc.from), from) require.Equal(t, model.Fingerprint(tc.through), through) }) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_client.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_client.go index fae628a3a9078..d609dc0ed27f7 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_client.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_client.go @@ -71,7 +71,7 @@ func NewIndexClient(idx Index, opts IndexClientOptions, l Limits) *IndexClient { // In the future, we should use dynamic sharding in TSDB to determine the shard factors // and we may no longer wish to send a shard label inside the queries, // but rather expose it as part of the stores.Index interface -func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.ShardAnnotation, error) { +func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, index.FingerprintFilter, error) { // first use withoutNameLabel to make a copy with the name label removed matchers = withoutNameLabel(matchers) s, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers) @@ -79,13 +79,14 @@ func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.Shard return nil, nil, err } - var shard *index.ShardAnnotation + var fpFilter index.FingerprintFilter if s != nil { matchers = append(matchers[:shardLabelIndex], matchers[shardLabelIndex+1:]...) 
- shard = &index.ShardAnnotation{ + shard := index.ShardAnnotation{ Shard: uint32(s.Shard), Of: uint32(s.Of), } + fpFilter = shard if err := shard.Validate(); err != nil { return nil, nil, err @@ -97,7 +98,7 @@ func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.Shard matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, "", "")) } - return matchers, shard, err + return matchers, fpFilter, err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go index c07add72b6714..acace60c1e4b2 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go @@ -84,20 +84,20 @@ func (i *indexShipperQuerier) Close() error { return nil } -func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *tsdbindex.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { +func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter tsdbindex.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { idx, err := i.indices(ctx, from, through, userID) if err != nil { return nil, err } - return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...) + return idx.GetChunkRefs(ctx, userID, from, through, res, fpFilter, matchers...) } -func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *tsdbindex.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { +func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter tsdbindex.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { idx, err := i.indices(ctx, from, through, userID) if err != nil { return nil, err } - return idx.Series(ctx, userID, from, through, res, shard, matchers...) + return idx.Series(ctx, userID, from, through, res, fpFilter, matchers...) } func (i *indexShipperQuerier) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) { @@ -116,22 +116,22 @@ func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, fr return idx.LabelValues(ctx, userID, from, through, name, matchers...) } -func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *tsdbindex.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { +func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter tsdbindex.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { idx, err := i.indices(ctx, from, through, userID) if err != nil { return err } - return idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...) + return idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...) 
} -func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *tsdbindex.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { +func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter tsdbindex.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { idx, err := i.indices(ctx, from, through, userID) if err != nil { return err } - return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) + return idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) } type resultAccumulator struct { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/lazy_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/lazy_index.go index b9877a6ae1ab3..327566f1a0ecc 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/lazy_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/lazy_index.go @@ -36,19 +36,19 @@ func (f LazyIndex) Close() error { return i.Close() } -func (f LazyIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { +func (f LazyIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { i, err := f() if err != nil { return nil, err } - return i.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...) + return i.GetChunkRefs(ctx, userID, from, through, res, fpFilter, matchers...) } -func (f LazyIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { +func (f LazyIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { i, err := f() if err != nil { return nil, err } - return i.Series(ctx, userID, from, through, res, shard, matchers...) + return i.Series(ctx, userID, from, through, res, fpFilter, matchers...) } func (f LazyIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) { i, err := f() @@ -65,18 +65,18 @@ func (f LazyIndex) LabelValues(ctx context.Context, userID string, from, through return i.LabelValues(ctx, userID, from, through, name, matchers...) } -func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { +func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { i, err := f() if err != nil { return err } - return i.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...) + return i.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...) 
} -func (f LazyIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { +func (f LazyIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { i, err := f() if err != nil { return err } - return i.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) + return i.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go index 01935a842d539..08bf6bf4ff01e 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go @@ -131,7 +131,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model } -func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { +func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { if res == nil { res = ChunkRefsPool.Get() @@ -165,7 +165,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro from, through, func(ctx context.Context, idx Index) error { - got, err := idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...) + got, err := idx.GetChunkRefs(ctx, userID, from, through, nil, fpFilter, matchers...) if err != nil { return err } @@ -187,7 +187,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro } -func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { +func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { if res == nil { res = SeriesPool.Get() @@ -217,7 +217,7 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo from, through, func(ctx context.Context, idx Index) error { - got, err := idx.Series(ctx, userID, from, through, nil, shard, matchers...) + got, err := idx.Series(ctx, userID, from, through, nil, fpFilter, matchers...) 
if err != nil { return err } @@ -354,14 +354,14 @@ func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, throu return merged.([]string), nil } -func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { +func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { return i.forMatchingIndices(ctx, from, through, func(ctx context.Context, idx Index) error { - return idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, matchers...) + return idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, matchers...) }) } -func (i *MultiIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { +func (i *MultiIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { return i.forMatchingIndices(ctx, from, through, func(ctx context.Context, idx Index) error { - return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) + return idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) }) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multitenant.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multitenant.go index 9dda0886dbf40..ec582b6e21489 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multitenant.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multitenant.go @@ -51,12 +51,12 @@ func (m *MultiTenantIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilter func (m *MultiTenantIndex) Close() error { return m.idx.Close() } -func (m *MultiTenantIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { - return m.idx.GetChunkRefs(ctx, userID, from, through, res, shard, withTenantLabelMatcher(userID, matchers)...) +func (m *MultiTenantIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { + return m.idx.GetChunkRefs(ctx, userID, from, through, res, fpFilter, withTenantLabelMatcher(userID, matchers)...) } -func (m *MultiTenantIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { - xs, err := m.idx.Series(ctx, userID, from, through, res, shard, withTenantLabelMatcher(userID, matchers)...) +func (m *MultiTenantIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { + xs, err := m.idx.Series(ctx, userID, from, through, res, fpFilter, withTenantLabelMatcher(userID, matchers)...) 
if err != nil { return nil, err } @@ -89,10 +89,10 @@ func (m *MultiTenantIndex) LabelValues(ctx context.Context, userID string, from, return m.idx.LabelValues(ctx, userID, from, through, name, withTenantLabelMatcher(userID, matchers)...) } -func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { - return m.idx.Stats(ctx, userID, from, through, acc, shard, shouldIncludeChunk, withTenantLabelMatcher(userID, matchers)...) +func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { + return m.idx.Stats(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, withTenantLabelMatcher(userID, matchers)...) } -func (m *MultiTenantIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { - return m.idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, withTenantLabelMatcher(userID, matchers)...) +func (m *MultiTenantIndex) Volume(ctx context.Context, userID string, from, through model.Time, acc VolumeAccumulator, fpFilter index.FingerprintFilter, shouldIncludeChunk shouldIncludeChunk, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) error { + return m.idx.Volume(ctx, userID, from, through, acc, fpFilter, shouldIncludeChunk, targetLabels, aggregateBy, withTenantLabelMatcher(userID, matchers)...) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go index 10498bdf94e3f..b1e3306b14d1f 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go @@ -61,7 +61,7 @@ type IndexReader interface { // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. Input values must be sorted. - Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error) + Postings(name string, fpFilter index.FingerprintFilter, values ...string) (index.Postings, error) // Series populates the given labels and chunk metas for the series identified // by the reference. @@ -89,7 +89,7 @@ type IndexReader interface { // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. The resulting postings are not ordered by series. -func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*labels.Matcher) (index.Postings, error) { +func PostingsForMatchers(ix IndexReader, fpFilter index.FingerprintFilter, ms ...*labels.Matcher) (index.Postings, error) { var its, notIts []index.Postings // See which label must be non-empty. // Optimization for case like {l=~".", l!="1"}. 
@@ -113,7 +113,7 @@ func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*la return nil, err } - it, err := postingsForMatcher(ix, shard, inverse) + it, err := postingsForMatcher(ix, fpFilter, inverse) if err != nil { return nil, err } @@ -126,14 +126,14 @@ func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*la return nil, err } - it, err := inversePostingsForMatcher(ix, shard, inverse) + it, err := inversePostingsForMatcher(ix, fpFilter, inverse) if err != nil { return nil, err } its = append(its, it) } else { // l="a" // Non-Not matcher, use normal postingsForMatcher. - it, err := postingsForMatcher(ix, shard, m) + it, err := postingsForMatcher(ix, fpFilter, m) if err != nil { return nil, err } @@ -144,7 +144,7 @@ func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*la // the series which don't have the label name set too. See: // https://github.com/prometheus/prometheus/issues/3575 and // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 - it, err := inversePostingsForMatcher(ix, shard, m) + it, err := inversePostingsForMatcher(ix, fpFilter, m) if err != nil { return nil, err } @@ -155,7 +155,7 @@ func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*la // If there's nothing to subtract from, add in everything and remove the notIts later. if len(its) == 0 && len(notIts) != 0 { k, v := index.AllPostingsKey() - allPostings, err := ix.Postings(k, shard, v) + allPostings, err := ix.Postings(k, fpFilter, v) if err != nil { return nil, err } @@ -171,12 +171,12 @@ func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*la return it, nil } -func postingsForMatcher(ix IndexReader, shard *index.ShardAnnotation, m *labels.Matcher) (index.Postings, error) { +func postingsForMatcher(ix IndexReader, fpFilter index.FingerprintFilter, m *labels.Matcher) (index.Postings, error) { // This method will not return postings for missing labels. // Fast-path for equal matching. if m.Type == labels.MatchEqual { - return ix.Postings(m.Name, shard, m.Value) + return ix.Postings(m.Name, fpFilter, m.Value) } // Fast-path for set matching. @@ -184,7 +184,7 @@ func postingsForMatcher(ix IndexReader, shard *index.ShardAnnotation, m *labels. setMatches := findSetMatches(m.GetRegexString()) if len(setMatches) > 0 { sort.Strings(setMatches) - return ix.Postings(m.Name, shard, setMatches...) + return ix.Postings(m.Name, fpFilter, setMatches...) } } @@ -212,11 +212,11 @@ func postingsForMatcher(ix IndexReader, shard *index.ShardAnnotation, m *labels. if !isSorted { sort.Strings(res) } - return ix.Postings(m.Name, shard, res...) + return ix.Postings(m.Name, fpFilter, res...) } // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. -func inversePostingsForMatcher(ix IndexReader, shard *index.ShardAnnotation, m *labels.Matcher) (index.Postings, error) { +func inversePostingsForMatcher(ix IndexReader, fpFilter index.FingerprintFilter, m *labels.Matcher) (index.Postings, error) { vals, err := ix.LabelValues(m.Name) if err != nil { return nil, err @@ -237,7 +237,7 @@ func inversePostingsForMatcher(ix IndexReader, shard *index.ShardAnnotation, m * if !isSorted { sort.Strings(res) } - return ix.Postings(m.Name, shard, res...) + return ix.Postings(m.Name, fpFilter, res...) 
} func findSetMatches(pattern string) []string { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index fb3a85c9f0460..0e1ae029a8677 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -157,7 +157,7 @@ func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { // fn must NOT capture it's arguments. They're reused across series iterations and returned to // a pool after completion. -func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error { +func (i *TSDBIndex) ForSeries(ctx context.Context, fpFilter index.FingerprintFilter, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error { // TODO(owen-d): use pool var ls labels.Labels @@ -169,7 +169,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, filterer = i.chunkFilter.ForRequest(ctx) } - return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { + return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { for p.Next() { hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks) if err != nil { @@ -177,7 +177,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, } // skip series that belong to different shards - if shard != nil && !shard.Match(model.Fingerprint(hash)) { + if fpFilter != nil && !fpFilter.Match(model.Fingerprint(hash)) { continue } @@ -194,25 +194,25 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, func (i *TSDBIndex) forPostings( _ context.Context, - shard *index.ShardAnnotation, + fpFilter index.FingerprintFilter, _, _ model.Time, matchers []*labels.Matcher, fn func(index.Postings) error, ) error { - p, err := PostingsForMatchers(i.reader, shard, matchers...) + p, err := PostingsForMatchers(i.reader, fpFilter, matchers...) 
if err != nil { return err } return fn(p) } -func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { +func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]ChunkRef, error) { if res == nil { res = ChunkRefsPool.Get() } res = res[:0] - if err := i.ForSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + if err := i.ForSeries(ctx, fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { for _, chk := range chks { res = append(res, ChunkRef{ @@ -230,13 +230,13 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu return res, nil } -func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { +func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, fpFilter index.FingerprintFilter, matchers ...*labels.Matcher) ([]Series, error) { if res == nil { res = SeriesPool.Get() } res = res[:0] - if err := i.ForSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + if err := i.ForSeries(ctx, fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { if len(chks) == 0 { return } @@ -280,8 +280,8 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier { } } -func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, _ shouldIncludeChunk, matchers ...*labels.Matcher) error { - return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { +func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Time, acc IndexStatsAccumulator, fpFilter index.FingerprintFilter, _ shouldIncludeChunk, matchers ...*labels.Matcher) error { + return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { // TODO(owen-d): use pool var ls labels.Labels var filterer chunk.Filterer @@ -296,7 +296,7 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim } // skip series that belong to different shards - if shard != nil && !shard.Match(model.Fingerprint(fp)) { + if fpFilter != nil && !fpFilter.Match(model.Fingerprint(fp)) { continue } @@ -339,7 +339,7 @@ func (i *TSDBIndex) Volume( _ string, from, through model.Time, acc VolumeAccumulator, - shard *index.ShardAnnotation, + fpFilter index.FingerprintFilter, _ shouldIncludeChunk, targetLabels []string, aggregateBy string, @@ -355,7 +355,7 @@ func (i *TSDBIndex) Volume( aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" - return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { + return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { var ls labels.Labels var filterer chunk.Filterer if i.chunkFilter != nil { @@ -369,7 +369,7 @@ func (i *TSDBIndex) Volume( } // skip series that belong to different shards - if shard != nil && !shard.Match(model.Fingerprint(fp)) { + if fpFilter != nil && !fpFilter.Match(model.Fingerprint(fp)) { continue }
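
---

A minimal usage sketch of the new interface introduced by this patch: any `[Min, Max]` fingerprint range now satisfies `index.FingerprintFilter` (via `Match` and `GetFromThrough`), so a caller such as the bloom compactor can hand arbitrary bounds to `TSDBIndex.ForSeries` instead of a power-of-2 `ShardAnnotation`. The import paths follow the file layout in the diff; the package name, the helper `collectOwnedSeries`, the bounds values, and the empty matcher are illustrative assumptions, not code from this PR.

```go
package example

import (
	"context"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

// collectOwnedSeries is a hypothetical helper showing how a compactor-style
// caller can restrict an index scan to the fingerprint range it owns.
func collectOwnedSeries(ctx context.Context, idx *tsdb.TSDBIndex, from, through model.Time) error {
	// FingerprintBounds implements index.FingerprintFilter (see bounds.go in
	// the diff), so arbitrary ranges work, not just power-of-2 shard factors.
	var bounds index.FingerprintFilter = v1.FingerprintBounds{
		Min: 0x1000_0000_0000_0000, // hypothetical start of the owned FP range
		Max: 0x2fff_ffff_ffff_ffff, // hypothetical end of the owned FP range
	}

	return idx.ForSeries(ctx, bounds, from, through,
		func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
			// Only series whose fingerprint matches the filter reach this callback.
			// Per the ForSeries contract, do not capture ls/chks: they are pooled.
		},
		// Placeholder matcher; real callers pass their own label matchers.
		labels.MustNewMatcher(labels.MatchEqual, "", ""),
	)
}
```

The design point the sketch illustrates: by programming `ForSeries`, `Postings`, `Stats`, and `Volume` against `FingerprintFilter` rather than `*ShardAnnotation`, the index no longer assumes shard boundaries are power-of-2 splits of the fingerprint space; `ShardAnnotation` remains one implementation of the interface alongside `FingerprintBounds`.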