From 2aacb5885aa704f70e79dc1d09066e67271d5363 Mon Sep 17 00:00:00 2001 From: Jaz Date: Sun, 17 Nov 2024 00:26:00 -0800 Subject: [PATCH] Add per-IP rate limiters --- pkg/server/server.go | 19 +++++++++++-------- pkg/server/subscriber.go | 8 +++++++- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 78bd318..6f88610 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -17,15 +17,17 @@ import ( "github.com/labstack/echo/v4" "go.opentelemetry.io/otel" "golang.org/x/sync/semaphore" + "golang.org/x/time/rate" ) type Server struct { - Subscribers map[int64]*Subscriber - lk sync.RWMutex - nextSub int64 - Consumer *consumer.Consumer - maxSubRate float64 - seq int64 + Subscribers map[int64]*Subscriber + lk sync.RWMutex + nextSub int64 + Consumer *consumer.Consumer + maxSubRate float64 + seq int64 + perIPLimiters map[string]*rate.Limiter } var upgrader = websocket.Upgrader{ @@ -40,8 +42,9 @@ var tracer = otel.Tracer("jetstream-server") func NewServer(maxSubRate float64) (*Server, error) { s := Server{ - Subscribers: make(map[int64]*Subscriber), - maxSubRate: maxSubRate, + Subscribers: make(map[int64]*Subscriber), + maxSubRate: maxSubRate, + perIPLimiters: make(map[string]*rate.Limiter), } return &s, nil diff --git a/pkg/server/subscriber.go b/pkg/server/subscriber.go index fc31fca..4eef5ab 100644 --- a/pkg/server/subscriber.go +++ b/pkg/server/subscriber.go @@ -195,6 +195,12 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib s.lk.Lock() defer s.lk.Unlock() + lim := s.perIPLimiters[realIP] + if lim == nil { + lim = rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)) + s.perIPLimiters[realIP] = lim + } + sub := Subscriber{ ws: ws, realIP: realIP, @@ -207,7 +213,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib compress: opts.Compress, deliveredCounter: eventsDelivered.WithLabelValues(realIP), bytesCounter: bytesDelivered.WithLabelValues(realIP), - rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)), + rl: lim, } s.Subscribers[s.nextSub] = &sub