Skip to content

Commit

Permalink
add rate handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong committed Feb 22, 2024
1 parent dba1bb1 commit fc8fe39
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

- Add sampling handler for sampling records at request level (#3).
- Add handler to emit JSON logs to GCP Cloud Logging (#6).
- Add handler to limit records with give rate (#8).
63 changes: 63 additions & 0 deletions rate/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2024 The sloth authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package rate

import (
"log/slog"
"sync/atomic"
"time"
)

const (
gapPerLevel = slog.LevelError - slog.LevelWarn
levels = (slog.LevelError-slog.LevelDebug)/gapPerLevel + 1
countersPerLevel = 4096
)

type counters [levels][countersPerLevel]counter

func (c *counters) get(level slog.Level, key string) *counter {
i := (min(slog.LevelDebug, max(slog.LevelError, level)) - slog.LevelDebug) / gapPerLevel
j := fnv32a(key) % countersPerLevel

return &c[i][j]
}

func fnv32a(str string) uint32 {
const (
offset32 = 2166136261
prime32 = 16777619
)
hash := uint32(offset32)
for i := 0; i < len(str); i++ {
hash ^= uint32(str[i])
hash *= prime32
}

return hash
}

type counter struct {
resetAt atomic.Int64
counter atomic.Uint64
}

func (c *counter) Inc(t time.Time, tick time.Duration) uint64 {
now := t.UnixNano()
resetAfter := c.resetAt.Load()
if resetAfter > now {
return c.counter.Add(1)
}

c.counter.Store(1)

newResetAfter := now + tick.Nanoseconds()
if !c.resetAt.CompareAndSwap(resetAfter, newResetAfter) {
// We raced with another goroutine trying to reset, and it also reset
// the counter to 1, so we need to reincrement the counter.
return c.counter.Add(1)

Check warning on line 59 in rate/counter.go

View check run for this annotation

Codecov / codecov/patch

rate/counter.go#L59

Added line #L59 was not covered by tests
}

return 1
}
41 changes: 41 additions & 0 deletions rate/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2024 The sloth authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package rate

import "time"

// WithFirst provides N that logs the first N records with a given level and message each interval.
//
// If the first N is 0, the handler assumes 100.
func WithFirst(first uint64) Option {
return func(options *options) {
options.first = first
}
}

// WithEvery provides M that logs every Mth record after first N records
// with a given level and message each interval.
// If M is 0, it will drop all log records after the first N in that interval.
//
// The default M is 100.
func WithEvery(every uint64) Option {
return func(options *options) {
options.every = every
}
}

// WithInterval provides the interval for rate limiting.
//
// If the interval is <= 0, the handler assumes 1 second.
func WithInterval(interval time.Duration) Option {
return func(options *options) {
options.interval = interval
}
}

type (
// Option configures the Handler with specific options.
Option func(*options)
options Handler
)
85 changes: 85 additions & 0 deletions rate/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2024 The sloth authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

/*
Package rate provides a Handler that limits records with give rate.
It caps the CPU and I/O load of logging while attempting to preserve a representative subset of your logs.
It logs the first N records with a given level and message each interval.
If more records with the same level and message are seen during the same interval,
every Mth message is logged and the rest are dropped.
Keep in mind that the implementation is optimized for speed over absolute precision;
under load, each tick may be slightly over- or under-sampled.
*/
package rate

import (
"context"
"log/slog"
"time"
)

// Handler limits records with give rate, which caps the CPU and I/O load
// of logging while attempting to preserve a representative subset of your logs.
//
// To create a new Handler, call [New].
type Handler struct {
handler slog.Handler

interval time.Duration
first uint64
every uint64

counts *counters
}

// New creates a new Handler with the given Option(s).
func New(handler slog.Handler, opts ...Option) Handler {
if handler == nil {
panic("cannot create Handler with nil handler")
}

option := &options{
handler: handler,
counts: &counters{},
every: 100, //nolint:gomnd
}
for _, opt := range opts {
opt(option)
}
if option.interval <= 0 {
option.interval = time.Second
}
if option.first == 0 {
option.first = 100
}

return Handler(*option)
}

func (h Handler) Enabled(ctx context.Context, level slog.Level) bool {
return h.handler.Enabled(ctx, level)
}

func (h Handler) Handle(ctx context.Context, record slog.Record) error {
count := h.counts.get(record.Level, record.Message)
n := count.Inc(record.Time, h.interval)
if n > h.first && (h.every == 0 || (n-h.first)%h.every != 0) {
return nil
}

return h.handler.Handle(ctx, record)
}

func (h Handler) WithAttrs(attrs []slog.Attr) slog.Handler {
h.handler = h.handler.WithAttrs(attrs)

return h
}

func (h Handler) WithGroup(name string) slog.Handler {
h.handler = h.handler.WithGroup(name)

return h
}
146 changes: 146 additions & 0 deletions rate/rate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2024 The sloth authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package rate_test

import (
"bytes"
"context"
"log/slog"
"sync"
"testing"
"time"

"github.com/nil-go/sloth/internal/assert"
"github.com/nil-go/sloth/rate"
)

func TestNew_panic(t *testing.T) {
t.Parallel()

defer func() {
assert.Equal(t, "cannot create Handler with nil handler", recover().(string))
}()

rate.New(nil)
t.Fail()
}

func TestHandler(t *testing.T) {
t.Parallel()

testcases := []struct {
description string
level slog.Level
expected string
}{
{
description: "level error",
level: slog.LevelError,
expected: "level=ERROR msg=msg pos=first\n" +
"level=ERROR msg=msg pos=second\n" +
"level=ERROR msg=msg pos=fourth\n" +
"level=ERROR msg=msg g.pos=after\n",
},
{
description: "level warn",
level: slog.LevelWarn,
expected: "level=WARN msg=msg pos=first\n" +
"level=WARN msg=msg pos=second\n" +
"level=WARN msg=msg pos=fourth\n" +
"level=WARN msg=msg g.pos=after\n",
},
{
description: "level info",
level: slog.LevelInfo,
expected: "level=INFO msg=msg pos=first\n" +
"level=INFO msg=msg pos=second\n" +
"level=INFO msg=msg pos=fourth\n" +
"level=INFO msg=msg g.pos=after\n",
},
{
description: "level debug",
level: slog.LevelDebug,
expected: "level=DEBUG msg=msg pos=first\n" +
"level=DEBUG msg=msg pos=second\n" +
"level=DEBUG msg=msg pos=fourth\n" +
"level=DEBUG msg=msg g.pos=after\n",
},
}

for _, testcase := range testcases {
testcase := testcase

t.Run(testcase.description, func(t *testing.T) {
t.Parallel()

buf := &bytes.Buffer{}
handler := rate.New(
slog.NewTextHandler(buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
ReplaceAttr: func(groups []string, attr slog.Attr) slog.Attr {
if len(groups) == 0 && attr.Key == slog.TimeKey {
return slog.Attr{}
}

return attr
},
}),
rate.WithFirst(2),
rate.WithEvery(2),
rate.WithInterval(time.Second),
)
logger := slog.New(handler)
ctx := context.Background()

logger.Log(ctx, testcase.level, "msg", "pos", "first")
logger.Log(ctx, testcase.level, "msg", "pos", "second")
logger.Log(ctx, testcase.level, "msg", "pos", "third")
logger.Log(ctx, testcase.level, "msg", "pos", "fourth")
time.Sleep(time.Second)
logger.WithGroup("g").With("pos", "after").Log(ctx, testcase.level, "msg")

assert.Equal(t, testcase.expected, buf.String())
})
}
}

func TestHandler_race(t *testing.T) {
t.Parallel()

buf := &bytes.Buffer{}
handler := rate.New(
slog.NewTextHandler(buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
ReplaceAttr: func(groups []string, attr slog.Attr) slog.Attr {
if len(groups) == 0 && attr.Key == slog.TimeKey {
return slog.Attr{}
}

return attr
},
}),
rate.WithEvery(0),
)
logger := slog.New(handler)
ctx := context.Background()

start := make(chan struct{})
procs := 1000
var waitGroup sync.WaitGroup
waitGroup.Add(procs)
for i := 0; i < procs; i++ {
go func() {
defer waitGroup.Done()

<-start
logger.Log(ctx, slog.LevelInfo, "msg")
time.Sleep(time.Second)
logger.Log(ctx, slog.LevelInfo, "msg")
}()
}
close(start)
waitGroup.Wait()

assert.Equal(t, 200, bytes.Count(buf.Bytes(), []byte("\n")))
}

0 comments on commit fc8fe39

Please sign in to comment.