Skip to content

Commit

Permalink
Add concurrency limit for remote write
Browse files Browse the repository at this point in the history
Prior to this commit, unbounded number of write would have been trigged from avalanche based on number of samples and batch size. Adding a new flag to limit the concurrency will be useful to simulate prometheus remote write sharding.

Signed-off-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
arajkumar committed Sep 21, 2022
1 parent 19b624b commit c8cc20b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 19 deletions.
40 changes: 21 additions & 19 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,26 @@ import (
)

var (
metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int()
labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int()
seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration()
remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int()
remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int()
labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int()
seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration()
remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteConcurrencyLimit = kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int()
remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int()
remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
)

func main() {
Expand Down Expand Up @@ -77,6 +78,7 @@ func main() {
RequestInterval: *remoteReqsInterval,
BatchSize: *remoteBatchSize,
RequestCount: *remoteRequestCount,
Concurrency: *remoteConcurrencyLimit,
UpdateNotify: updateNotify,
Tenant: *remoteTenant,
TLSClientConfig: tls.Config{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.15.0
github.com/prometheus/prometheus v1.8.2-0.20201119181812-c8f810083d3f
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
8 changes: 8 additions & 0 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"golang.org/x/sync/semaphore"

"github.com/prometheus-community/avalanche/pkg/download"

Expand All @@ -54,6 +55,7 @@ type ConfigWrite struct {
Tenant string
TLSClientConfig tls.Config
TenantHeader string
Concurrency int
}

// Client for the remote write requests.
Expand Down Expand Up @@ -126,6 +128,7 @@ func (c *Client) write() error {
log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
ticker := time.NewTicker(c.config.RequestInterval)
defer ticker.Stop()
sem := semaphore.NewWeighted(int64(c.config.Concurrency))
for ii := 0; ii < c.config.RequestCount; ii++ {
// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
Expand Down Expand Up @@ -153,6 +156,11 @@ func (c *Client) write() error {
wgMetrics.Add(1)
go func(i int) {
defer wgMetrics.Done()
if err := sem.Acquire(context.TODO(), 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
return
}
defer sem.Release(1)
end := i + c.config.BatchSize
if end > len(tss) {
end = len(tss)
Expand Down

0 comments on commit c8cc20b

Please sign in to comment.