Skip to content

Commit

Permalink
Refactor remote write flags into init method (#103)
Browse files Browse the repository at this point in the history
* Refactor remote write flags into init method

Signed-off-by: Saswata Mukherjee <[email protected]>

* Fix lint

Signed-off-by: Saswata Mukherjee <[email protected]>

* Implement suggestions

Signed-off-by: Saswata Mukherjee <[email protected]>

* Address comments

Signed-off-by: Saswata Mukherjee <[email protected]>

---------

Signed-off-by: Saswata Mukherjee <[email protected]>
  • Loading branch information
saswatamcode authored Dec 16, 2024
1 parent 191578f commit 84b8425
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
./avalanche
.build/
.idea/
mtypes
avalanche
45 changes: 9 additions & 36 deletions cmd/avalanche/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -62,58 +61,32 @@ func main() {

cfg := metrics.NewConfigFromFlags(kingpin.Flag)
port := kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL := kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int()
remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int()
remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader := kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
// TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time).
outOfOrder := kingpin.Flag("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool()
writeCfg := metrics.NewWriteConfigFromFlags(kingpin.Flag)

kingpin.Parse()
if err := cfg.Validate(); err != nil {
kingpin.FatalUsage("configuration error: %v", err)
}
log.Println("initializing avalanche...")
if err := writeCfg.Validate(); err != nil {
kingpin.FatalUsage("remote write config validation failed: %v", err)
}

collector := metrics.NewCollector(*cfg)
reg := prometheus.NewRegistry()
reg.MustRegister(collector)
writeCfg.UpdateNotify = collector.UpdateNotifyCh()

log.Println("initializing avalanche...")

var g run.Group
g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
g.Add(collector.Run, collector.Stop)

// One-off remote write send mode.
if *remoteURL != nil {
if (**remoteURL).Host == "" || (**remoteURL).Scheme == "" {
log.Fatal("remote host and scheme can't be empty")
}
if *remoteBatchSize <= 0 {
log.Fatal("remote send batch size should be more than zero")
}

config := &metrics.ConfigWrite{
URL: **remoteURL,
RequestInterval: *remoteReqsInterval,
BatchSize: *remoteBatchSize,
RequestCount: *remoteRequestCount,
UpdateNotify: collector.UpdateNotifyCh(),
Concurrency: *remoteConcurrencyLimit,
Tenant: *remoteTenant,
TLSClientConfig: tls.Config{
InsecureSkipVerify: *tlsClientInsecure,
},
TenantHeader: *remoteTenantHeader,
OutOfOrder: *outOfOrder,
}

if writeCfg.URL != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil {
if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil {
return err
}
return nil // One-off.
Expand Down
57 changes: 50 additions & 7 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"gopkg.in/alecthomas/kingpin.v2"
)

const maxErrMsgLen = 256
Expand All @@ -43,7 +44,7 @@ var userAgent = "avalanche"

// ConfigWrite for the remote write requests.
type ConfigWrite struct {
URL url.URL
URL *url.URL
RequestInterval time.Duration
BatchSize,
RequestCount int
Expand All @@ -56,6 +57,48 @@ type ConfigWrite struct {
Concurrency int
}

func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite {
cfg := &ConfigWrite{}
flagReg("remote-url", "URL to send samples via remote_write API.").
URLVar(&cfg.URL)
flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").
IntVar(&cfg.Concurrency)
flagReg("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").
IntVar(&cfg.BatchSize)
flagReg("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").
IntVar(&cfg.RequestCount)
flagReg("remote-write-interval", "delay between each remote write request.").Default("100ms").
DurationVar(&cfg.RequestInterval)
flagReg("remote-tenant", "Tenant ID to include in remote_write send").Default("0").
StringVar(&cfg.Tenant)
flagReg("tls-client-insecure", "Skip certificate check on tls connection").Default("false").
BoolVar(&cfg.TLSClientConfig.InsecureSkipVerify)
flagReg("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").
StringVar(&cfg.TenantHeader)
// TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time).
flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").
BoolVar(&cfg.OutOfOrder)

return cfg
}

func (c *ConfigWrite) Validate() error {
if c.URL != nil {
if c.URL.Host == "" || c.URL.Scheme == "" {
return fmt.Errorf("remote host and scheme can't be empty")
}

if c.BatchSize <= 0 {
return fmt.Errorf("remote send batch sizemust be greater than 0, got %v", c.BatchSize)
}

if c.RequestInterval <= 0 {
return fmt.Errorf("--remote-write-interval must be greater than 0, got %v", c.RequestInterval)
}
}
return nil
}

// Client for the remote write requests.
type Client struct {
client *http.Client
Expand All @@ -66,20 +109,20 @@ type Client struct {

// SendRemoteWrite initializes a http client and
// sends metrics to a prometheus compatible remote endpoint.
func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error {
func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error {
var rt http.RoundTripper = &http.Transport{
TLSClientConfig: &config.TLSClientConfig,
TLSClientConfig: &cfg.TLSClientConfig,
}
rt = &tenantRoundTripper{tenant: config.Tenant, tenantHeader: config.TenantHeader, rt: rt}
rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt}
httpClient := &http.Client{Transport: rt}

c := Client{
client := Client{
client: httpClient,
timeout: time.Minute,
config: config,
config: cfg,
gatherer: gatherer,
}
return c.write(ctx)
return client.write(ctx)
}

// Add the tenant ID header
Expand Down

0 comments on commit 84b8425

Please sign in to comment.