Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor remote write flags into init method #103

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
./avalanche
.build/
.idea/
mtypes
avalanche
45 changes: 9 additions & 36 deletions cmd/avalanche/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -62,58 +61,32 @@ func main() {

cfg := metrics.NewConfigFromFlags(kingpin.Flag)
port := kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL := kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remoteConcurrencyLimit := kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int()
remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteRequestCount := kingpin.Flag("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").Int()
remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader := kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
// TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time).
outOfOrder := kingpin.Flag("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool()
writeCfg := metrics.NewWriteConfigFromFlags(kingpin.Flag)

kingpin.Parse()
if err := cfg.Validate(); err != nil {
kingpin.FatalUsage("configuration error: %v", err)
}
log.Println("initializing avalanche...")
if err := writeCfg.Validate(); err != nil {
kingpin.FatalUsage("remote write config validation failed: %v", err)
}

collector := metrics.NewCollector(*cfg)
reg := prometheus.NewRegistry()
reg.MustRegister(collector)
writeCfg.UpdateNotify = collector.UpdateNotifyCh()

log.Println("initializing avalanche...")

var g run.Group
g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
g.Add(collector.Run, collector.Stop)

// One-off remote write send mode.
if *remoteURL != nil {
if (**remoteURL).Host == "" || (**remoteURL).Scheme == "" {
log.Fatal("remote host and scheme can't be empty")
}
if *remoteBatchSize <= 0 {
log.Fatal("remote send batch size should be more than zero")
}

config := &metrics.ConfigWrite{
URL: **remoteURL,
RequestInterval: *remoteReqsInterval,
BatchSize: *remoteBatchSize,
RequestCount: *remoteRequestCount,
UpdateNotify: collector.UpdateNotifyCh(),
Concurrency: *remoteConcurrencyLimit,
Tenant: *remoteTenant,
TLSClientConfig: tls.Config{
InsecureSkipVerify: *tlsClientInsecure,
},
TenantHeader: *remoteTenantHeader,
OutOfOrder: *outOfOrder,
}

if writeCfg.URL != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
if err := metrics.SendRemoteWrite(ctx, config, reg); err != nil {
if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil {
return err
}
return nil // One-off.
Expand Down
57 changes: 50 additions & 7 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"gopkg.in/alecthomas/kingpin.v2"
)

const maxErrMsgLen = 256
Expand All @@ -43,7 +44,7 @@ var userAgent = "avalanche"

// ConfigWrite for the remote write requests.
type ConfigWrite struct {
URL url.URL
URL *url.URL
RequestInterval time.Duration
BatchSize,
RequestCount int
Expand All @@ -56,6 +57,48 @@ type ConfigWrite struct {
Concurrency int
}

func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite {
cfg := &ConfigWrite{}
flagReg("remote-url", "URL to send samples via remote_write API.").
URLVar(&cfg.URL)
flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").
IntVar(&cfg.Concurrency)
flagReg("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").
IntVar(&cfg.BatchSize)
flagReg("remote-requests-count", "How many requests to send in total to the remote_write API. Set to -1 to run indefinitely.").Default("100").
IntVar(&cfg.RequestCount)
flagReg("remote-write-interval", "delay between each remote write request.").Default("100ms").
DurationVar(&cfg.RequestInterval)
flagReg("remote-tenant", "Tenant ID to include in remote_write send").Default("0").
StringVar(&cfg.Tenant)
flagReg("tls-client-insecure", "Skip certificate check on tls connection").Default("false").
BoolVar(&cfg.TLSClientConfig.InsecureSkipVerify)
flagReg("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").
StringVar(&cfg.TenantHeader)
// TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time).
flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").
BoolVar(&cfg.OutOfOrder)

return cfg
}

func (c *ConfigWrite) Validate() error {
if c.URL != nil {
if c.URL.Host == "" || c.URL.Scheme == "" {
return fmt.Errorf("remote host and scheme can't be empty")
}

if c.BatchSize <= 0 {
return fmt.Errorf("remote send batch sizemust be greater than 0, got %v", c.BatchSize)
}

if c.RequestInterval <= 0 {
return fmt.Errorf("--remote-write-interval must be greater than 0, got %v", c.RequestInterval)
}
}
return nil
}

// Client for the remote write requests.
type Client struct {
client *http.Client
Expand All @@ -66,20 +109,20 @@ type Client struct {

// SendRemoteWrite initializes a http client and
// sends metrics to a prometheus compatible remote endpoint.
func SendRemoteWrite(ctx context.Context, config *ConfigWrite, gatherer prometheus.Gatherer) error {
func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error {
var rt http.RoundTripper = &http.Transport{
TLSClientConfig: &config.TLSClientConfig,
TLSClientConfig: &cfg.TLSClientConfig,
}
rt = &tenantRoundTripper{tenant: config.Tenant, tenantHeader: config.TenantHeader, rt: rt}
rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt}
httpClient := &http.Client{Transport: rt}

c := Client{
client := Client{
client: httpClient,
timeout: time.Minute,
config: config,
config: cfg,
gatherer: gatherer,
}
return c.write(ctx)
return client.write(ctx)
}

// Add the tenant ID header
Expand Down
Loading