Skip to content

Commit

Permalink
Add more test cases to show how accurate the result for each item in the
Browse files Browse the repository at this point in the history
topk actually is.

Signed-off-by: Callum Styan <[email protected]>
cstyan committed Oct 4, 2024
1 parent ecc9050 commit 24edc7b
Showing 3 changed files with 71 additions and 26 deletions.
7 changes: 4 additions & 3 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,8 @@ const (
CountMinSketchVectorType = "CountMinSketchVector"

epsilon = 0.0001
delta = 0.05
// delta of 0.05 results in a sketch size of 325KB, 0.01 gives a sketch size of 543KB
delta = 0.05
)

// CountMinSketchVector tracks the count or sum of values of a metric, ie list of label value pairs. It's storage for
@@ -145,7 +146,7 @@ type HeapCountMinSketchVector struct {
}

func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCountMinSketchVector {
f, _ := sketch.NewCountMinSketchFromErroAndProbability(epsilon, delta)
f, _ := sketch.NewCountMinSketchFromErrorAndProbability(epsilon, delta)

if metricsLength >= maxLabels {
metricsLength = maxLabels
@@ -176,7 +177,7 @@ func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
heap.Fix(v, 0)
}

// The maximum number of labels has been reached, so drop the smalles element.
// The maximum number of labels has been reached, so drop the smallest element.
if len(v.Metrics) > v.maxLabels {
metric := heap.Pop(v).(labels.Labels)
delete(v.observed, metric.String())
88 changes: 66 additions & 22 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package logql

import (
"context"
"fmt"
"math"
"testing"
"time"
@@ -278,36 +279,77 @@ func TestMappingEquivalenceSketches(t *testing.T) {

func TestApproxTopkSketches(t *testing.T) {
var (
shards = 3
nStreams = 10_000
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true)
limit = 100
rounds = 20
limit = 100
)

for _, tc := range []struct {
labelShards int
totalStreams int
shardedQuery string
regularQuery string
realtiveError float64
}{
// Note:our data generation results in less spread between topk things for 10k streams than for 100k streams
// if we have 1k streams, we can get much more accurate results for topk 10 than topk 100
{
labelShards: 3,
totalStreams: 100,
shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0012,
},
{
labelShards: 10,
totalStreams: 100,
shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.002,
realtiveError: 0.005,
},
{
labelShards: 10,
totalStreams: 1_000,
shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0015,
},
{
labelShards: 100,
totalStreams: 1_000,
shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.022,
},
{
labelShards: 100,
totalStreams: 10_000,
shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.008,
},
{
labelShards: 100,
totalStreams: 100_000,
shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0015,
},
} {
q := NewMockQuerier(
shards,
streams,
)

opts := EngineOpts{
MaxCountMinSketchHeapSize: 10_000,
}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())
t.Run(fmt.Sprintf("%s/%d/%d", tc.shardedQuery, tc.labelShards, tc.totalStreams), func(t *testing.T) {
streams := randomStreams(tc.totalStreams, rounds+1, tc.labelShards, []string{"a", "b", "c", "d"}, true)

q := NewMockQuerier(
tc.labelShards,
streams,
)

opts := EngineOpts{
MaxCountMinSketchHeapSize: 10_000,
}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.shardedQuery, func(t *testing.T) {
// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
params, err := NewLiteralParams(
@@ -325,7 +367,7 @@ func TestApproxTopkSketches(t *testing.T) {
qry := regular.Query(params.Copy())
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
strategy := NewPowerOfTwoStrategy(ConstantShards(tc.labelShards))
mapper := NewShardMapper(strategy, nilShardMetrics, []string{ShardQuantileOverTime, SupportApproxTopk})

params.queryString = tc.shardedQuery
@@ -346,7 +388,6 @@ func TestApproxTopkSketches(t *testing.T) {

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError)
})
}
@@ -743,12 +784,15 @@ func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha flo

e := make([]float64, len(expected))
a := make([]float64, len(expected))
inTopk := 0
for i := 0; i < len(expected); i++ {
require.Equal(t, expected[i].Metric, actual[i].Metric)

e[i] = expected[i].F
a[i] = actual[i].F
if labels.Equal(expected[i].Metric, actual[i].Metric) {
e[i] = expected[i].F
a[i] = actual[i].F
inTopk++
}
}
require.True(t, float64(inTopk/len(expected)) > 0.9, "not enough of the real topk elements were in the output %f", float64(inTopk/len(expected)))
require.InEpsilonSlice(t, e, a, alpha)
}

2 changes: 1 addition & 1 deletion pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
}, nil
}

func NewCountMinSketchFromErroAndProbability(epsilon float64, delta float64) (*CountMinSketch, error) {
func NewCountMinSketchFromErrorAndProbability(epsilon float64, delta float64) (*CountMinSketch, error) {
width := math.Ceil(math.E / epsilon)
depth := math.Ceil(math.Log(1.0 / delta))
return NewCountMinSketch(uint32(width), uint32(depth))

0 comments on commit 24edc7b

Please sign in to comment.