diff --git a/.gitignore b/.gitignore index fc34d32..d2494cd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ ./avalanche .build/ .idea/ +mtypes +avalanche \ No newline at end of file diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 8872c76..6cb3cde 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -15,7 +15,6 @@ package main import ( "context" - "crypto/tls" "fmt" "log" "net/http" @@ -62,58 +61,32 @@ func main() { cfg := metrics.NewConfigFromFlags(kingpin.Flag) port := kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteURL := kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() - remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int() - remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int() - remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() - remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() - tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() - remoteTenantHeader := kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() - // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). - outOfOrder := kingpin.Flag("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool() + writeCfg := metrics.NewWriteConfigFromFlags(kingpin.Flag) kingpin.Parse() if err := cfg.Validate(); err != nil { kingpin.FatalUsage("configuration error: %v", err) } - log.Println("initializing avalanche...") + if err := writeCfg.Validate(); err != nil { + kingpin.FatalUsage("remote write config validation failed: %v", err) + } collector := metrics.NewCollector(*cfg) reg := prometheus.NewRegistry() reg.MustRegister(collector) + writeCfg.UpdateNotify = collector.UpdateNotifyCh() + + log.Println("initializing avalanche...") var g run.Group g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) g.Add(collector.Run, collector.Stop) // One-off remote write send mode. - if *remoteURL != nil { - if (**remoteURL).Host == "" || (**remoteURL).Scheme == "" { - log.Fatal("remote host and scheme can't be empty") - } - if *remoteBatchSize <= 0 { - log.Fatal("remote send batch size should be more than zero") - } - - config := &metrics.ConfigWrite{ - URL: **remoteURL, - RequestInterval: *remoteReqsInterval, - BatchSize: *remoteBatchSize, - RequestCount: *remoteRequestCount, - UpdateNotify: collector.UpdateNotifyCh(), - Concurrency: *remoteConcurrencyLimit, - Tenant: *remoteTenant, - TLSClientConfig: tls.Config{ - InsecureSkipVerify: *tlsClientInsecure, - }, - TenantHeader: *remoteTenantHeader, - OutOfOrder: *outOfOrder, - } - + if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil { + if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/metrics/write.go b/metrics/write.go index 4144cb5..ab7d1e8 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -35,6 +35,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "gopkg.in/alecthomas/kingpin.v2" ) const maxErrMsgLen = 256 @@ -43,7 +44,7 @@ var userAgent = "avalanche" // ConfigWrite for the remote write requests. type ConfigWrite struct { - URL url.URL + URL *url.URL RequestInterval time.Duration BatchSize, RequestCount int @@ -56,6 +57,48 @@ type ConfigWrite struct { Concurrency int } +func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { + cfg := &ConfigWrite{} + flagReg("remote-url", "URL to send samples via remote_write API."). + URLVar(&cfg.URL) + flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20"). + IntVar(&cfg.Concurrency) + flagReg("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000"). + IntVar(&cfg.BatchSize) + flagReg("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100"). + IntVar(&cfg.RequestCount) + flagReg("remote-write-interval", "delay between each remote write request.").Default("100ms"). + DurationVar(&cfg.RequestInterval) + flagReg("remote-tenant", "Tenant ID to include in remote_write send").Default("0"). + StringVar(&cfg.Tenant) + flagReg("tls-client-insecure", "Skip certificate check on tls connection").Default("false"). + BoolVar(&cfg.TLSClientConfig.InsecureSkipVerify) + flagReg("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID"). + StringVar(&cfg.TenantHeader) + // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). + flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true"). + BoolVar(&cfg.OutOfOrder) + + return cfg +} + +func (c *ConfigWrite) Validate() error { + if c.URL != nil { + if c.URL.Host == "" || c.URL.Scheme == "" { + return fmt.Errorf("remote host and scheme can't be empty") + } + + if c.BatchSize <= 0 { + return fmt.Errorf("remote send batch sizemust be greater than 0, got %v", c.BatchSize) + } + + if c.RequestInterval <= 0 { + return fmt.Errorf("--remote-write-interval must be greater than 0, got %v", c.RequestInterval) + } + } + return nil +} + // Client for the remote write requests. type Client struct { client *http.Client @@ -66,20 +109,20 @@ type Client struct { // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error { +func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ - TLSClientConfig: &config.TLSClientConfig, + TLSClientConfig: &cfg.TLSClientConfig, } - rt = &tenantRoundTripper{tenant: config.Tenant, tenantHeader: config.TenantHeader, rt: rt} + rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt} httpClient := &http.Client{Transport: rt} - c := Client{ + client := Client{ client: httpClient, timeout: time.Minute, - config: config, + config: cfg, gatherer: gatherer, } - return c.write(ctx) + return client.write(ctx) } // Add the tenant ID header