From 629dc95d5bf3a832945615dbd1c493b3f8d290bc Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 21 Oct 2024 10:39:57 +0530 Subject: [PATCH 1/4] Refactor remote write flags into init method Signed-off-by: Saswata Mukherjee --- .gitignore | 2 ++ cmd/avalanche/avalanche.go | 46 ++++++++----------------------------- metrics/write.go | 47 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 57 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index fc34d32..d2494cd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ ./avalanche .build/ .idea/ +mtypes +avalanche \ No newline at end of file diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 8872c76..0041b37 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -15,7 +15,6 @@ package main import ( "context" - "crypto/tls" "fmt" "log" "net/http" @@ -62,58 +61,33 @@ func main() { cfg := metrics.NewConfigFromFlags(kingpin.Flag) port := kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteURL := kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() - remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int() - remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int() - remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() - remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() - tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() - remoteTenantHeader := kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() - // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). - outOfOrder := kingpin.Flag("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool() + remoteWriteConfig := metrics.NewRemoteWriteConfigFromFlags(kingpin.Flag) kingpin.Parse() if err := cfg.Validate(); err != nil { kingpin.FatalUsage("configuration error: %v", err) } - log.Println("initializing avalanche...") collector := metrics.NewCollector(*cfg) reg := prometheus.NewRegistry() reg.MustRegister(collector) + remoteWriteConfig.UpdateNotify = collector.UpdateNotifyCh() + + if err := remoteWriteConfig.Validate(); err != nil { + kingpin.FatalUsage("remote write config validation failed: %v", err) + } + + log.Println("initializing avalanche...") var g run.Group g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) g.Add(collector.Run, collector.Stop) // One-off remote write send mode. - if *remoteURL != nil { - if (**remoteURL).Host == "" || (**remoteURL).Scheme == "" { - log.Fatal("remote host and scheme can't be empty") - } - if *remoteBatchSize <= 0 { - log.Fatal("remote send batch size should be more than zero") - } - - config := &metrics.ConfigWrite{ - URL: **remoteURL, - RequestInterval: *remoteReqsInterval, - BatchSize: *remoteBatchSize, - RequestCount: *remoteRequestCount, - UpdateNotify: collector.UpdateNotifyCh(), - Concurrency: *remoteConcurrencyLimit, - Tenant: *remoteTenant, - TLSClientConfig: tls.Config{ - InsecureSkipVerify: *tlsClientInsecure, - }, - TenantHeader: *remoteTenantHeader, - OutOfOrder: *outOfOrder, - } - + if remoteWriteConfig.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil { + if err := remoteWriteConfig.SendRemoteWrite(ctx, reg); err != nil { return err } return nil // One-off. diff --git a/metrics/write.go b/metrics/write.go index 4144cb5..aa08746 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -28,6 +28,7 @@ import ( "time" "github.com/prometheus-community/avalanche/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -43,7 +44,7 @@ var userAgent = "avalanche" // ConfigWrite for the remote write requests. type ConfigWrite struct { - URL url.URL + URL *url.URL RequestInterval time.Duration BatchSize, RequestCount int @@ -56,6 +57,48 @@ type ConfigWrite struct { Concurrency int } +func NewRemoteWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { + cfg := &ConfigWrite{} + flagReg("remote-url", "URL to send samples via remote_write API."). + URLVar(&cfg.URL) + flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20"). + IntVar(&cfg.Concurrency) + flagReg("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000"). + IntVar(&cfg.BatchSize) + flagReg("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100"). + IntVar(&cfg.RequestCount) + flagReg("remote-write-interval", "delay between each remote write request.").Default("100ms"). + DurationVar(&cfg.RequestInterval) + flagReg("remote-tenant", "Tenant ID to include in remote_write send").Default("0"). + StringVar(&cfg.Tenant) + flagReg("tls-client-insecure", "Skip certificate check on tls connection").Default("false"). + BoolVar(&cfg.TLSClientConfig.InsecureSkipVerify) + flagReg("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID"). + StringVar(&cfg.TenantHeader) + // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). + flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true"). + BoolVar(&cfg.OutOfOrder) + + return cfg +} + +func (c *ConfigWrite) Validate() error { + if c.URL != nil { + if c.URL.Host == "" || c.URL.Scheme == "" { + return fmt.Errorf("remote host and scheme can't be empty") + } + + if c.BatchSize <= 0 { + return fmt.Errorf("remote send batch sizemust be greater than 0, got %v", c.BatchSize) + } + + if c.RequestInterval <= 0 { + return fmt.Errorf("--remote-write-interval must be greater than 0, got %v", c.RequestInterval) + } + } + return nil +} + // Client for the remote write requests. type Client struct { client *http.Client @@ -66,7 +109,7 @@ type Client struct { // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error { +func (config *ConfigWrite) SendRemoteWrite(ctx context.Context, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &config.TLSClientConfig, } From 251d2b0f813bd7a671e8d008102e35446865ed10 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 21 Oct 2024 10:45:28 +0530 Subject: [PATCH 2/4] Fix lint Signed-off-by: Saswata Mukherjee --- metrics/write.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metrics/write.go b/metrics/write.go index aa08746..2872518 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -28,7 +28,6 @@ import ( "time" "github.com/prometheus-community/avalanche/pkg/errors" - "gopkg.in/alecthomas/kingpin.v2" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -36,6 +35,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "gopkg.in/alecthomas/kingpin.v2" ) const maxErrMsgLen = 256 @@ -109,20 +109,20 @@ type Client struct { // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func (config *ConfigWrite) SendRemoteWrite(ctx context.Context, gatherer prometheus.Gatherer) error { +func (c *ConfigWrite) SendRemoteWrite(ctx context.Context, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ - TLSClientConfig: &config.TLSClientConfig, + TLSClientConfig: &c.TLSClientConfig, } - rt = &tenantRoundTripper{tenant: config.Tenant, tenantHeader: config.TenantHeader, rt: rt} + rt = &tenantRoundTripper{tenant: c.Tenant, tenantHeader: c.TenantHeader, rt: rt} httpClient := &http.Client{Transport: rt} - c := Client{ + client := Client{ client: httpClient, timeout: time.Minute, - config: config, + config: c, gatherer: gatherer, } - return c.write(ctx) + return client.write(ctx) } // Add the tenant ID header From b6407936c8437b3d73dc5a43a504b45b40ddf365 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 21 Oct 2024 12:30:20 +0530 Subject: [PATCH 3/4] Implement suggestions Signed-off-by: Saswata Mukherjee --- cmd/avalanche/avalanche.go | 9 ++++----- metrics/write.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 0041b37..0e8b69e 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -61,22 +61,21 @@ func main() { cfg := metrics.NewConfigFromFlags(kingpin.Flag) port := kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteWriteConfig := metrics.NewRemoteWriteConfigFromFlags(kingpin.Flag) + remoteWriteConfig := metrics.NewWriteConfigFromFlags(kingpin.Flag) kingpin.Parse() if err := cfg.Validate(); err != nil { kingpin.FatalUsage("configuration error: %v", err) } + if err := remoteWriteConfig.Validate(); err != nil { + kingpin.FatalUsage("remote write config validation failed: %v", err) + } collector := metrics.NewCollector(*cfg) reg := prometheus.NewRegistry() reg.MustRegister(collector) remoteWriteConfig.UpdateNotify = collector.UpdateNotifyCh() - if err := remoteWriteConfig.Validate(); err != nil { - kingpin.FatalUsage("remote write config validation failed: %v", err) - } - log.Println("initializing avalanche...") var g run.Group diff --git a/metrics/write.go b/metrics/write.go index 2872518..59767fc 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -57,7 +57,7 @@ type ConfigWrite struct { Concurrency int } -func NewRemoteWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { +func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { cfg := &ConfigWrite{} flagReg("remote-url", "URL to send samples via remote_write API."). URLVar(&cfg.URL) From 2a809c4ffb31be5d16d461fc44bc427d845d2994 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 16 Dec 2024 08:19:11 +0000 Subject: [PATCH 4/4] Address comments Signed-off-by: Saswata Mukherjee --- cmd/avalanche/avalanche.go | 10 +++++----- metrics/write.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 0e8b69e..6cb3cde 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -61,20 +61,20 @@ func main() { cfg := metrics.NewConfigFromFlags(kingpin.Flag) port := kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteWriteConfig := metrics.NewWriteConfigFromFlags(kingpin.Flag) + writeCfg := metrics.NewWriteConfigFromFlags(kingpin.Flag) kingpin.Parse() if err := cfg.Validate(); err != nil { kingpin.FatalUsage("configuration error: %v", err) } - if err := remoteWriteConfig.Validate(); err != nil { + if err := writeCfg.Validate(); err != nil { kingpin.FatalUsage("remote write config validation failed: %v", err) } collector := metrics.NewCollector(*cfg) reg := prometheus.NewRegistry() reg.MustRegister(collector) - remoteWriteConfig.UpdateNotify = collector.UpdateNotifyCh() + writeCfg.UpdateNotify = collector.UpdateNotifyCh() log.Println("initializing avalanche...") @@ -83,10 +83,10 @@ func main() { g.Add(collector.Run, collector.Stop) // One-off remote write send mode. - if remoteWriteConfig.URL != nil { + if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := remoteWriteConfig.SendRemoteWrite(ctx, reg); err != nil { + if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/metrics/write.go b/metrics/write.go index 59767fc..ab7d1e8 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -109,17 +109,17 @@ type Client struct { // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func (c *ConfigWrite) SendRemoteWrite(ctx context.Context, gatherer prometheus.Gatherer) error { +func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ - TLSClientConfig: &c.TLSClientConfig, + TLSClientConfig: &cfg.TLSClientConfig, } - rt = &tenantRoundTripper{tenant: c.Tenant, tenantHeader: c.TenantHeader, rt: rt} + rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt} httpClient := &http.Client{Transport: rt} client := Client{ client: httpClient, timeout: time.Minute, - config: c, + config: cfg, gatherer: gatherer, } return client.write(ctx)