
Commit

Implement limit in select
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
harry671003 committed Sep 3, 2024
1 parent 36d0fd9 commit a0fbc2e
Showing 4 changed files with 38 additions and 22 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
@@ -4,7 +4,6 @@

 * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
 * [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706
-* [ENHANCEMENT] Storage: Implement limit in mergeGenericQuerier. #14489
 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042

 ## 2.54.1 / 2024-08-27
@@ -76,7 +75,6 @@ This release changes the default for GOGC, the Go runtime control for the trade-
 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974
 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
-* [ENHANCEMENT] Storage: Implement limit in Querier. #14109
 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991
 * [BUGFIX] Scrape: Do no try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled by for example the created time feature. #13987
 * [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941
41 changes: 28 additions & 13 deletions storage/merge.go
@@ -135,13 +135,17 @@ func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
 // Select returns a set of series that matches the given label matchers.
 func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
     seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
+    var limit int
+    if hints != nil {
+        limit = hints.Limit
+    }
     if !q.concurrentSelect {
         for _, querier := range q.queriers {
             // We need to sort for merge to work.
             seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
         }
         return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
-            s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
+            s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
             return s, s.Next()
         }}
     }
@@ -169,7 +173,7 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
         seriesSets = append(seriesSets, r)
     }
     return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
-        s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
+        s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
         return s, s.Next()
     }}
 }
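Both Select paths above now read hints.Limit and thread it into newGenericMergeSeriesSet, so callers can cap the number of merged series through SelectHints. A minimal caller-side sketch; the helper name selectLimited and its wiring are ours, not part of this commit:

package example

import (
    "context"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/storage"
)

// selectLimited asks any storage.Querier for at most limit series matching
// the given matchers; limit 0 leaves the result uncapped.
func selectLimited(ctx context.Context, q storage.Querier, limit int, matchers ...*labels.Matcher) ([]labels.Labels, error) {
    hints := &storage.SelectHints{Limit: limit}
    set := q.Select(ctx, false, hints, matchers...)
    var lbls []labels.Labels
    for set.Next() {
        lbls = append(lbls, set.At().Labels())
    }
    return lbls, set.Err()
}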
@@ -288,12 +292,12 @@ func truncateToLimit(s []string, hints *LabelHints) []string {
 type VerticalSeriesMergeFunc func(...Series) Series

 // NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
-func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
+func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
     genericSets := make([]genericSeriesSet, 0, len(sets))
     for _, s := range sets {
         genericSets = append(genericSets, &genericSeriesSetAdapter{s})
     }
-    return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
+    return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
 }

 // VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
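The exported NewMergeSeriesSet now takes the limit explicitly, so callers outside the package opt in the same way. A hedged sketch of a call site, assuming the input sets are already sorted by labels as the merge requires; limitedMerge is an illustrative name, not something this commit adds:

package example

import "github.com/prometheus/prometheus/storage"

// limitedMerge merges label-sorted SeriesSets, resolving overlapping series
// with ChainedSeriesMerge, and yields at most limit distinct series.
// A limit of 0 keeps the old unlimited behaviour.
func limitedMerge(limit int, sets ...storage.SeriesSet) storage.SeriesSet {
    return storage.NewMergeSeriesSet(sets, limit, storage.ChainedSeriesMerge)
}

Passing 0, as the tsdb compactor does further down, keeps the merge unlimited.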
@@ -303,30 +307,33 @@ func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) Seri
 type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries

 // NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
-func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
+func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
     genericSets := make([]genericSeriesSet, 0, len(sets))
     for _, s := range sets {
         genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
     }
-    return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
+    return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
 }

 // genericMergeSeriesSet implements genericSeriesSet.
 type genericMergeSeriesSet struct {
     currentLabels labels.Labels
     mergeFunc     genericSeriesMergeFunc

-    heap        genericSeriesSetHeap
-    sets        []genericSeriesSet
-    currentSets []genericSeriesSet
+    heap         genericSeriesSetHeap
+    sets         []genericSeriesSet
+    currentSets  []genericSeriesSet
+    seriesLimit  int
+    mergedSeries int // tracks the total number of series merged and returned.
 }

 // newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
 // series returned by the series sets when iterating.
 // Each series set must return its series in labels order, otherwise
 // merged series set will be incorrect.
 // Overlapped situations are merged using provided mergeFunc.
-func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
+// If seriesLimit is set, only limited series are returned.
+func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
     if len(sets) == 1 {
         return sets[0]
     }
@@ -346,13 +353,19 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMe
         }
     }
     return &genericMergeSeriesSet{
-        mergeFunc: mergeFunc,
-        sets:      sets,
-        heap:      h,
+        mergeFunc:   mergeFunc,
+        sets:        sets,
+        heap:        h,
+        seriesLimit: seriesLimit,
     }
 }

 func (c *genericMergeSeriesSet) Next() bool {
+    if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit {
+        // Exit early if seriesLimit is set.
+        return false
+    }
+
     // Run in a loop because the "next" series sets may not be valid anymore.
     // If, for the current label set, all the next series sets come from
     // failed remote storage sources, we want to keep trying with the next label set.
@@ -388,12 +401,14 @@ func (c *genericMergeSeriesSet) Next() bool {

 func (c *genericMergeSeriesSet) At() Labels {
     if len(c.currentSets) == 1 {
+        c.mergedSeries++
         return c.currentSets[0].At()
     }
     series := make([]Labels, 0, len(c.currentSets))
     for _, seriesSet := range c.currentSets {
         series = append(series, seriesSet.At())
     }
+    c.mergedSeries++
     return c.mergeFunc(series...)
 }
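The two hunks above carry the whole limiting mechanism: At counts every series handed out, and Next refuses to advance once that count reaches seriesLimit. The same pattern in miniature, written as a wrapper over the exported interfaces rather than the unexported genericMergeSeriesSet (our sketch, not repo code):

package example

import "github.com/prometheus/prometheus/storage"

// limitedSeriesSet caps any SeriesSet the way genericMergeSeriesSet now does:
// count in At, check the cap in Next. Err and Warnings come from embedding.
type limitedSeriesSet struct {
    storage.SeriesSet
    limit  int // 0 means unlimited
    served int
}

func (s *limitedSeriesSet) Next() bool {
    if s.limit > 0 && s.served >= s.limit {
        return false // cap reached; stop before advancing the inner set
    }
    return s.SeriesSet.Next()
}

func (s *limitedSeriesSet) At() storage.Series {
    s.served++
    return s.SeriesSet.At()
}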
15 changes: 9 additions & 6 deletions storage/merge_test.go
@@ -1345,7 +1345,7 @@ func makeMergeSeriesSet(serieses [][]Series) SeriesSet {
     for i, s := range serieses {
         seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)}
     }
-    return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
+    return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, 0, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
 }

 func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
@@ -1590,28 +1590,31 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
             secondaries: []Querier{
                 &mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil},
             },
-            limit: 1,
+            limit: 2,
             expectedSelectsSeries: []labels.Labels{
                 labels.FromStrings("test", "a"),
                 labels.FromStrings("test", "b"),
-                labels.FromStrings("test", "c"),
-                labels.FromStrings("test", "d"),
             },
-            expectedLabels:   []string{"a"},
+            expectedLabels:   []string{"a", "b"},
             expectedWarnings: annotations.New().Add(warnStorage),
         },
     } {
         var labelHints *LabelHints
+        var selectHints *SelectHints
         if tcase.limit > 0 {
             labelHints = &LabelHints{
                 Limit: tcase.limit,
             }
+            selectHints = &SelectHints{
+                Limit: tcase.limit,
+            }
         }

         t.Run(tcase.name, func(t *testing.T) {
             q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })

             t.Run("Select", func(t *testing.T) {
-                res := q.Select(context.Background(), false, nil)
+                res := q.Select(context.Background(), false, selectHints)
                 var lbls []labels.Labels
                 for res.Next() {
                     lbls = append(lbls, res.At().Labels())
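The updated expectations follow from merge order: judging by the expected series, the primary yields {a, d} and the secondary {b, c}; the merged walk visits a, b, c, d in label order, and Limit: 2 cuts it after a and b, which is also why expectedLabels becomes {"a", "b"}. A toy version of that walk, independent of Prometheus types (names and inputs are ours):

package example

import "sort"

// mergeWithCap concatenates duplicate-free sorted lists, re-sorts, and keeps
// at most limit values (0 = all): equivalent, for these inputs, to the
// heap-based label-ordered merge the real code performs.
func mergeWithCap(limit int, lists ...[]string) []string {
    var all []string
    for _, l := range lists {
        all = append(all, l...)
    }
    sort.Strings(all)
    if limit > 0 && len(all) > limit {
        all = all[:limit]
    }
    return all
}

// mergeWithCap(2, []string{"a", "d"}, []string{"b", "c"}) returns ["a", "b"].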
2 changes: 1 addition & 1 deletion tsdb/compact.go
@@ -831,7 +831,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
     if len(sets) > 1 {
         // Merge series using specified chunk series merger.
         // The default one is the compacting series merger.
-        set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
+        set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc)
     }

     // Iterate over all sorted chunk series.
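Compaction opts out by passing 0: the guard added to Next never fires when the limit is zero, so block population still merges every series. A tiny runnable check of that convention (our snippet, not repo code):

package main

import "fmt"

// capReached mirrors the guard added to genericMergeSeriesSet.Next().
func capReached(limit, served int) bool {
    return limit > 0 && served >= limit
}

func main() {
    fmt.Println(capReached(0, 1000000)) // false: 0 disables the cap
    fmt.Println(capReached(2, 2))       // true: cap hit after two series
}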
