Skip to content

Commit

Permalink
Add rate limit for agent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
matthiasBT committed Oct 1, 2023
1 parent 6e4ae96 commit 95f269e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
17 changes: 8 additions & 9 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -40,14 +39,14 @@ func main() {
Data: &dataExchange,
Ticker: time.NewTicker(time.Duration(conf.ReportInterval) * time.Second),
Done: done,
SendAdapter: &adapters.HTTPReportAdapter{
Logger: logger,
ServerAddr: conf.Addr,
UpdateURL: conf.UpdateURL,
Retrier: retrier,
Lock: &sync.Mutex{},
HMACKey: []byte(conf.HMACKey),
},
SendAdapter: adapters.NewHTTPReportAdapter(
logger,
conf.Addr,
conf.UpdateURL,
retrier,
[]byte(conf.HMACKey),
conf.RateLimit,
),
}
poller := poll.Poller{
Logger: logger,
Expand Down
39 changes: 36 additions & 3 deletions internal/agent/adapters/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,43 @@ type HTTPReportAdapter struct {
ServerAddr string
UpdateURL string
Retrier utils.Retrier
Lock *sync.Mutex
HMACKey []byte
Lock *sync.Mutex
jobs chan []byte
}

var ErrResponseNotOK = errors.New("response not OK")

func NewHTTPReportAdapter(
logger logging.ILogger,
serverAddr string,
updateURL string,
retrier utils.Retrier,
hmacKey []byte,
workerNum uint,
) *HTTPReportAdapter {
jobs := make(chan []byte, workerNum)
adapter := HTTPReportAdapter{
Logger: logger,
ServerAddr: serverAddr,
UpdateURL: updateURL,
Retrier: retrier,
HMACKey: hmacKey,
Lock: &sync.Mutex{},
jobs: jobs,
}
var i uint
for i = 0; i < workerNum; i++ {
go func() {
select {
case data := <-jobs:
adapter.report(&data)
}
}()
}
return &adapter
}

func (r *HTTPReportAdapter) Report(metrics *common.Metrics) error {
r.Lock.Lock()
defer r.Lock.Unlock()
Expand All @@ -37,7 +68,8 @@ func (r *HTTPReportAdapter) Report(metrics *common.Metrics) error {
r.Logger.Errorf("Failed to marshal a metric: %v", metrics)
return err
}
return r.report(&payload)
r.jobs <- payload
return nil
}

func (r *HTTPReportAdapter) ReportBatch(batch []*common.Metrics) error {
Expand All @@ -48,7 +80,8 @@ func (r *HTTPReportAdapter) ReportBatch(batch []*common.Metrics) error {
r.Logger.Errorf("Failed to marshal a batch of metrics: %v\n", err.Error())
return err
}
return r.report(&payload)
r.jobs <- payload
return nil
}

func (r *HTTPReportAdapter) report(payload *[]byte) error {
Expand Down
5 changes: 5 additions & 0 deletions internal/infra/config/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
ReportInterval uint `env:"REPORT_INTERVAL"`
PollInterval uint `env:"POLL_INTERVAL"`
HMACKey string `env:"KEY"`
RateLimit uint `env:"RATE_LIMIT"`
RetryAttempts int
RetryIntervalInitial time.Duration
RetryIntervalBackoff time.Duration
Expand All @@ -40,6 +41,7 @@ func InitConfig() (*Config, error) {
)
pollInterval := flag.Uint("p", DefPollInterval, "How often to query metrics, seconds")
hmacKey := flag.String("k", "", "HMAC key for integrity checks")
rateLimit := flag.Uint("l", 1, "Max number of active workers")
flag.Parse()
if conf.Addr == "" {
conf.Addr = *addr
Expand All @@ -53,6 +55,9 @@ func InitConfig() (*Config, error) {
if conf.HMACKey == "" {
conf.HMACKey = *hmacKey
}
if conf.RateLimit == 0 {
conf.RateLimit = *rateLimit
}
conf.UpdateURL = updateURL
conf.RetryAttempts = DefRetryAttempts
conf.RetryIntervalInitial = DefRetryIntervalInitial
Expand Down

0 comments on commit 95f269e

Please sign in to comment.