Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tune latency attribute buckets to reduce cardinality #2432

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, ca
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, msToComplete)))
attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(msToComplete)))
m.completed.Add(ctx, 1, metric.WithAttributes(attrs...))

m.queueDepth.Record(ctx, queueDepth)
Expand Down Expand Up @@ -155,7 +155,11 @@ func RetrieveTraceContextFromContext(ctx context.Context) ([]byte, error) {
}

// extractAsyncCallAttrs builds the attribute set for an async call: everything
// extractRefAttrs yields for verb, catchVerb, origin and isCatching, plus one string
// attribute keyed by asyncCallTimeSinceScheduledAtBucketAttr. The attribute's value is
// the bucket label computed from timeSinceMS(scheduledAt) (presumably the elapsed
// milliseconds since scheduledAt; timeSinceMS's body is not visible here).
func extractAsyncCallAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool) []attribute.KeyValue {
return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, timeSinceMS(scheduledAt))))
return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(timeSinceMS(scheduledAt))))
}

func asyncLogBucket(msToComplete int64) string {
return logBucket(4, msToComplete, optional.Some(4), optional.Some(6))
}

func extractRefAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, isCatching bool) []attribute.KeyValue {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime
msToComplete := timeSinceMS(startTime)
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(callRunTimeBucketAttr, logBucket(2, msToComplete)))
attrs = append(attrs, attribute.String(callRunTimeBucketAttr, logBucket(4, msToComplete, optional.Some(3), optional.Some(7))))
m.requests.Add(ctx, 1, metric.WithAttributes(attrs...))
}
2 changes: 1 addition & 1 deletion backend/controller/observability/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (m *IngressMetrics) Request(ctx context.Context, method string, path string
msToComplete := timeSinceMS(startTime)
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(ingressRunTimeBucketAttr, logBucket(2, msToComplete)))
attrs = append(attrs, attribute.String(ingressRunTimeBucketAttr, logBucket(4, msToComplete, optional.Some(3), optional.Some(7))))
m.requests.Add(ctx, 1, metric.WithAttributes(attrs...))
}
31 changes: 30 additions & 1 deletion backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"math"
"time"

"github.com/alecthomas/types/optional"
)

var (
Expand Down Expand Up @@ -54,14 +56,41 @@ func timeSinceMS(start time.Time) int64 {
//
// <1, [1-8), [8-64), [64-512), etc.
//
// The buckets are then demarcated by `min` and `max`, such that any `num` < `base`^`min`
// will be bucketed together into the min bucket, and similarly, any `num` >= `base`^`max`
// will go in the `max` bucket. This constrains output cardinality by chopping the long
// tails on both ends of the normal distribution and lumping them all into terminal
// buckets. Note that any `num` < 1 is always reported as "<1", even when `min` is
// provided. When `min` and `max` are not provided, the effective `min` is 0, and there
// is no max.
//
// Go only supports a few bases with math.Log*, so this func performs a change of base:
// log_b(x) = log_a(x) / log_a(b)
func logBucket(base int, num int64) string {
// logBucket returns the label of the base-`base` logarithmic bucket that contains num.
//
// The label is chosen by the first rule that matches:
//   - "<1" when num < 1, regardless of optMin;
//   - ">=T" with T = base^max when optMax is set and num >= T;
//   - "<T" with T = base^min when optMin is set and num < T;
//   - "[lo,hi)" otherwise, where lo and hi are consecutive powers of base and
//     lo <= num < hi.
//
// base is expected to be >= 2; smaller values do not describe a usable logarithm and
// yield meaningless labels.
func logBucket(base int, num int64, optMin, optMax optional.Option[int]) string {
	if num < 1 {
		return "<1"
	}
	b := float64(base)

	// The upper clamp is tested before the lower one, so it wins when both thresholds
	// coincide.
	if maxExp, ok := optMax.Get(); ok {
		if upperBound := int64(math.Pow(b, float64(maxExp))); num >= upperBound {
			return fmt.Sprintf(">=%d", upperBound)
		}
	}
	if minExp, ok := optMin.Get(); ok {
		if lowerBound := int64(math.Pow(b, float64(minExp))); num < lowerBound {
			return fmt.Sprintf("<%d", lowerBound)
		}
	}

	// Change of base: log_b(x) = ln(x) / ln(b).
	exp := math.Floor(math.Log(float64(num)) / math.Log(b))
	lo, hi := int64(math.Pow(b, exp)), int64(math.Pow(b, exp+1))

	// The quotient above is computed in floating point, so for a num that is an exact
	// power of base it can land a hair below (or above) the true integer exponent and
	// floor to the neighbouring bucket. Verify against the integer bounds and shift by
	// one bucket when num falls outside [lo, hi).
	switch {
	case num >= hi:
		lo, hi = hi, int64(math.Pow(b, exp+2))
	case num < lo:
		lo, hi = int64(math.Pow(b, exp-1)), lo
	}

	return fmt.Sprintf("[%d,%d)", lo, hi)
}
50 changes: 49 additions & 1 deletion backend/controller/observability/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,98 @@ import (
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

// TestLogBucket is a table-driven test for logBucket. Each case supplies a base, an
// optional min and max bucket exponent, and an input num, and asserts the exact bucket
// label that logBucket returns for them.
func TestLogBucket(t *testing.T) {
tests := []struct {
name string
base int
min optional.Option[int]
max optional.Option[int]
num int
want string
}{
// Without demarcating min/max buckets
{
name: "<1",
base: 8,
min: optional.None[int](),
max: optional.None[int](),
num: 0,
want: "<1",
},
{
name: "EqualLowEndOfRange",
base: 8,
min: optional.None[int](),
max: optional.None[int](),
num: 1,
want: "[1,8)",
},
{
name: "HigherEndOfRange",
base: 8,
min: optional.None[int](),
max: optional.None[int](),
num: 7,
want: "[1,8)",
},
{
name: "BigInputNum",
base: 8,
min: optional.None[int](),
max: optional.None[int](),
num: 16800000,
want: "[16777216,134217728)",
},
{
name: "Base2",
base: 2,
min: optional.None[int](),
max: optional.None[int](),
num: 8,
want: "[8,16)",
},

// With min/max buckets
{
name: "LessThanMin",
base: 2,
min: optional.Some(2),
max: optional.None[int](),
num: 3,
want: "<4",
},
{
name: "EqualToMax",
base: 2,
min: optional.None[int](),
max: optional.Some(2),
num: 4,
want: ">=4",
},
// When min and max name the same threshold, a value equal to it is expected in
// the max bucket.
{
name: "EqualToMaxWhenMinMaxEqual",
base: 2,
min: optional.Some(2),
max: optional.Some(2),
num: 4,
want: ">=4",
},
{
name: "GreaterThanMax",
base: 2,
min: optional.Some(2),
max: optional.Some(2),
num: 4000,
want: ">=4",
},
}

// Run every case as a named subtest.
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.want, logBucket(test.base, int64(test.num)))
assert.Equal(t, test.want, logBucket(test.base, int64(test.num), test.min, test.max))
})
}
}
Loading