diff --git a/pkg/logql/count_min_sketch.go b/pkg/logql/count_min_sketch.go index 8c65115df05fa..59c366167a4d1 100644 --- a/pkg/logql/count_min_sketch.go +++ b/pkg/logql/count_min_sketch.go @@ -17,7 +17,8 @@ const ( CountMinSketchVectorType = "CountMinSketchVector" epsilon = 0.0001 - delta = 0.05 + // delta of 0.05 results in a sketch size of 325KB, 0.01 gives a sketch size of 543KB + delta = 0.05 ) // CountMinSketchVector tracks the count or sum of values of a metric, ie list of label value pairs. It's storage for @@ -145,7 +146,7 @@ type HeapCountMinSketchVector struct { } func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCountMinSketchVector { - f, _ := sketch.NewCountMinSketchFromErroAndProbability(epsilon, delta) + f, _ := sketch.NewCountMinSketchFromErrorAndProbability(epsilon, delta) if metricsLength >= maxLabels { metricsLength = maxLabels @@ -176,7 +177,7 @@ func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) { heap.Fix(v, 0) } - // The maximum number of labels has been reached, so drop the smalles element. + // The maximum number of labels has been reached, so drop the smallest element. 
if len(v.Metrics) > v.maxLabels { metric := heap.Pop(v).(labels.Labels) delete(v.observed, metric.String()) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 3aa240df685a5..e2254b8b5cca5 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -2,6 +2,7 @@ package logql import ( "context" + "fmt" "math" "testing" "time" @@ -278,36 +279,77 @@ func TestMappingEquivalenceSketches(t *testing.T) { func TestApproxTopkSketches(t *testing.T) { var ( - shards = 3 - nStreams = 10_000 - rounds = 20 - streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true) - limit = 100 + rounds = 20 + limit = 100 ) for _, tc := range []struct { + labelShards int + totalStreams int shardedQuery string regularQuery string realtiveError float64 }{ + // Note:our data generation results in less spread between topk things for 10k streams than for 100k streams + // if we have 1k streams, we can get much more accurate results for topk 10 than topk 100 + { + labelShards: 3, + totalStreams: 100, + shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + realtiveError: 0.0012, + }, { + labelShards: 10, + totalStreams: 100, shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, - realtiveError: 0.002, + realtiveError: 0.005, + }, + { + labelShards: 10, + totalStreams: 1_000, + shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + realtiveError: 0.0015, + }, + { + labelShards: 100, + totalStreams: 1_000, + shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value 
[1s])))`, + regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + realtiveError: 0.022, + }, + { + labelShards: 100, + totalStreams: 10_000, + shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + realtiveError: 0.008, + }, + { + labelShards: 100, + totalStreams: 100_000, + shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, + realtiveError: 0.0015, }, } { - q := NewMockQuerier( - shards, - streams, - ) - opts := EngineOpts{ - MaxCountMinSketchHeapSize: 10_000, - } - regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) - sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) + t.Run(fmt.Sprintf("%s/%d/%d", tc.shardedQuery, tc.labelShards, tc.totalStreams), func(t *testing.T) { + streams := randomStreams(tc.totalStreams, rounds+1, tc.labelShards, []string{"a", "b", "c", "d"}, true) + + q := NewMockQuerier( + tc.labelShards, + streams, + ) + + opts := EngineOpts{ + MaxCountMinSketchHeapSize: 10_000, + } + regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) + sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) - t.Run(tc.shardedQuery, func(t *testing.T) { // for an instant query we set the start and end to the same timestamp // plus set step and interval to 0 params, err := NewLiteralParams( @@ -325,7 +367,7 @@ func TestApproxTopkSketches(t *testing.T) { qry := regular.Query(params.Copy()) ctx := user.InjectOrgID(context.Background(), "fake") - strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) + strategy := NewPowerOfTwoStrategy(ConstantShards(tc.labelShards)) mapper := NewShardMapper(strategy, nilShardMetrics, 
[]string{ShardQuantileOverTime, SupportApproxTopk}) params.queryString = tc.shardedQuery @@ -346,7 +388,6 @@ func TestApproxTopkSketches(t *testing.T) { shardedRes, err := shardedQry.Exec(ctx) require.NoError(t, err) - relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError) }) } @@ -743,12 +784,16 @@ func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha flo e := make([]float64, len(expected)) a := make([]float64, len(expected)) + inTopk := 0 for i := 0; i < len(expected); i++ { - require.Equal(t, expected[i].Metric, actual[i].Metric) - - e[i] = expected[i].F - a[i] = actual[i].F + if labels.Equal(expected[i].Metric, actual[i].Metric) { + e[i] = expected[i].F + a[i] = actual[i].F + inTopk++ + } } + // Convert before dividing: integer division truncates the ratio to 0 unless every element matched. + require.True(t, float64(inTopk)/float64(len(expected)) > 0.9, "not enough of the real topk elements were in the output %f", float64(inTopk)/float64(len(expected))) require.InEpsilonSlice(t, e, a, alpha) } diff --git a/pkg/logql/sketch/cms.go b/pkg/logql/sketch/cms.go index e53f7c9aa2789..e980b1a1b26d8 100644 --- a/pkg/logql/sketch/cms.go +++ b/pkg/logql/sketch/cms.go @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) { }, nil } -func NewCountMinSketchFromErroAndProbability(epsilon float64, delta float64) (*CountMinSketch, error) { +func NewCountMinSketchFromErrorAndProbability(epsilon float64, delta float64) (*CountMinSketch, error) { width := math.Ceil(math.E / epsilon) depth := math.Ceil(math.Log(1.0 / delta)) return NewCountMinSketch(uint32(width), uint32(depth))