Commit

Merge branch 'main' into tpatterson/structured-metadata-fix
MasslessParticle committed Aug 28, 2024
2 parents cdafa90 + 8afdfd5 commit 89ce577
Showing 18 changed files with 162 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/sources/setup/install/helm/reference.md
@@ -9679,7 +9679,7 @@ false
<td>string</td>
<td>Docker image tag</td>
<td><pre lang="json">
"1.24.3"
"1.27.5"
</pre>
</td>
</tr>
4 changes: 4 additions & 0 deletions pkg/ingester/instance_test.go
@@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
return lbs.Get("log_stream") == "dispatcher"
}

func (t *testFilter) RequiredLabelNames() []string {
return []string{"log_stream"}
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}
13 changes: 7 additions & 6 deletions pkg/iter/entry_iterator.go
@@ -53,16 +53,17 @@ func (i *streamIterator) Close() error {
return nil
}

// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
// Not safe for concurrent use
type HeapIterator interface {
// MergeEntryIterator exposes additional methods that are used only by the Tailer.
// Not safe for concurrent use!
type MergeEntryIterator interface {
EntryIterator

Peek() time.Time
IsEmpty() bool
Push(EntryIterator)
}

// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
// mergeEntryIterator implements the MergeEntryIterator interface.
type mergeEntryIterator struct {
tree *loser.Tree[sortFields, EntryIterator]
stats *stats.Context
@@ -74,11 +75,11 @@ type mergeEntryIterator struct {
errs []error
}

// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
// NewMergeEntryIterator returns a new iterator which uses a loser tree to merge together entries from multiple iterators and deduplicate entries if any.
// The iterator only orders and merges entries across the given `is` iterators; it does not merge entries within an individual iterator.
// This means that using it with a single input iterator produces the same result as that iterator alone.
// If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) MergeEntryIterator {
maxVal, less := treeLess(direction)
result := &mergeEntryIterator{stats: stats.FromContext(ctx)}
result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry)
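For orientation, here is a minimal sketch of how a tailer-style caller might drive this interface. Only Push, IsEmpty, Peek, Next, and At come from the code above; the helper name, the cutoff parameter, and the import paths are illustrative assumptions, not part of this change.

package example

import (
	"time"

	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// drainUntil is a hypothetical helper: it feeds newly opened stream iterators
// into the merge iterator, then drains entries older than the cutoff.
func drainUntil(it iter.MergeEntryIterator, newStreams []iter.EntryIterator, cutoff time.Time) []logproto.Entry {
	// New per-stream iterators can be pushed while the tail is running.
	for _, s := range newStreams {
		it.Push(s)
	}

	var entries []logproto.Entry
	// Peek exposes the next entry's timestamp without consuming it, so the
	// loop can stop before reaching entries newer than the cutoff.
	for !it.IsEmpty() && it.Peek().Before(cutoff) {
		if !it.Next() {
			break
		}
		entries = append(entries, it.At())
	}
	return entries
}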
8 changes: 4 additions & 4 deletions pkg/iter/entry_iterator_test.go
@@ -162,16 +162,16 @@ func TestIteratorMultipleLabels(t *testing.T) {
func TestMergeIteratorPrefetch(t *testing.T) {
t.Parallel()

type tester func(t *testing.T, i HeapIterator)
type tester func(t *testing.T, i MergeEntryIterator)

tests := map[string]tester{
"prefetch on IsEmpty() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on IsEmpty() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.Equal(t, false, i.IsEmpty())
},
"prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on Peek() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.Equal(t, time.Unix(0, 0), i.Peek())
},
"prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on Next() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.True(t, i.Next())
assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.At())
},
2 changes: 1 addition & 1 deletion pkg/querier/tail.go
@@ -38,7 +38,7 @@ const (
// Tailer manages the complete lifecycle of a tail request
type Tailer struct {
// openStreamIterator is for streams already open
openStreamIterator iter.HeapIterator
openStreamIterator iter.MergeEntryIterator
streamMtx sync.Mutex // for synchronizing access to openStreamIterator

currEntry logproto.Entry
4 changes: 2 additions & 2 deletions pkg/storage/batch.go
@@ -421,7 +421,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
iterator, err := it.buildMergeIterator(chunks, from, through, streamPipeline, nextChunk)
if err != nil {
return nil, err
}
@@ -433,7 +433,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
return result, nil
}

func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
func (it *logBatchIterator) buildMergeIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))

for i := range chks {
4 changes: 2 additions & 2 deletions pkg/storage/batch_test.go
@@ -1649,9 +1649,9 @@ func TestBuildHeapIterator(t *testing.T) {
ctx: ctx,
pipeline: log.NewNoopPipeline(),
}
it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
it, err := b.buildMergeIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
t.Errorf("buildMergeIterator error = %v", err)
return
}
req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil)
1 change: 1 addition & 0 deletions pkg/storage/chunk/interface.go
@@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
// Filterer filters chunks based on the metric.
type Filterer interface {
ShouldFilter(metric labels.Labels) bool
RequiredLabelNames() []string
}
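A hedged sketch of what the new method asks of implementations (the namespace-based filterer below and its import path are assumptions, not taken from this commit): every label that ShouldFilter reads must be reported by RequiredLabelNames, because index readers can now prune series labels down to a requested `by` set before the filterer runs.

package example

import "github.com/prometheus/prometheus/model/labels"

// namespaceFilterer is a hypothetical Filterer that drops chunks belonging to
// a "debug" namespace.
type namespaceFilterer struct{}

func (namespaceFilterer) ShouldFilter(metric labels.Labels) bool {
	return metric.Get("namespace") == "debug"
}

// RequiredLabelNames reports every label ShouldFilter reads, so readers that
// materialize only the requested `by` labels keep "namespace" available.
func (namespaceFilterer) RequiredLabelNames() []string {
	return []string{"namespace"}
}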
4 changes: 4 additions & 0 deletions pkg/storage/store_test.go
@@ -866,6 +866,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
return metric.Get("foo") == "bazz"
}

func (f fakeChunkFilterer) RequiredLabelNames() []string {
return []string{"foo"}
}

func Test_ChunkFilterer(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
13 changes: 11 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
@@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}

func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
s := h.head.series.getByID(uint64(ref))

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return 0, index.ChunkStats{}, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.ls...)
if len(by) == 0 {
*lbls = append((*lbls)[:0], s.ls...)
} else {
*lbls = (*lbls)[:0]
for _, l := range s.ls {
if _, ok := by[l.Name]; ok {
*lbls = append(*lbls, l)
}
}
}

queryBounds := newBounds(model.Time(from), model.Time(through))

50 changes: 46 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
@@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return fprint, nil
}

func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
offset := id
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
@@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return 0, ChunkStats{}, d.Err()
}

return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
}

func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
@@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}

// prepSeriesBy returns the series labels and chunks for a series, keeping only the label names selected in `by`.
// If `by` is nil, it returns all labels for the series.
func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
if by == nil {
return dec.prepSeries(b, lbls, chks)
}
*lbls = (*lbls)[:0]
if chks != nil {
*chks = (*chks)[:0]
}

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
if _, ok := by[ln]; !ok {
continue
}

lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
@@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
return &d, fprint, nil
}

func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeries(b, lbls, nil)
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
if err != nil {
return 0, ChunkStats{}, err
}
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
@@ -69,7 +69,7 @@ type IndexReader interface {
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
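A sketch of how a caller might use the new `by` parameter; the helper, its KB accumulation, the "namespace" label, and the v3 import paths are illustrative assumptions. Passing a nil or empty set preserves the old behavior of materializing every series label.

package tsdb // assumes this sits alongside the IndexReader interface above

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

// sumIndexKB is a hypothetical helper that walks a postings list and sums
// chunk sizes while asking the reader to materialize only the "namespace" label.
func sumIndexKB(reader IndexReader, p index.Postings, from, through model.Time) (uint64, error) {
	by := map[string]struct{}{"namespace": {}}

	var (
		ls labels.Labels
		kb uint64
	)
	for p.Next() {
		// ls is reused across iterations; because `by` is non-empty it holds
		// at most the "namespace" label after each call.
		_, stats, err := reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
		if err != nil {
			return 0, err
		}
		kb += stats.KB // KB is assumed to be a field of index.ChunkStats
	}
	return kb, p.Err()
}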
33 changes: 26 additions & 7 deletions pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
@@ -293,12 +293,18 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
// TODO(owen-d): use pool
var ls labels.Labels
var filterer chunk.Filterer
by := make(map[string]struct{})
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return err
}
@@ -362,16 +368,29 @@ func (i *TSDBIndex) Volume(
seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))

aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
var by map[string]struct{}
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}
if !includeAll && (aggregateBySeries || len(targetLabels) > 0) {
by = make(map[string]struct{}, len(labelsToMatch))
for k := range labelsToMatch {
by[k] = struct{}{}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
// When aggregating by series, any label the chunk filterer requires must also be kept in `by`, otherwise ShouldFilter would see a pruned label set.
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
return fmt.Errorf("series volume: %w", err)
}
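The loop above that folds filterer.RequiredLabelNames() into `by` (and its twin in Stats earlier in this file) is what keeps chunk filtering correct: once `by` limits which labels ChunkStats materializes, any label the filterer inspects must be kept as well. A contrived sketch of the failure mode this guards against, reusing the hypothetical namespaceFilterer from the note after pkg/storage/chunk/interface.go:

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	f := namespaceFilterer{} // hypothetical Filterer keyed on the "namespace" label

	full := labels.FromStrings("job", "nginx", "namespace", "debug")
	pruned := labels.FromStrings("job", "nginx") // "namespace" was not in `by`, so it was never materialized

	fmt.Println(f.ShouldFilter(full))   // true: the chunk is correctly filtered out
	fmt.Println(f.ShouldFilter(pruned)) // false: filtering silently stops working
}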
@@ -2,6 +2,7 @@ package tsdb

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
@@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) {
End: 10,
Checksum: 3,
}}, shardedRefs)

})

t.Run("Series", func(t *testing.T) {
@@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) {
require.Nil(t, err)
require.Equal(t, []string{"bar"}, vs)
})

})
}

}

func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
@@ -743,10 +741,50 @@ func TestTSDBIndex_Volume(t *testing.T) {
Limit: 10,
}, acc.Volumes())
})
// todo(cyriltovena): tests with chunk filterer
})
})
}

func BenchmarkTSDBIndex_Volume(b *testing.B) {
var series []LoadableSeries
for i := 0; i < 1000; i++ {
series = append(series, LoadableSeries{
Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)),
Chunks: []index.ChunkMeta{
{
MinTime: 0,
MaxTime: 10,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
{
MinTime: 10,
MaxTime: 20,
Checksum: uint32(i),
KB: 10,
Entries: 10,
},
},
})
}
ctx := context.Background()
from := model.Earliest
through := model.Latest
// Create the TSDB index
tempDir := b.TempDir()
tsdbIndex := BuildIndex(b, tempDir, series)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"))
require.NoError(b, err)
}
}

type filterAll struct{}

func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer {
@@ -758,3 +796,7 @@ type filterAllFilterer struct{}
func (f *filterAllFilterer) ShouldFilter(_ labels.Labels) bool {
return true
}

func (f *filterAllFilterer) RequiredLabelNames() []string {
return nil
}