From c10eedf4815e9706ba579a20a40d28898095507d Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 20 Feb 2024 12:40:29 -0800 Subject: [PATCH] backport! quantile sharding; fix bug related to # of steps (instant query mem usage) and use CollapsingLowestDense store for ddsketch (#11905) Signed-off-by: Callum Styan --- pkg/logql/downstream_test.go | 51 ++++++++++++++++++++- pkg/logql/engine.go | 2 +- pkg/logql/quantile_over_time_sketch.go | 8 +++- pkg/logql/quantile_over_time_sketch_test.go | 6 ++- pkg/logql/sketch/quantile.go | 2 +- 5 files changed, 62 insertions(+), 7 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ec5f3170468d0..46575c44d8edb 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -146,7 +146,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) - t.Run(tc.query, func(t *testing.T) { + t.Run(tc.query+"_range", func(t *testing.T) { params, err := NewLiteralParams( tc.query, start, @@ -178,6 +178,40 @@ func TestMappingEquivalenceSketches(t *testing.T) { relativeError(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix), tc.realtiveError) }) + t.Run(tc.query+"_instant", func(t *testing.T) { + // for an instant query we set the start and end to the same timestamp + // plus set step and interval to 0 + params, err := NewLiteralParams( + tc.query, + time.Unix(0, int64(rounds+1)), + time.Unix(0, int64(rounds+1)), + 0, + 0, + logproto.FORWARD, + uint32(limit), + nil, + ) + require.NoError(t, err) + qry := regular.Query(params) + ctx := user.InjectOrgID(context.Background(), "fake") + + mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics, []string{ShardQuantileOverTime}) + _, _, mapped, err := mapper.Parse(params.GetExpression()) + require.NoError(t, err) + + shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: mapped, + }) + + res, err := qry.Exec(ctx) + require.NoError(t, err) + + shardedRes, err := shardedQry.Exec(ctx) + require.NoError(t, err) + + relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError) + }) } } @@ -546,6 +580,21 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) } } +func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha float64) { + require.Len(t, actual, len(expected)) + + e := make([]float64, len(expected)) + a := make([]float64, len(expected)) + for i := 0; i < len(expected); i++ { + require.Equal(t, expected[i].Metric, actual[i].Metric) + + e[i] = expected[i].F + a[i] = expected[i].F + } + require.InEpsilonSlice(t, e, a, alpha) + +} + func TestFormat_ShardedExpr(t *testing.T) { oldMax := syntax.MaxCharsPerLine syntax.MaxCharsPerLine = 20 diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 8d951ad64c946..a9f3dabe14eed 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -363,7 +363,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries) case ProbabilisticQuantileVector: - return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params) + return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params) default: return nil, fmt.Errorf("unsupported result type: %T", r) } diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 507c72b208ab8..24a8a05d89ede 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -262,13 +262,17 @@ func (r *quantileSketchBatchRangeVectorIterator) agg(samples []promql.FPoint) sk return s } -// JoinQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix. -func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator, params Params) (promql_parser.Value, error) { +// MergeQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix. +func MergeQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator, params Params) (promql_parser.Value, error) { vec := r.QuantileSketchVec() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } + if GetRangeType(params) == InstantType { + return ProbabilisticQuantileMatrix{vec}, nil + } + stepCount := int(math.Ceil(float64(params.End().Sub(params.Start()).Nanoseconds()) / float64(params.Step().Nanoseconds()))) if stepCount <= 0 { stepCount = 1 diff --git a/pkg/logql/quantile_over_time_sketch_test.go b/pkg/logql/quantile_over_time_sketch_test.go index dc1ff31f509a4..488ebdec26f06 100644 --- a/pkg/logql/quantile_over_time_sketch_test.go +++ b/pkg/logql/quantile_over_time_sketch_test.go @@ -69,7 +69,7 @@ func TestJoinQuantileSketchVectorError(t *testing.T) { ev := errorStepEvaluator{ err: errors.New("could not evaluate"), } - _, err := JoinQuantileSketchVector(true, result, ev, LiteralParams{}) + _, err := MergeQuantileSketchVector(true, result, ev, LiteralParams{}) require.ErrorContains(t, err, "could not evaluate") } @@ -136,7 +136,7 @@ func BenchmarkJoinQuantileSketchVector(b *testing.B) { iter: iter, } _, _, r := ev.Next() - m, err := JoinQuantileSketchVector(true, r.QuantileSketchVec(), ev, params) + m, err := MergeQuantileSketchVector(true, r.QuantileSketchVec(), ev, params) require.NoError(b, err) m.(ProbabilisticQuantileMatrix).Release() } @@ -148,7 +148,9 @@ func BenchmarkQuantileBatchRangeVectorIteratorAt(b *testing.B) { }{ {numberSamples: 1}, {numberSamples: 1_000}, + {numberSamples: 10_000}, {numberSamples: 100_000}, + {numberSamples: 1_000_000}, } { b.Run(fmt.Sprintf("%d-samples", tc.numberSamples), func(b *testing.B) { r := rand.New(rand.NewSource(42)) diff --git a/pkg/logql/sketch/quantile.go b/pkg/logql/sketch/quantile.go index 1fa20c38e5bcc..3b8b0f22fc8e0 100644 --- a/pkg/logql/sketch/quantile.go +++ b/pkg/logql/sketch/quantile.go @@ -47,7 +47,7 @@ const relativeAccuracy = 0.01 var ddsketchPool = sync.Pool{ New: func() any { m, _ := mapping.NewCubicallyInterpolatedMapping(relativeAccuracy) - return ddsketch.NewDDSketchFromStoreProvider(m, store.SparseStoreConstructor) + return ddsketch.NewDDSketch(m, store.NewCollapsingLowestDenseStore(2048), store.NewCollapsingLowestDenseStore(2048)) }, }