diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 5a0925a..2c26f95 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -30,25 +30,26 @@ import ( ) var ( - metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int() - labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int() - seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int() - metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() - labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() - constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() - valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() - labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() - metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() - port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() - remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() - remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration() - remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() - remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() - remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() - tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() - remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() + metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int() + labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int() + seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int() + metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() + labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() + constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() + valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() + labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() + metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() + port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() + remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() + remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() + remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration() + remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() + remoteConcurrencyLimit = kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int() + remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() + remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() + remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() + tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() + remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() ) func main() { @@ -77,6 +78,7 @@ func main() { RequestInterval: *remoteReqsInterval, BatchSize: *remoteBatchSize, RequestCount: *remoteRequestCount, + Concurrency: *remoteConcurrencyLimit, UpdateNotify: updateNotify, Tenant: *remoteTenant, TLSClientConfig: tls.Config{ diff --git a/go.mod b/go.mod index 4b567e3..f31b32c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.15.0 github.com/prometheus/prometheus v1.8.2-0.20201119181812-c8f810083d3f + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20201223074533-0d417f636930 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) diff --git a/go.sum b/go.sum index 05c23cb..18df747 100644 --- a/go.sum +++ b/go.sum @@ -847,6 +847,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/metrics/write.go b/metrics/write.go index 20c35a0..9df6484 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -31,6 +31,7 @@ import ( "github.com/golang/snappy" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "golang.org/x/sync/semaphore" "github.com/prometheus-community/avalanche/pkg/download" @@ -54,6 +55,7 @@ type ConfigWrite struct { Tenant string TLSClientConfig tls.Config TenantHeader string + Concurrency int } // Client for the remote write requests. @@ -126,6 +128,7 @@ func (c *Client) write() error { log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) ticker := time.NewTicker(c.config.RequestInterval) defer ticker.Stop() + sem := semaphore.NewWeighted(int64(c.config.Concurrency)) for ii := 0; ii < c.config.RequestCount; ii++ { // Download the pprofs during half of the iteration to get avarege readings. // Do that only when it is not set to take profiles at a given interval. @@ -153,6 +156,11 @@ func (c *Client) write() error { wgMetrics.Add(1) go func(i int) { defer wgMetrics.Done() + if err := sem.Acquire(context.TODO(), 1); err != nil { + log.Printf("Failed to acquire semaphore: %v", err) + return + } + defer sem.Release(1) end := i + c.config.BatchSize if end > len(tss) { end = len(tss)