From df2ff189ad9c4a0f0c0bef4aeb5f5702850762a3 Mon Sep 17 00:00:00 2001 From: Arunprasad Rajkumar Date: Mon, 14 Oct 2024 21:23:07 +0530 Subject: [PATCH] Add concurrency limit for remote write (#46) Prior to this commit, unbounded number of write would have been trigged from avalanche based on number of samples and batch size. Adding a new flag to limit the concurrency will be useful to simulate prometheus remote write sharding. Signed-off-by: Arunprasad Rajkumar --- cmd/avalanche/avalanche.go | 2 ++ metrics/write.go | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 84bd593..bf8f6f8 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -71,6 +71,7 @@ func main() { // TODO(bwplotka): Kill pprof feature, you can install OSS continuous profiling easily instead. remotePprofURLs := kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() remotePprofInterval := kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles. When not provided it will download a profile once before the end of the test.").Duration() + remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int() remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int() remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() @@ -109,6 +110,7 @@ func main() { BatchSize: *remoteBatchSize, RequestCount: *remoteRequestCount, UpdateNotify: collector.UpdateNotifyCh(), + Concurrency: *remoteConcurrencyLimit, Tenant: *remoteTenant, TLSClientConfig: tls.Config{ InsecureSkipVerify: *tlsClientInsecure, diff --git a/metrics/write.go b/metrics/write.go index 9cecd02..9e0af3f 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -54,6 +54,7 @@ type ConfigWrite struct { TLSClientConfig tls.Config TenantHeader string OutOfOrder bool + Concurrency int } // Client for the remote write requests. @@ -144,6 +145,8 @@ func (c *Client) write(ctx context.Context) error { ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() + concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + for i := 0; ; { if ctx.Err() != nil { return ctx.Err() @@ -180,7 +183,11 @@ func (c *Client) write(ctx context.Context) error { start := time.Now() for i := 0; i < len(tss); i += c.config.BatchSize { wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} go func(i int) { + defer func() { + <-concurrencyLimitCh + }() defer wgMetrics.Done() end := i + c.config.BatchSize if end > len(tss) {