Skip to content

Commit

Permalink
chore: add pr suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
gussf committed Jun 12, 2024
1 parent 5065f28 commit 84ebcd2
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 22 deletions.
3 changes: 2 additions & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,5 @@ rateLimiter:
host: "localhost"
port: 6379
password: ""
test: false
tls:
disabled: false
3 changes: 2 additions & 1 deletion config/docker_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,5 @@ rateLimiter:
host: "redis"
port: 6379
password: ""
test: true
tls:
disabled: true
3 changes: 2 additions & 1 deletion config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,5 @@ rateLimiter:
host: "localhost"
port: 6379
password: ""
test: true
tls:
disabled: true
2 changes: 1 addition & 1 deletion e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPush
statsReport, err := extensions.NewStatsD(s.vConfig, logger, statsdClientMock)
s.Require().NoError(err)

rateLimiter := extensions.NewRateLimiter(s.vConfig, logger)
rateLimiter := extensions.NewRateLimiter(s.vConfig, []interfaces.StatsReporter{statsReport}, logger)

pushClient := firebaseMock.NewMockPushClient(ctrl)
gcmPusher.MessageHandler = map[string]interfaces.MessageHandler{
Expand Down
2 changes: 1 addition & 1 deletion extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfa
allowed := a.rateLimiter.Allow(ctx, notification.DeviceToken)
if !allowed {
statsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
l.Warn("rate limit reached")
l.WithField("message", message).Warn("rate limit reached")
return
}

Expand Down
6 changes: 6 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func statsReporterNotificationRateLimitReached(statsReporters []interfaces.Stats
}
}

func statsReporterNotificationRateLimitFailed(statsReporters []interfaces.StatsReporter, game string, platform string) {
for _, statsReporter := range statsReporters {
statsReporter.NotificationRateLimitFailed(game, platform)
}
}

func statsReporterReportSendNotificationLatency(statsReporters []interfaces.StatsReporter, latencyMs time.Duration, game string, platform string, labels ...string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportSendNotificationLatency(latencyMs, game, platform, labels...)
Expand Down
9 changes: 9 additions & 0 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func (s *StatsD) NotificationRateLimitReached(game string, platform string) {
)
}

// NotificationRateLimitFailed stores how many times rate limits failed to be calculated
func (s *StatsD) NotificationRateLimitFailed(game string, platform string) {
s.Client.Incr(
"rate_limit_failed",
[]string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)},
1,
)
}

// InitializeFailure notifu error when is impossible tho initilizer an app
func (s *StatsD) InitializeFailure(game string, platform string) {
s.Client.Incr("initialize_failure", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
Expand Down
2 changes: 1 addition & 1 deletion extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
allowed := g.rateLimiter.Allow(context.Background(), km.To)
if !allowed {
statsReporterNotificationRateLimitReached(g.StatsReporters, message.Game, "gcm")
l.Warn("rate limit reached")
l.WithField("message", message).Warn("rate limit reached")
return errors.New("rate limit reached")
}
l.Debug("sending message to gcm")
Expand Down
2 changes: 1 addition & 1 deletion extensions/handler/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (h *messageHandler) HandleMessages(ctx context.Context, msg interfaces.Kafk
allowed := h.rateLimiter.Allow(ctx, km.To)
if !allowed {
h.reportRateLimitReached(msg.Game)
l.Warn("rate limit reached")
l.WithField("message", msg).Warn("rate limit reached")
return
}

Expand Down
25 changes: 15 additions & 10 deletions extensions/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,41 @@ import (
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
)

type rateLimiter struct {
redis *redis.Client
rpmLimit int
l *logrus.Entry
redis *redis.Client
rpmLimit int
statsReporters []interfaces.StatsReporter
l *logrus.Entry
}

func NewRateLimiter(config *viper.Viper, logger *logrus.Logger) rateLimiter {
func NewRateLimiter(config *viper.Viper, statsReporters []interfaces.StatsReporter, logger *logrus.Logger) rateLimiter {
host := config.GetString("rateLimiter.redis.host")
port := config.GetInt("rateLimiter.redis.port")
pwd := config.GetString("rateLimiter.redis.password")
limit := config.GetInt("rateLimiter.limit.rpm")
isTest := config.GetBool("rateLimiter.test")
disableTLS := config.GetBool("rateLimiter.tls.disabled")

addr := fmt.Sprintf("%s:%d", host, port)
opts := &redis.Options{
Addr: addr,
Password: pwd,
}
// Setting TLSConfig only for production due to not being able to enable TLS in the integration test container.
if !isTest {

// TLS for integration tests running in containers can raise connection errors.
// Not recommended to disable TLS for production.
if !disableTLS {
opts.TLSConfig = &tls.Config{}
}

rdb := redis.NewClient(opts)

return rateLimiter{
redis: rdb,
rpmLimit: limit,
redis: rdb,
rpmLimit: limit,
statsReporters: statsReporters,
l: logger.WithFields(logrus.Fields{
"extension": "RateLimiter",
"rpmLimit": limit,
Expand Down Expand Up @@ -82,7 +87,7 @@ func (r rateLimiter) Allow(ctx context.Context, device string) bool {

_, err = r.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Incr(ctx, deviceKey)
pipe.Expire(ctx, deviceKey, 1*time.Minute)
pipe.Expire(ctx, deviceKey, time.Minute)
return nil
})
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions extensions/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/util"

"github.com/google/uuid"
Expand All @@ -27,9 +28,11 @@ var _ = FDescribe("Rate Limiter", func() {
Expect(err).NotTo(HaveOccurred())
hook.Reset()

statsClients := []interfaces.StatsReporter{}

Describe("Rate limiting", func() {
It("should return not-allowed when rate limit is reached", func() {
rl := NewRateLimiter(config, logger)
rl := NewRateLimiter(config, statsClients, logger)
rl.rpmLimit = 1
ctx := context.Background()
device := uuid.NewString()
Expand All @@ -42,7 +45,7 @@ var _ = FDescribe("Rate Limiter", func() {
})

It("should increment current rate if limit is not reached", func() {
rl := NewRateLimiter(config, logger)
rl := NewRateLimiter(config, statsClients, logger)
ctx := context.Background()
device := uuid.NewString()
currMin := time.Now().Minute()
Expand All @@ -56,6 +59,17 @@ var _ = FDescribe("Rate Limiter", func() {
Expect(actual).To(BeEquivalentTo("1"))
})

It("should return allowed if redis fails", func() {
wrongConfig := *config
wrongConfig.Set("rateLimiter.redis.host", "unreachable")
rl := NewRateLimiter(&wrongConfig, statsClients, logger)
ctx := context.Background()
device := uuid.NewString()

allowed := rl.Allow(ctx, device)
Expect(allowed).To(BeTrue())
})

})
})
})
1 change: 1 addition & 0 deletions interfaces/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StatsReporter interface {
ReportMetricGauge(metric string, value float64, game string, platform string)
ReportMetricCount(metric string, value int64, game string, platform string)
NotificationRateLimitReached(game string, platform string)
NotificationRateLimitFailed(game string, platform string)
ReportSendNotificationLatency(latencyMs time.Duration, game string, platform string, labels ...string)
ReportFirebaseLatency(latencyMs time.Duration, game string, labels ...string)
}
2 changes: 1 addition & 1 deletion pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
a.feedbackReporters,
queue,
interfaces.ConsumptionManager(q),
extensions.NewRateLimiter(a.ViperConfig, l.Logger),
extensions.NewRateLimiter(a.ViperConfig, a.StatsReporters, l.Logger),
)
if err == nil {
a.MessageHandler[k] = handler
Expand Down
4 changes: 2 additions & 2 deletions pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (g *GCMPusher) createMessageHandlerForApps(ctx context.Context) error {
pushClient,
g.feedbackReporters,
g.StatsReporters,
extensions.NewRateLimiter(g.ViperConfig, l.Logger),
extensions.NewRateLimiter(g.ViperConfig, g.StatsReporters, l.Logger),
g.Logger,
g.Config.GCM.ConcurrentWorkers,
)
Expand All @@ -124,7 +124,7 @@ func (g *GCMPusher) createMessageHandlerForApps(ctx context.Context) error {
g.Queue.PendingMessagesWaitGroup(),
g.StatsReporters,
g.feedbackReporters,
extensions.NewRateLimiter(g.ViperConfig, l.Logger),
extensions.NewRateLimiter(g.ViperConfig, g.StatsReporters, l.Logger),
)
if err != nil {
l.WithError(err).Error("could not create gcm message handler")
Expand Down

0 comments on commit 84ebcd2

Please sign in to comment.