diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 8ecdbb9..fb077bb 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -68,6 +68,7 @@ func main() { // TODO(bwplotka): Kill pprof feature, you can install OSS continuous profiling easily instead. remotePprofURLs := kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() remotePprofInterval := kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles. When not provided it will download a profile once before the end of the test.").Duration() + remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int() remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int() remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() @@ -106,6 +107,7 @@ func main() { BatchSize: *remoteBatchSize, RequestCount: *remoteRequestCount, UpdateNotify: collector.UpdateNotifyCh(), + Concurrency: *remoteConcurrencyLimit, Tenant: *remoteTenant, TLSClientConfig: tls.Config{ InsecureSkipVerify: *tlsClientInsecure, diff --git a/metrics/write.go b/metrics/write.go index 9cecd02..9e0af3f 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -54,6 +54,7 @@ type ConfigWrite struct { TLSClientConfig tls.Config TenantHeader string OutOfOrder bool + Concurrency int } // Client for the remote write requests. @@ -144,6 +145,8 @@ func (c *Client) write(ctx context.Context) error { ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() + concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + for i := 0; ; { if ctx.Err() != nil { return ctx.Err() @@ -180,7 +183,11 @@ func (c *Client) write(ctx context.Context) error { start := time.Now() for i := 0; i < len(tss); i += c.config.BatchSize { wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} go func(i int) { + defer func() { + <-concurrencyLimitCh + }() defer wgMetrics.Done() end := i + c.config.BatchSize if end > len(tss) {