Skip to content

Commit

Permalink
perf: Avoid looking up unnecessary TSDB symbols during Volume API (gr…
Browse files Browse the repository at this point in the history
…afana#13960)

Co-authored-by: Trevor Whitney <[email protected]>
  • Loading branch information
2 people authored and pascal-sochacki committed Aug 29, 2024
1 parent 81c3842 commit 8079928
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 17 deletions.
4 changes: 4 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
return lbs.Get("log_stream") == "dispatcher"
}

func (t *testFilter) RequiredLabelNames() []string {
return []string{"log_stream"}
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
// Filterer filters chunks based on the metric.
type Filterer interface {
ShouldFilter(metric labels.Labels) bool
RequiredLabelNames() []string
}
4 changes: 4 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
return metric.Get("foo") == "bazz"
}

func (f fakeChunkFilterer) RequiredLabelNames() []string {
return []string{"foo"}
}

func Test_ChunkFilterer(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}

func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
s := h.head.series.getByID(uint64(ref))

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return 0, index.ChunkStats{}, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.ls...)
if len(by) == 0 {
*lbls = append((*lbls)[:0], s.ls...)
} else {
*lbls = (*lbls)[:0]
for _, l := range s.ls {
if _, ok := by[l.Name]; ok {
*lbls = append(*lbls, l)
}
}
}

queryBounds := newBounds(model.Time(from), model.Time(through))

Expand Down
50 changes: 46 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return fprint, nil
}

func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
offset := id
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
Expand All @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return 0, ChunkStats{}, d.Err()
}

return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
}

func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
Expand Down Expand Up @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}

// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names.
// If `by` is empty, it returns all labels for the series.
func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
if by == nil {
return dec.prepSeries(b, lbls, chks)
}
*lbls = (*lbls)[:0]
if chks != nil {
*chks = (*chks)[:0]
}

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
if _, ok := by[ln]; !ok {
continue
}

lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
Expand All @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
return &d, fprint, nil
}

func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeries(b, lbls, nil)
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
if err != nil {
return 0, ChunkStats{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type IndexReader interface {
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
Expand Down
33 changes: 26 additions & 7 deletions pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,18 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
// TODO(owen-d): use pool
var ls labels.Labels
var filterer chunk.Filterer
by := make(map[string]struct{})
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return err
}
Expand Down Expand Up @@ -362,16 +368,29 @@ func (i *TSDBIndex) Volume(
seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))

aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
var by map[string]struct{}
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}
if !includeAll && (aggregateBySeries || len(targetLabels) > 0) {
by = make(map[string]struct{}, len(labelsToMatch))
for k := range labelsToMatch {
by[k] = struct{}{}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
// If we are aggregating by series, we need to include all labels in the series required for filtering chunks.
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return fmt.Errorf("series volume: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
Expand Down Expand Up @@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) {
End: 10,
Checksum: 3,
}}, shardedRefs)

})

t.Run("Series", func(t *testing.T) {
Expand Down Expand Up @@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) {
require.Nil(t, err)
require.Equal(t, []string{"bar"}, vs)
})

})
}

}

func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
Expand Down Expand Up @@ -743,10 +741,50 @@ func TestTSDBIndex_Volume(t *testing.T) {
Limit: 10,
}, acc.Volumes())
})
// todo(cyriltovena): tests with chunk filterer
})
})
}

func BenchmarkTSDBIndex_Volume(b *testing.B) {
var series []LoadableSeries
for i := 0; i < 1000; i++ {
series = append(series, LoadableSeries{
Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)),
Chunks: []index.ChunkMeta{
{
MinTime: 0,
MaxTime: 10,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
{
MinTime: 10,
MaxTime: 20,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
},
})
}
ctx := context.Background()
from := model.Earliest
through := model.Latest
// Create the TSDB index
tempDir := b.TempDir()
tsdbIndex := BuildIndex(b, tempDir, series)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"))
require.NoError(b, err)
}
}

type filterAll struct{}

func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer {
Expand All @@ -758,3 +796,7 @@ type filterAllFilterer struct{}
func (f *filterAllFilterer) ShouldFilter(_ labels.Labels) bool {
return true
}

func (f *filterAllFilterer) RequiredLabelNames() []string {
return nil
}

0 comments on commit 8079928

Please sign in to comment.