From ca0b35ed086b940784bfade553df2f9cc9f9fbc4 Mon Sep 17 00:00:00 2001 From: Arunprasad Rajkumar Date: Wed, 21 Sep 2022 22:01:17 +0530 Subject: [PATCH] Add concurrency limit for remote write Prior to this commit, an unbounded number of writes could be triggered from avalanche, depending on the number of samples and the batch size. Adding a new flag to limit the concurrency is useful for simulating Prometheus remote write sharding. Signed-off-by: Arunprasad Rajkumar --- cmd/avalanche.go | 40 +++++++++++++++++++++------------------- metrics/write.go | 6 ++++++ 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 5a0925a..2c26f95 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -30,25 +30,26 @@ import ( ) var ( - metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int() - labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int() - seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int() - metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() - labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() - constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. 
Flag can be specified multiple times.").Strings() - valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() - labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() - metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() - port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() - remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() - remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration() - remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() - remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() - remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() - tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() - remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. 
The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() + metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int() + labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int() + seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int() + metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() + labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() + constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() + valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() + labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() + metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() + port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() + remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() + remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() + remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration() + remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() + remoteConcurrencyLimit = kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given 
time").Default("20").Int() + remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() + remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() + remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() + tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() + remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() ) func main() { @@ -77,6 +78,7 @@ func main() { RequestInterval: *remoteReqsInterval, BatchSize: *remoteBatchSize, RequestCount: *remoteRequestCount, + Concurrency: *remoteConcurrencyLimit, UpdateNotify: updateNotify, Tenant: *remoteTenant, TLSClientConfig: tls.Config{ diff --git a/metrics/write.go b/metrics/write.go index 20c35a0..921539c 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -54,6 +54,7 @@ type ConfigWrite struct { Tenant string TLSClientConfig tls.Config TenantHeader string + Concurrency int } // Client for the remote write requests. @@ -126,6 +127,7 @@ func (c *Client) write() error { log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() + concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) for ii := 0; ii < c.config.RequestCount; ii++ { // Download the pprofs during half of the iteration to get avarege readings. // Do that only when it is not set to take profiles at a given interval. 
@@ -151,7 +153,11 @@ func (c *Client) write() error { start := time.Now() for i := 0; i < len(tss); i += c.config.BatchSize { wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} go func(i int) { + defer func() { + <-concurrencyLimitCh + }() defer wgMetrics.Done() end := i + c.config.BatchSize if end > len(tss) {