diff --git a/pkg/logql/accumulator.go b/pkg/logql/accumulator.go new file mode 100644 index 0000000000000..9e9784cb037ef --- /dev/null +++ b/pkg/logql/accumulator.go @@ -0,0 +1,379 @@ +package logql + +import ( + "container/heap" + "context" + "fmt" + "sort" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/logqlmodel/metadata" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" + "github.com/grafana/loki/pkg/util/math" +) + +// NewBufferedAccumulator returns an accumulator which aggregates all query +// results in a slice. This is useful for metric queries, which are generally +// small payloads and the memory overhead for buffering is negligible. +func NewBufferedAccumulator(n int) *BufferedAccumulator { + return &BufferedAccumulator{ + results: make([]logqlmodel.Result, n), + } +} + +type BufferedAccumulator struct { + results []logqlmodel.Result +} + +func (a *BufferedAccumulator) Accumulate(_ context.Context, acc logqlmodel.Result, i int) error { + a.results[i] = acc + return nil +} + +func (a *BufferedAccumulator) Result() []logqlmodel.Result { + return a.results +} + +type QuantileSketchAccumulator struct { + matrix ProbabilisticQuantileMatrix +} + +// newQuantileSketchAccumulator returns an accumulator for sharded +// probabilistic quantile queries that merges results as they come in. +func newQuantileSketchAccumulator() *QuantileSketchAccumulator { + return &QuantileSketchAccumulator{} +} + +func (a *QuantileSketchAccumulator) Accumulate(_ context.Context, res logqlmodel.Result, _ int) error { + if res.Data.Type() != QuantileSketchMatrixType { + return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), QuantileSketchMatrixType) + } + data, ok := res.Data.(ProbabilisticQuantileMatrix) + if !ok { + return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) + } + if a.matrix == nil { + a.matrix = data + return nil + } + + var err error + a.matrix, err = a.matrix.Merge(data) + return err +} + +func (a *QuantileSketchAccumulator) Result() []logqlmodel.Result { + return []logqlmodel.Result{{Data: a.matrix}} +} + +// heap impl for keeping only the top n results across m streams +// importantly, AccumulatedStreams is _bounded_, so it will only +// store the top `limit` results across all streams. +// To implement this, we use a min-heap when looking +// for the max values (logproto.FORWARD) +// and vice versa for logproto.BACKWARD. +// This allows us to easily find the 'worst' value +// and replace it with a better one. +// Once we've fully processed all log lines, +// we return the heap in opposite order and then reverse it +// to get the correct order. +// Heap implements container/heap.Interface +// solely to use heap.Interface as a library. +// It is not intended for the heap pkg functions +// to otherwise call this type. +type AccumulatedStreams struct { + count, limit int + labelmap map[string]int + streams []*logproto.Stream + order logproto.Direction + + stats stats.Result // for accumulating statistics from downstream requests + headers map[string][]string // for accumulating headers from downstream requests +} + +// NewStreamAccumulator returns an accumulator for limited log queries. +// Log queries, sharded thousands of times and each returning +// results, can be _considerably_ larger. 
In this case, we eagerly
+// accumulate the results into a stream accumulator, discarding values
+// over the limit to keep memory pressure down while other subqueries
+// are executing.
+func NewStreamAccumulator(params Params) *AccumulatedStreams {
+	// the stream accumulator stores a heap with reversed order
+	// from the results we expect, so we need to reverse the direction
+	order := logproto.FORWARD
+	if params.Direction() == logproto.FORWARD {
+		order = logproto.BACKWARD
+	}
+
+	return &AccumulatedStreams{
+		labelmap: make(map[string]int),
+		order:    order,
+		limit:    int(params.Limit()),
+
+		headers: make(map[string][]string),
+	}
+}
+
+// returns the top priority
+func (acc *AccumulatedStreams) top() (time.Time, bool) {
+	if len(acc.streams) > 0 && len(acc.streams[0].Entries) > 0 {
+		return acc.streams[0].Entries[len(acc.streams[0].Entries)-1].Timestamp, true
+	}
+	return time.Time{}, false
+}
+
+func (acc *AccumulatedStreams) Find(labels string) (int, bool) {
+	i, ok := acc.labelmap[labels]
+	return i, ok
+}
+
+// number of streams
+func (acc *AccumulatedStreams) Len() int { return len(acc.streams) }
+
+func (acc *AccumulatedStreams) Swap(i, j int) {
+	// for i=0, j=1
+
+	// {'a': 0, 'b': 1}
+	// [a, b]
+	acc.streams[i], acc.streams[j] = acc.streams[j], acc.streams[i]
+	// {'a': 0, 'b': 1}
+	// [b, a]
+	acc.labelmap[acc.streams[i].Labels] = i
+	acc.labelmap[acc.streams[j].Labels] = j
+	// {'a': 1, 'b': 0}
+	// [b, a]
+}
+
+// first order by timestamp, then by labels
+func (acc *AccumulatedStreams) Less(i, j int) bool {
+	// order by the 'oldest' entry in the stream
+	if a, b := acc.streams[i].Entries[len(acc.streams[i].Entries)-1].Timestamp, acc.streams[j].Entries[len(acc.streams[j].Entries)-1].Timestamp; !a.Equal(b) {
+		return acc.less(a, b)
+	}
+	return acc.streams[i].Labels <= acc.streams[j].Labels
+}
+
+func (acc *AccumulatedStreams) less(a, b time.Time) bool {
+	// use after for stable sort
+	if acc.order == logproto.FORWARD {
+		return !a.After(b)
+	}
+	return !b.After(a)
+}
+
+func (acc *AccumulatedStreams) Push(x any) {
+	s := x.(*logproto.Stream)
+	if len(s.Entries) == 0 {
+		return
+	}
+
+	if room := acc.limit - acc.count; room >= len(s.Entries) {
+		if i, ok := acc.Find(s.Labels); ok {
+			// stream already exists, append entries
+
+			// these are already guaranteed to be sorted
+			// Reasoning: we shard subrequests so each stream exists on only one
+			// shard. Therefore, the only time a stream should already exist
+			// is in successive splits, which are already guaranteed to be ordered
+			// and we can just append.
+			acc.appendTo(acc.streams[i], s)
+
+			return
+		}
+
+		// new stream
+		acc.addStream(s)
+		return
+	}
+
+	// there's not enough room for all the entries,
+	// so we need to push them one at a time, evicting the worst entries as needed
+	acc.push(s)
+}
+
+// there's not enough room for all the entries.
+// since we store them in a reverse heap relative to what we _want_
+// (i.e. the max value for FORWARD, the min value for BACKWARD),
+// we test if the new entry is better than the worst entry,
+// swapping them if so.
+func (acc *AccumulatedStreams) push(s *logproto.Stream) {
+	worst, ok := acc.top()
+	room := math.Min(acc.limit-acc.count, len(s.Entries))
+
+	if !ok {
+		if room == 0 {
+			// special case: limit must be zero since there's no room and no worst entry
+			return
+		}
+		s.Entries = s.Entries[:room]
+		// special case: there are no entries in the heap. 
Push entries up to the limit
+		acc.addStream(s)
+		return
+	}
+
+	// since entries are sorted by timestamp from best -> worst,
+	// we can discard the entire stream if the incoming best entry
+	// is worse than the worst entry in the heap.
+	cutoff := sort.Search(len(s.Entries), func(i int) bool {
+		// TODO(refactor label comparison -- should be in another fn)
+		if worst.Equal(s.Entries[i].Timestamp) {
+			return acc.streams[0].Labels < s.Labels
+		}
+		return acc.less(s.Entries[i].Timestamp, worst)
+	})
+	s.Entries = s.Entries[:cutoff]
+
+	for i := 0; i < len(s.Entries) && acc.less(worst, s.Entries[i].Timestamp); i++ {
+
+		// push one entry at a time
+		room = acc.limit - acc.count
+		// pop if there's no room to make the heap small enough for an append;
+		// in the short path of Push() we know that there's room for at least one entry
+		if room == 0 {
+			acc.Pop()
+		}
+
+		cpy := *s
+		cpy.Entries = []logproto.Entry{s.Entries[i]}
+		acc.Push(&cpy)
+
+		// update worst
+		worst, _ = acc.top()
+	}
+}
+
+func (acc *AccumulatedStreams) addStream(s *logproto.Stream) {
+	// ensure entries conform to order we expect
+	// TODO(owen-d): remove? should be unnecessary since we insert in appropriate order
+	// but it's nice to have the safeguard
+	sort.Slice(s.Entries, func(i, j int) bool {
+		return acc.less(s.Entries[j].Timestamp, s.Entries[i].Timestamp)
+	})
+
+	acc.streams = append(acc.streams, s)
+	i := len(acc.streams) - 1
+	acc.labelmap[s.Labels] = i
+	acc.count += len(s.Entries)
+	heap.Fix(acc, i)
+}
+
+// dst must already exist in acc
+func (acc *AccumulatedStreams) appendTo(dst, src *logproto.Stream) {
+	// these are already guaranteed to be sorted
+	// Reasoning: we shard subrequests so each stream exists on only one
+	// shard. Therefore, the only time a stream should already exist
+	// is in successive splits, which are already guaranteed to be ordered
+	// and we can just append.
+
+	var needsSort bool
+	for _, e := range src.Entries {
+		// sort if order has broken
+		if len(dst.Entries) > 0 && acc.less(dst.Entries[len(dst.Entries)-1].Timestamp, e.Timestamp) {
+			needsSort = true
+		}
+		dst.Entries = append(dst.Entries, e)
+	}
+
+	if needsSort {
+		sort.Slice(dst.Entries, func(i, j int) bool {
+			// store in reverse order so we can more reliably insert without sorting and pop from end
+			return acc.less(dst.Entries[j].Timestamp, dst.Entries[i].Timestamp)
+		})
+	}
+
+	acc.count += len(src.Entries)
+	heap.Fix(acc, acc.labelmap[dst.Labels])
+
+}
+
+// Pop returns a stream with one entry. It pops the first entry of the first stream
+func (acc *AccumulatedStreams) Pop() any {
+	n := acc.Len()
+	if n == 0 {
+		return nil
+	}
+
+	stream := acc.streams[0]
+	cpy := *stream
+	cpy.Entries = []logproto.Entry{cpy.Entries[len(stream.Entries)-1]}
+	stream.Entries = stream.Entries[:len(stream.Entries)-1]
+
+	acc.count--
+
+	if len(stream.Entries) == 0 {
+		// remove stream
+		acc.Swap(0, n-1)
+		acc.streams[n-1] = nil // avoid leaking reference
+		delete(acc.labelmap, stream.Labels)
+		acc.streams = acc.streams[:n-1]
+
+	}
+
+	if acc.Len() > 0 {
+		heap.Fix(acc, 0)
+	}
+
+	return &cpy
+}
+
+// Note: can only be called once as it will alter stream ordering. 
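+// It sorts the accumulated streams by label, re-sorts each stream's entries
+// according to the query direction, and returns a single Result carrying the
+// statistics and headers merged from the downstream responses.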
+func (acc *AccumulatedStreams) Result() []logqlmodel.Result { + // sort streams by label + sort.Slice(acc.streams, func(i, j int) bool { + return acc.streams[i].Labels < acc.streams[j].Labels + }) + + streams := make(logqlmodel.Streams, 0, len(acc.streams)) + + for _, s := range acc.streams { + // sort entries by timestamp, inversely based on direction + sort.Slice(s.Entries, func(i, j int) bool { + return acc.less(s.Entries[j].Timestamp, s.Entries[i].Timestamp) + }) + streams = append(streams, *s) + } + + res := logqlmodel.Result{ + // stats & headers are already aggregated in the context + Data: streams, + Statistics: acc.stats, + Headers: make([]*definitions.PrometheusResponseHeader, 0, len(acc.headers)), + } + + for name, vals := range acc.headers { + res.Headers = append( + res.Headers, + &definitions.PrometheusResponseHeader{ + Name: name, + Values: vals, + }, + ) + } + + return []logqlmodel.Result{res} +} + +func (acc *AccumulatedStreams) Accumulate(_ context.Context, x logqlmodel.Result, _ int) error { + // TODO(owen-d/ewelch): Shard counts should be set by the querier + // so we don't have to do it in tricky ways in multiple places. + // See pkg/logql/downstream.go:DownstreamEvaluator.Downstream + // for another example. + if x.Statistics.Summary.Shards == 0 { + x.Statistics.Summary.Shards = 1 + } + acc.stats.Merge(x.Statistics) + metadata.ExtendHeaders(acc.headers, x.Headers) + + switch got := x.Data.(type) { + case logqlmodel.Streams: + for i := range got { + acc.Push(&got[i]) + } + default: + return fmt.Errorf("unexpected response type during response result accumulation. Got (%T), wanted %s", got, logqlmodel.ValueTypeStreams) + } + return nil +} diff --git a/pkg/logql/accumulator_test.go b/pkg/logql/accumulator_test.go new file mode 100644 index 0000000000000..d827e3ea02e71 --- /dev/null +++ b/pkg/logql/accumulator_test.go @@ -0,0 +1,273 @@ +package logql + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/sketch" + "github.com/grafana/loki/pkg/logqlmodel" +) + +func TestAccumulatedStreams(t *testing.T) { + lim := 30 + nStreams := 10 + start, end := 0, 10 + // for a logproto.BACKWARD query, we use a min heap based on FORWARD + // to store the _earliest_ timestamp of the _latest_ entries, up to `limit` + xs := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, nStreams, logproto.BACKWARD) + acc := NewStreamAccumulator(LiteralParams{ + direction: logproto.BACKWARD, + limit: uint32(lim), + }) + for _, x := range xs { + acc.Push(x) + } + + for i := 0; i < lim; i++ { + got := acc.Pop().(*logproto.Stream) + require.Equal(t, fmt.Sprintf(`{n="%d"}`, i%nStreams), got.Labels) + exp := (nStreams*(end-start) - lim + i) / nStreams + require.Equal(t, time.Unix(int64(exp), 0), got.Entries[0].Timestamp) + } + +} + +func TestDownstreamAccumulatorSimple(t *testing.T) { + lim := 30 + start, end := 0, 10 + direction := logproto.BACKWARD + + streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, 10, direction) + x := make(logqlmodel.Streams, 0, len(streams)) + for _, s := range streams { + x = append(x, *s) + } + // dummy params. 
Only need to populate direction & limit + params, err := NewLiteralParams( + `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, direction, uint32(lim), nil, + ) + require.NoError(t, err) + + acc := NewStreamAccumulator(params) + result := logqlmodel.Result{ + Data: x, + } + + require.Nil(t, acc.Accumulate(context.Background(), result, 0)) + + res := acc.Result()[0] + got, ok := res.Data.(logqlmodel.Streams) + require.Equal(t, true, ok) + require.Equal(t, 10, len(got), "correct number of streams") + + // each stream should have the top 3 entries + for i := 0; i < 10; i++ { + require.Equal(t, 3, len(got[i].Entries), "correct number of entries in stream") + for j := 0; j < 3; j++ { + require.Equal(t, time.Unix(int64(9-j), 0), got[i].Entries[j].Timestamp, "correct timestamp") + } + } +} + +// TestDownstreamAccumulatorMultiMerge simulates merging multiple +// sub-results from different queries. +func TestDownstreamAccumulatorMultiMerge(t *testing.T) { + for _, direction := range []logproto.Direction{logproto.BACKWARD, logproto.FORWARD} { + t.Run(direction.String(), func(t *testing.T) { + nQueries := 10 + delta := 10 // 10 entries per stream, 1s apart + streamsPerQuery := 10 + lim := 30 + + payloads := make([]logqlmodel.Streams, 0, nQueries) + for i := 0; i < nQueries; i++ { + start := i * delta + end := start + delta + streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, streamsPerQuery, direction) + var res logqlmodel.Streams + for i := range streams { + res = append(res, *streams[i]) + } + payloads = append(payloads, res) + + } + + // queries are always dispatched in the correct order. + // oldest time ranges first in the case of logproto.FORWARD + // and newest time ranges first in the case of logproto.BACKWARD + if direction == logproto.BACKWARD { + for i, j := 0, len(payloads)-1; i < j; i, j = i+1, j-1 { + payloads[i], payloads[j] = payloads[j], payloads[i] + } + } + + // dummy params. Only need to populate direction & limit + params, err := NewLiteralParams( + `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, direction, uint32(lim), nil, + ) + require.NoError(t, err) + + acc := NewStreamAccumulator(params) + for i := 0; i < nQueries; i++ { + err := acc.Accumulate(context.Background(), logqlmodel.Result{ + Data: payloads[i], + }, i) + require.Nil(t, err) + } + + got, ok := acc.Result()[0].Data.(logqlmodel.Streams) + require.Equal(t, true, ok) + require.Equal(t, int64(nQueries), acc.Result()[0].Statistics.Summary.Shards) + + // each stream should have the top 3 entries + for i := 0; i < streamsPerQuery; i++ { + stream := got[i] + require.Equal(t, fmt.Sprintf(`{n="%d"}`, i), stream.Labels, "correct labels") + ln := lim / streamsPerQuery + require.Equal(t, ln, len(stream.Entries), "correct number of entries in stream") + switch direction { + case logproto.BACKWARD: + for i := 0; i < ln; i++ { + offset := delta*nQueries - 1 - i + require.Equal(t, time.Unix(int64(offset), 0), stream.Entries[i].Timestamp, "correct timestamp") + } + default: + for i := 0; i < ln; i++ { + offset := i + require.Equal(t, time.Unix(int64(offset), 0), stream.Entries[i].Timestamp, "correct timestamp") + } + } + } + }) + } +} + +func BenchmarkAccumulator(b *testing.B) { + + // dummy params. 
Only need to populate direction & limit + lim := 30 + params, err := NewLiteralParams( + `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, logproto.BACKWARD, uint32(lim), nil, + ) + require.NoError(b, err) + + for acc, tc := range map[string]struct { + results []logqlmodel.Result + newAcc func(Params, []logqlmodel.Result) Accumulator + params Params + }{ + "streams": { + newStreamResults(), + func(p Params, _ []logqlmodel.Result) Accumulator { + return NewStreamAccumulator(p) + }, + params, + }, + "quantile sketches": { + newQuantileSketchResults(), + func(p Params, _ []logqlmodel.Result) Accumulator { + return newQuantileSketchAccumulator() + }, + params, + }, + } { + b.Run(acc, func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + + acc := tc.newAcc(params, tc.results) + for i, r := range tc.results { + err := acc.Accumulate(context.Background(), r, i) + require.Nil(b, err) + } + + acc.Result() + } + }) + } +} + +func newStreamResults() []logqlmodel.Result { + nQueries := 50 + delta := 100 // 10 entries per stream, 1s apart + streamsPerQuery := 50 + + results := make([]logqlmodel.Result, nQueries) + for i := 0; i < nQueries; i++ { + start := i * delta + end := start + delta + streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, streamsPerQuery, logproto.BACKWARD) + var res logqlmodel.Streams + for i := range streams { + res = append(res, *streams[i]) + } + results[i] = logqlmodel.Result{Data: res} + + } + + return results +} + +func newQuantileSketchResults() []logqlmodel.Result { + results := make([]logqlmodel.Result, 100) + + for r := range results { + vectors := make([]ProbabilisticQuantileVector, 10) + for i := range vectors { + vectors[i] = make(ProbabilisticQuantileVector, 10) + for j := range vectors[i] { + vectors[i][j] = ProbabilisticQuantileSample{ + T: int64(i), + F: newRandomSketch(), + Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}}, + } + } + } + results[r] = logqlmodel.Result{Data: ProbabilisticQuantileMatrix(vectors)} + } + + return results +} + +func newStreamWithDirection(start, end time.Time, delta time.Duration, ls string, direction logproto.Direction) *logproto.Stream { + s := &logproto.Stream{ + Labels: ls, + } + for t := start; t.Before(end); t = t.Add(delta) { + s.Entries = append(s.Entries, logproto.Entry{ + Timestamp: t, + Line: fmt.Sprintf("%d", t.Unix()), + }) + } + if direction == logproto.BACKWARD { + // simulate data coming in reverse order (logproto.BACKWARD) + for i, j := 0, len(s.Entries)-1; i < j; i, j = i+1, j-1 { + s.Entries[i], s.Entries[j] = s.Entries[j], s.Entries[i] + } + } + return s +} + +func newStreams(start, end time.Time, delta time.Duration, n int, direction logproto.Direction) (res []*logproto.Stream) { + for i := 0; i < n; i++ { + res = append(res, newStreamWithDirection(start, end, delta, fmt.Sprintf(`{n="%d"}`, i), direction)) + } + return res +} + +func newRandomSketch() sketch.QuantileSketch { + r := rand.New(rand.NewSource(42)) + s := sketch.NewDDSketch() + for i := 0; i < 1000; i++ { + _ = s.Add(r.Float64()) + } + return s +} diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 76594dc040c22..e29b47054fea6 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -244,7 +244,13 @@ type Resp struct { // Downstreamer is an interface for deferring responsibility for query execution. // It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. 
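+// Results are handed to the supplied Accumulator as they arrive, which lets
+// limited log queries discard entries over the limit while other subqueries
+// are still executing.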
type Downstreamer interface { - Downstream(context.Context, []DownstreamQuery) ([]logqlmodel.Result, error) + Downstream(context.Context, []DownstreamQuery, Accumulator) ([]logqlmodel.Result, error) +} + +// Accumulator is an interface for accumulating query results. +type Accumulator interface { + Accumulate(context.Context, logqlmodel.Result, int) error + Result() []logqlmodel.Result } // DownstreamEvaluator is an evaluator which handles shard aware AST nodes @@ -254,8 +260,8 @@ type DownstreamEvaluator struct { } // Downstream runs queries and collects stats from the embedded Downstreamer -func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) { - results, err := ev.Downstreamer.Downstream(ctx, queries) +func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []DownstreamQuery, acc Accumulator) ([]logqlmodel.Result, error) { + results, err := ev.Downstreamer.Downstream(ctx, queries, acc) if err != nil { return nil, err } @@ -314,12 +320,13 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( if e.shard != nil { shards = append(shards, *e.shard) } + acc := NewBufferedAccumulator(1) results, err := ev.Downstream(ctx, []DownstreamQuery{{ Params: ParamsWithShardsOverride{ Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.SampleExpr}, ShardsOverride: Shards(shards).Encode(), }, - }}) + }}, acc) if err != nil { return nil, err } @@ -339,7 +346,8 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( cur = cur.next } - results, err := ev.Downstream(ctx, queries) + acc := NewBufferedAccumulator(len(queries)) + results, err := ev.Downstream(ctx, queries, acc) if err != nil { return nil, err } @@ -379,7 +387,8 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( } } - results, err := ev.Downstream(ctx, queries) + acc := newQuantileSketchAccumulator() + results, err := ev.Downstream(ctx, queries, acc) if err != nil { return nil, err } @@ -413,12 +422,13 @@ func (ev *DownstreamEvaluator) NewIterator( if e.shard != nil { shards = append(shards, *e.shard) } + acc := NewStreamAccumulator(params) results, err := ev.Downstream(ctx, []DownstreamQuery{{ Params: ParamsWithShardsOverride{ Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.LogSelectorExpr}, ShardsOverride: shards.Encode(), }, - }}) + }}, acc) if err != nil { return nil, err } @@ -438,7 +448,8 @@ func (ev *DownstreamEvaluator) NewIterator( cur = cur.next } - results, err := ev.Downstream(ctx, queries) + acc := NewStreamAccumulator(params) + results, err := ev.Downstream(ctx, queries, acc) if err != nil { return nil, err } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 82442e09bf60d..72b8429e11bf9 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -215,7 +215,7 @@ type MockDownstreamer struct { func (m MockDownstreamer) Downstreamer(_ context.Context) Downstreamer { return m } -func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) { +func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery, _ Accumulator) ([]logqlmodel.Result, error) { results := make([]logqlmodel.Result, 0, len(queries)) for _, query := range queries { res, err := m.Query(query.Params).Exec(ctx) diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index d8514e8a4ee75..31f8997ed767e 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -1,12 +1,9 
@@ package queryrange import ( - "container/heap" "context" "fmt" "reflect" - "sort" - "time" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" @@ -16,14 +13,10 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" - "github.com/grafana/loki/pkg/logqlmodel/metadata" - "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/plan" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" "github.com/grafana/loki/pkg/util/spanlogger" ) @@ -103,8 +96,8 @@ type instance struct { handler queryrangebase.Handler } -func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery) ([]logqlmodel.Result, error) { - return in.For(ctx, queries, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) { +func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery, acc logql.Accumulator) ([]logqlmodel.Result, error) { + return in.For(ctx, queries, acc, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) { req := ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String()) sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance") defer sp.Finish() @@ -124,6 +117,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue func (in instance) For( ctx context.Context, queries []logql.DownstreamQuery, + acc logql.Accumulator, fn func(logql.DownstreamQuery) (logqlmodel.Result, error), ) ([]logqlmodel.Result, error) { ctx, cancel := context.WithCancel(ctx) @@ -159,12 +153,11 @@ func (in instance) For( close(ch) }() - acc := newDownstreamAccumulator(queries[0].Params, len(queries)) for resp := range ch { if resp.Err != nil { return nil, resp.Err } - if err := acc.Accumulate(ctx, resp.I, resp.Res); err != nil { + if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil { return nil, err } } @@ -210,407 +203,3 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value { } return xs } - -// downstreamAccumulator is one of three variants: -// a logsAccumulator, a bufferedAccumulator, or a quantileSketchAccumulator. -// Which variant is detected on the first call to Accumulate. -// Metric queries, which are generally small payloads, are buffered -// since the memory overhead is negligible. -// Log queries, sharded thousands of times and each returning -// results, can be _considerably_ larger. In this case, we eagerly -// accumulate the results into a logsAccumulator, discarding values -// over the limit to keep memory pressure down while other subqueries -// are executing. -// Sharded probabilistic quantile query results are merged as they come in. 
-type downstreamAccumulator struct { - acc resultAccumulator - params logql.Params - n int // number of queries, used to build slice size -} - -type resultAccumulator interface { - Accumulate(logqlmodel.Result, int) error - Result() []logqlmodel.Result -} - -func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccumulator { - return &downstreamAccumulator{params: params, n: nQueries} -} - -func (a *downstreamAccumulator) build(acc logqlmodel.Result) { - switch acc.Data.Type() { - case logqlmodel.ValueTypeStreams: - - // the stream accumulator stores a heap with reversed order - // from the results we expect, so we need to reverse the direction - direction := logproto.FORWARD - if a.params.Direction() == logproto.FORWARD { - direction = logproto.BACKWARD - } - - a.acc = newStreamAccumulator(direction, int(a.params.Limit())) - case logql.QuantileSketchMatrixType: - a.acc = newQuantileSketchAccumulator() - default: - a.acc = &bufferedAccumulator{ - results: make([]logqlmodel.Result, a.n), - } - - } -} - -func (a *downstreamAccumulator) Accumulate(_ context.Context, index int, acc logqlmodel.Result) error { - // on first pass, determine which accumulator to use - if a.acc == nil { - a.build(acc) - } - - return a.acc.Accumulate(acc, index) -} - -func (a *downstreamAccumulator) Result() []logqlmodel.Result { - if a.acc == nil { - return nil - } - return a.acc.Result() - -} - -type bufferedAccumulator struct { - results []logqlmodel.Result -} - -func (a *bufferedAccumulator) Accumulate(acc logqlmodel.Result, i int) error { - a.results[i] = acc - return nil -} - -func (a *bufferedAccumulator) Result() []logqlmodel.Result { - return a.results -} - -type quantileSketchAccumulator struct { - matrix logql.ProbabilisticQuantileMatrix -} - -func newQuantileSketchAccumulator() *quantileSketchAccumulator { - return &quantileSketchAccumulator{} -} - -func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error { - if res.Data.Type() != logql.QuantileSketchMatrixType { - return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType) - } - data, ok := res.Data.(logql.ProbabilisticQuantileMatrix) - if !ok { - return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) - } - if a.matrix == nil { - a.matrix = data - return nil - } - - var err error - a.matrix, err = a.matrix.Merge(data) - return err -} - -func (a *quantileSketchAccumulator) Result() []logqlmodel.Result { - return []logqlmodel.Result{{Data: a.matrix}} -} - -// heap impl for keeping only the top n results across m streams -// importantly, accumulatedStreams is _bounded_, so it will only -// store the top `limit` results across all streams. -// To implement this, we use a min-heap when looking -// for the max values (logproto.FORWARD) -// and vice versa for logproto.BACKWARD. -// This allows us to easily find the 'worst' value -// and replace it with a better one. -// Once we've fully processed all log lines, -// we return the heap in opposite order and then reverse it -// to get the correct order. -// Heap implements container/heap.Interface -// solely to use heap.Interface as a library. -// It is not intended for the heap pkg functions -// to otherwise call this type. 
-type accumulatedStreams struct { - count, limit int - labelmap map[string]int - streams []*logproto.Stream - order logproto.Direction - - stats stats.Result // for accumulating statistics from downstream requests - headers map[string][]string // for accumulating headers from downstream requests -} - -func newStreamAccumulator(order logproto.Direction, limit int) *accumulatedStreams { - return &accumulatedStreams{ - labelmap: make(map[string]int), - order: order, - limit: limit, - - headers: make(map[string][]string), - } -} - -// returns the top priority -func (acc *accumulatedStreams) top() (time.Time, bool) { - if len(acc.streams) > 0 && len(acc.streams[0].Entries) > 0 { - return acc.streams[0].Entries[len(acc.streams[0].Entries)-1].Timestamp, true - } - return time.Time{}, false -} - -func (acc *accumulatedStreams) Find(labels string) (int, bool) { - i, ok := acc.labelmap[labels] - return i, ok -} - -// number of streams -func (acc *accumulatedStreams) Len() int { return len(acc.streams) } - -func (acc *accumulatedStreams) Swap(i, j int) { - // for i=0, j=1 - - // {'a': 0, 'b': 1} - // [a, b] - acc.streams[i], acc.streams[j] = acc.streams[j], acc.streams[i] - // {'a': 0, 'b': 1} - // [b, a] - acc.labelmap[acc.streams[i].Labels] = i - acc.labelmap[acc.streams[j].Labels] = j - // {'a': 1, 'b': 0} - // [b, a] -} - -// first order by timestamp, then by labels -func (acc *accumulatedStreams) Less(i, j int) bool { - // order by the 'oldest' entry in the stream - if a, b := acc.streams[i].Entries[len(acc.streams[i].Entries)-1].Timestamp, acc.streams[j].Entries[len(acc.streams[j].Entries)-1].Timestamp; !a.Equal(b) { - return acc.less(a, b) - } - return acc.streams[i].Labels <= acc.streams[j].Labels -} - -func (acc *accumulatedStreams) less(a, b time.Time) bool { - // use after for stable sort - if acc.order == logproto.FORWARD { - return !a.After(b) - } - return !b.After(a) -} - -func (acc *accumulatedStreams) Push(x any) { - s := x.(*logproto.Stream) - if len(s.Entries) == 0 { - return - } - - if room := acc.limit - acc.count; room >= len(s.Entries) { - if i, ok := acc.Find(s.Labels); ok { - // stream already exists, append entries - - // these are already guaranteed to be sorted - // Reasoning: we shard subrequests so each stream exists on only one - // shard. Therefore, the only time a stream should already exist - // is in successive splits, which are already guaranteed to be ordered - // and we can just append. - acc.appendTo(acc.streams[i], s) - - return - } - - // new stream - acc.addStream(s) - return - } - - // there's not enough room for all the entries, - // so we need to - acc.push(s) -} - -// there's not enough room for all the entries. -// since we store them in a reverse heap relative to what we _want_ -// (i.e. the max value for FORWARD, the min value for BACKWARD), -// we test if the new entry is better than the worst entry, -// swapping them if so. -func (acc *accumulatedStreams) push(s *logproto.Stream) { - worst, ok := acc.top() - room := min(acc.limit-acc.count, len(s.Entries)) - - if !ok { - if room == 0 { - // special case: limit must be zero since there's no room and no worst entry - return - } - s.Entries = s.Entries[:room] - // special case: there are no entries in the heap. Push entries up to the limit - acc.addStream(s) - return - } - - // since entries are sorted by timestamp from best -> worst, - // we can discard the entire stream if the incoming best entry - // is worse than the worst entry in the heap. 
- cutoff := sort.Search(len(s.Entries), func(i int) bool { - // TODO(refactor label comparison -- should be in another fn) - if worst.Equal(s.Entries[i].Timestamp) { - return acc.streams[0].Labels < s.Labels - } - return acc.less(s.Entries[i].Timestamp, worst) - }) - s.Entries = s.Entries[:cutoff] - - for i := 0; i < len(s.Entries) && acc.less(worst, s.Entries[i].Timestamp); i++ { - - // push one entry at a time - room = acc.limit - acc.count - // pop if there's no room to make the heap small enough for an append; - // in the short path of Push() we know that there's room for at least one entry - if room == 0 { - acc.Pop() - } - - cpy := *s - cpy.Entries = []logproto.Entry{s.Entries[i]} - acc.Push(&cpy) - - // update worst - worst, _ = acc.top() - } -} - -func (acc *accumulatedStreams) addStream(s *logproto.Stream) { - // ensure entries conform to order we expect - // TODO(owen-d): remove? should be unnecessary since we insert in appropriate order - // but it's nice to have the safeguard - sort.Slice(s.Entries, func(i, j int) bool { - return acc.less(s.Entries[j].Timestamp, s.Entries[i].Timestamp) - }) - - acc.streams = append(acc.streams, s) - i := len(acc.streams) - 1 - acc.labelmap[s.Labels] = i - acc.count += len(s.Entries) - heap.Fix(acc, i) -} - -// dst must already exist in acc -func (acc *accumulatedStreams) appendTo(dst, src *logproto.Stream) { - // these are already guaranteed to be sorted - // Reasoning: we shard subrequests so each stream exists on only one - // shard. Therefore, the only time a stream should already exist - // is in successive splits, which are already guaranteed to be ordered - // and we can just append. - - var needsSort bool - for _, e := range src.Entries { - // sort if order has broken - if len(dst.Entries) > 0 && acc.less(dst.Entries[len(dst.Entries)-1].Timestamp, e.Timestamp) { - needsSort = true - } - dst.Entries = append(dst.Entries, e) - } - - if needsSort { - sort.Slice(dst.Entries, func(i, j int) bool { - // store in reverse order so we can more reliably insert without sorting and pop from end - return acc.less(dst.Entries[j].Timestamp, dst.Entries[i].Timestamp) - }) - } - - acc.count += len(src.Entries) - heap.Fix(acc, acc.labelmap[dst.Labels]) - -} - -// Pop returns a stream with one entry. It pops the first entry of the first stream -func (acc *accumulatedStreams) Pop() any { - n := acc.Len() - if n == 0 { - return nil - } - - stream := acc.streams[0] - cpy := *stream - cpy.Entries = []logproto.Entry{cpy.Entries[len(stream.Entries)-1]} - stream.Entries = stream.Entries[:len(stream.Entries)-1] - - acc.count-- - - if len(stream.Entries) == 0 { - // remove stream - acc.Swap(0, n-1) - acc.streams[n-1] = nil // avoid leaking reference - delete(acc.labelmap, stream.Labels) - acc.streams = acc.streams[:n-1] - - } - - if acc.Len() > 0 { - heap.Fix(acc, 0) - } - - return &cpy -} - -// Note: can only be called once as it will alter stream ordreing. 
-func (acc *accumulatedStreams) Result() []logqlmodel.Result { - // sort streams by label - sort.Slice(acc.streams, func(i, j int) bool { - return acc.streams[i].Labels < acc.streams[j].Labels - }) - - streams := make(logqlmodel.Streams, 0, len(acc.streams)) - - for _, s := range acc.streams { - // sort entries by timestamp, inversely based on direction - sort.Slice(s.Entries, func(i, j int) bool { - return acc.less(s.Entries[j].Timestamp, s.Entries[i].Timestamp) - }) - streams = append(streams, *s) - } - - res := logqlmodel.Result{ - // stats & headers are already aggregated in the context - Data: streams, - Statistics: acc.stats, - Headers: make([]*definitions.PrometheusResponseHeader, 0, len(acc.headers)), - } - - for name, vals := range acc.headers { - res.Headers = append( - res.Headers, - &definitions.PrometheusResponseHeader{ - Name: name, - Values: vals, - }, - ) - } - - return []logqlmodel.Result{res} -} - -func (acc *accumulatedStreams) Accumulate(x logqlmodel.Result, _ int) error { - // TODO(owen-d/ewelch): Shard counts should be set by the querier - // so we don't have to do it in tricky ways in multiple places. - // See pkg/logql/downstream.go:DownstreamEvaluator.Downstream - // for another example. - if x.Statistics.Summary.Shards == 0 { - x.Statistics.Summary.Shards = 1 - } - acc.stats.Merge(x.Statistics) - metadata.ExtendHeaders(acc.headers, x.Headers) - - switch got := x.Data.(type) { - case logqlmodel.Streams: - for i := range got { - acc.Push(&got[i]) - } - default: - return fmt.Errorf("unexpected response type during response result accumulation. Got (%T), wanted %s", got, logqlmodel.ValueTypeStreams) - } - return nil -} diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index 007166c30c305..a23f2a381b007 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -3,8 +3,6 @@ package queryrange import ( "context" "errors" - "fmt" - "math/rand" "strconv" "strings" "sync" @@ -19,7 +17,6 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logql/sketch" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -250,8 +247,10 @@ func TestInstanceFor(t *testing.T) { var mtx sync.Mutex var ct int + acc := logql.NewBufferedAccumulator(len(queries)) + // ensure we can execute queries that number more than the parallelism parameter - _, err := in.For(context.TODO(), queries, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { + _, err := in.For(context.TODO(), queries, acc, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { mtx.Lock() defer mtx.Unlock() ct++ @@ -266,7 +265,7 @@ func TestInstanceFor(t *testing.T) { // ensure an early error abandons the other queues queries in = mkIn() ct = 0 - _, err = in.For(context.TODO(), queries, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { + _, err = in.For(context.TODO(), queries, acc, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { mtx.Lock() defer mtx.Unlock() ct++ @@ -302,6 +301,7 @@ func TestInstanceFor(t *testing.T) { }, }, }, + logql.NewBufferedAccumulator(2), func(qry logql.DownstreamQuery) (logqlmodel.Result, error) { // Decode shard s := strings.Split(qry.Params.Shards()[0], "_") @@ -383,7 +383,7 @@ func TestInstanceDownstream(t *testing.T) { results, err := DownstreamHandler{ limits: fakeLimits{}, next: handler, - 
}.Downstreamer(context.Background()).Downstream(context.Background(), queries) + }.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries))) require.Equal(t, want, got) @@ -402,6 +402,7 @@ func TestCancelWhileWaitingResponse(t *testing.T) { in := mkIn() queries := make([]logql.DownstreamQuery, in.parallelism+1) + acc := logql.NewBufferedAccumulator(len(queries)) ctx, cancel := context.WithCancel(context.Background()) @@ -409,7 +410,7 @@ func TestCancelWhileWaitingResponse(t *testing.T) { // to prove it will exit when the context is canceled. b := atomic.NewBool(false) go func() { - _, _ = in.For(ctx, queries, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { + _, _ = in.For(ctx, queries, acc, func(_ logql.DownstreamQuery) (logqlmodel.Result, error) { // Intended to keep the For method from returning unless the context is canceled. time.Sleep(100 * time.Second) return logqlmodel.Result{}, nil @@ -443,250 +444,3 @@ func TestDownstreamerUsesCorrectParallelism(t *testing.T) { } require.Equal(t, l.maxQueryParallelism, ct) } - -func newStream(start, end time.Time, delta time.Duration, ls string, direction logproto.Direction) *logproto.Stream { - s := &logproto.Stream{ - Labels: ls, - } - for t := start; t.Before(end); t = t.Add(delta) { - s.Entries = append(s.Entries, logproto.Entry{ - Timestamp: t, - Line: fmt.Sprintf("%d", t.Unix()), - }) - } - if direction == logproto.BACKWARD { - // simulate data coming in reverse order (logproto.BACKWARD) - for i, j := 0, len(s.Entries)-1; i < j; i, j = i+1, j-1 { - s.Entries[i], s.Entries[j] = s.Entries[j], s.Entries[i] - } - } - return s -} - -func newStreams(start, end time.Time, delta time.Duration, n int, direction logproto.Direction) (res []*logproto.Stream) { - for i := 0; i < n; i++ { - res = append(res, newStream(start, end, delta, fmt.Sprintf(`{n="%d"}`, i), direction)) - } - return res -} - -func TestAccumulatedStreams(t *testing.T) { - lim := 30 - nStreams := 10 - start, end := 0, 10 - // for a logproto.BACKWARD query, we use a min heap based on FORWARD - // to store the _earliest_ timestamp of the _latest_ entries, up to `limit` - xs := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, nStreams, logproto.BACKWARD) - acc := newStreamAccumulator(logproto.FORWARD, lim) - for _, x := range xs { - acc.Push(x) - } - - for i := 0; i < lim; i++ { - got := acc.Pop().(*logproto.Stream) - require.Equal(t, fmt.Sprintf(`{n="%d"}`, i%nStreams), got.Labels) - exp := (nStreams*(end-start) - lim + i) / nStreams - require.Equal(t, time.Unix(int64(exp), 0), got.Entries[0].Timestamp) - } - -} - -func TestDownstreamAccumulatorSimple(t *testing.T) { - lim := 30 - start, end := 0, 10 - direction := logproto.BACKWARD - - streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, 10, direction) - x := make(logqlmodel.Streams, 0, len(streams)) - for _, s := range streams { - x = append(x, *s) - } - // dummy params. 
Only need to populate direction & limit - params, err := logql.NewLiteralParams( - `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, direction, uint32(lim), nil, - ) - require.NoError(t, err) - - acc := newDownstreamAccumulator(params, 1) - result := logqlmodel.Result{ - Data: x, - } - - require.Nil(t, acc.Accumulate(context.Background(), 0, result)) - - res := acc.Result()[0] - got, ok := res.Data.(logqlmodel.Streams) - require.Equal(t, true, ok) - require.Equal(t, 10, len(got), "correct number of streams") - - // each stream should have the top 3 entries - for i := 0; i < 10; i++ { - require.Equal(t, 3, len(got[i].Entries), "correct number of entries in stream") - for j := 0; j < 3; j++ { - require.Equal(t, time.Unix(int64(9-j), 0), got[i].Entries[j].Timestamp, "correct timestamp") - } - } -} - -// TestDownstreamAccumulatorMultiMerge simulates merging multiple -// sub-results from different queries. -func TestDownstreamAccumulatorMultiMerge(t *testing.T) { - for _, direction := range []logproto.Direction{logproto.BACKWARD, logproto.FORWARD} { - t.Run(direction.String(), func(t *testing.T) { - nQueries := 10 - delta := 10 // 10 entries per stream, 1s apart - streamsPerQuery := 10 - lim := 30 - - payloads := make([]logqlmodel.Streams, 0, nQueries) - for i := 0; i < nQueries; i++ { - start := i * delta - end := start + delta - streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, streamsPerQuery, direction) - var res logqlmodel.Streams - for i := range streams { - res = append(res, *streams[i]) - } - payloads = append(payloads, res) - - } - - // queries are always dispatched in the correct order. - // oldest time ranges first in the case of logproto.FORWARD - // and newest time ranges first in the case of logproto.BACKWARD - if direction == logproto.BACKWARD { - for i, j := 0, len(payloads)-1; i < j; i, j = i+1, j-1 { - payloads[i], payloads[j] = payloads[j], payloads[i] - } - } - - // dummy params. Only need to populate direction & limit - params, err := logql.NewLiteralParams( - `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, direction, uint32(lim), nil, - ) - require.NoError(t, err) - - acc := newDownstreamAccumulator(params, 1) - for i := 0; i < nQueries; i++ { - err := acc.Accumulate(context.Background(), i, logqlmodel.Result{ - Data: payloads[i], - }) - require.Nil(t, err) - } - - got, ok := acc.Result()[0].Data.(logqlmodel.Streams) - require.Equal(t, true, ok) - require.Equal(t, int64(nQueries), acc.Result()[0].Statistics.Summary.Shards) - - // each stream should have the top 3 entries - for i := 0; i < streamsPerQuery; i++ { - stream := got[i] - require.Equal(t, fmt.Sprintf(`{n="%d"}`, i), stream.Labels, "correct labels") - ln := lim / streamsPerQuery - require.Equal(t, ln, len(stream.Entries), "correct number of entries in stream") - switch direction { - case logproto.BACKWARD: - for i := 0; i < ln; i++ { - offset := delta*nQueries - 1 - i - require.Equal(t, time.Unix(int64(offset), 0), stream.Entries[i].Timestamp, "correct timestamp") - } - default: - for i := 0; i < ln; i++ { - offset := i - require.Equal(t, time.Unix(int64(offset), 0), stream.Entries[i].Timestamp, "correct timestamp") - } - } - } - }) - } -} - -func BenchmarkAccumulator(b *testing.B) { - - // dummy params. 
Only need to populate direction & limit - lim := 30 - params, err := logql.NewLiteralParams( - `{app="foo"}`, time.Time{}, time.Time{}, 0, 0, logproto.BACKWARD, uint32(lim), nil, - ) - require.NoError(b, err) - - for acc, tc := range map[string]struct { - results []logqlmodel.Result - params logql.Params - }{ - "streams": { - newStreamResults(), - params, - }, - "quantile sketches": { - newQuantileSketchResults(), - params, - }, - } { - b.Run(acc, func(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - for n := 0; n < b.N; n++ { - - acc := newDownstreamAccumulator(params, len(tc.results)) - for i, r := range tc.results { - err := acc.Accumulate(context.Background(), i, r) - require.Nil(b, err) - } - - acc.Result() - } - }) - } -} - -func newStreamResults() []logqlmodel.Result { - nQueries := 50 - delta := 100 // 10 entries per stream, 1s apart - streamsPerQuery := 50 - - results := make([]logqlmodel.Result, nQueries) - for i := 0; i < nQueries; i++ { - start := i * delta - end := start + delta - streams := newStreams(time.Unix(int64(start), 0), time.Unix(int64(end), 0), time.Second, streamsPerQuery, logproto.BACKWARD) - var res logqlmodel.Streams - for i := range streams { - res = append(res, *streams[i]) - } - results[i] = logqlmodel.Result{Data: res} - - } - - return results -} - -func newQuantileSketchResults() []logqlmodel.Result { - results := make([]logqlmodel.Result, 100) - - for r := range results { - vectors := make([]logql.ProbabilisticQuantileVector, 10) - for i := range vectors { - vectors[i] = make(logql.ProbabilisticQuantileVector, 10) - for j := range vectors[i] { - vectors[i][j] = logql.ProbabilisticQuantileSample{ - T: int64(i), - F: newRandomSketch(), - Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}}, - } - } - } - results[r] = logqlmodel.Result{Data: logql.ProbabilisticQuantileMatrix(vectors)} - } - - return results -} - -func newRandomSketch() sketch.QuantileSketch { - r := rand.New(rand.NewSource(42)) - s := sketch.NewDDSketch() - for i := 0; i < 1000; i++ { - _ = s.Add(r.Float64()) - } - return s -} diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index aed0e96e2b47d..652637a724655 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -3,7 +3,7 @@ package queryrange import ( "context" "fmt" - math "math" + "math" strings "strings" "time" @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" + utilMath "github.com/grafana/loki/pkg/util/math" "github.com/grafana/loki/pkg/util/spanlogger" "github.com/grafana/loki/pkg/util/validation" valid "github.com/grafana/loki/pkg/validation" @@ -231,7 +232,7 @@ func guessShardFactor(stats stats.Stats, maxBytesPerShard, maxShards int) int { // reset this edge case manually factor := int(math.Pow(2, power)) if maxShards > 0 { - factor = min(factor, maxShards) + factor = utilMath.Min(factor, maxShards) } // shortcut: no need to run any sharding logic when factor=1 @@ -241,10 +242,3 @@ func guessShardFactor(stats stats.Stats, maxBytesPerShard, maxShards int) int { } return factor } - -func min(a, b int) int { - if a < b { - return a - } - return b -}