Skip to content

Commit

Permalink
Merge branch 'main' into fix-resolve-endpoint-v2-not-found-in-aws-sdk…
Browse files Browse the repository at this point in the history
…-go-v2
  • Loading branch information
hainenber authored Feb 15, 2024
2 parents 081d9bc + 6eb1889 commit a793764
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ Main (unreleased)

- Fix `ResolveEndpointV2 not found` for AWS-related components. (@hainenber)

- Fix bug where custom headers were not actually being set in loki client. (@captncraig)

- Fix `ResolveEndpointV2 not found` for AWS-related components. (@hainenber)

### Other changes

- Removed support for Windows 2012 in line with Microsoft end of life. (@mattdurham)
Expand Down
10 changes: 6 additions & 4 deletions component/loki/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,14 @@ func (c *Component) Update(args component.Arguments) error {
}

cfgs := newArgs.convertClientConfigs()

uid := agentseed.Get().UID
for _, cfg := range cfgs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
for i := range cfgs {
//cfgs is slice of struct values, so we set by index
if cfgs[i].Headers == nil {
cfgs[i].Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
cfgs[i].Headers[agentseed.HeaderName] = uid
}
walCfg := wal.Config{
Enabled: newArgs.WAL.Enabled,
Expand Down
16 changes: 8 additions & 8 deletions component/pyroscope/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func NewDefaultArguments() Arguments {
Scheme: "http",
HTTPClientConfig: component_config.DefaultHTTPClientConfig,
ScrapeInterval: 15 * time.Second,
ScrapeTimeout: 15*time.Second + (3 * time.Second),
ScrapeTimeout: 10 * time.Second,
ProfilingConfig: DefaultProfilingConfig,
}
}
Expand All @@ -205,16 +205,16 @@ func (arg *Arguments) SetToDefault() {

// Validate implements river.Validator.
func (arg *Arguments) Validate() error {
if arg.ScrapeTimeout <= 0 {
if arg.ScrapeTimeout.Seconds() <= 0 {
return fmt.Errorf("scrape_timeout must be greater than 0")
}
if arg.ScrapeTimeout <= arg.ScrapeInterval {
return fmt.Errorf("scrape_timeout must be greater than scrape_interval")
}

if cfg, ok := arg.ProfilingConfig.ProcessCPU, true; ok {
if cfg.Enabled && arg.ScrapeTimeout < time.Second*2 {
return fmt.Errorf("%v scrape_timeout must be at least 2 seconds", pprofProcessCPU)
// ScrapeInterval must be at least 2 seconds, because if
// ProfilingTarget.Delta is true the ScrapeInterval - 1s is propagated in
// the `seconds` parameter and it must be >= 1.
for _, target := range arg.ProfilingConfig.AllTargets() {
if target.Enabled && target.Delta && arg.ScrapeInterval.Seconds() < 2 {
return fmt.Errorf("scrape_interval must be at least 2 seconds when using delta profiling")
}
}

Expand Down
7 changes: 7 additions & 0 deletions component/pyroscope/scrape/scrape_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ type scrapeLoop struct {
}

func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Appendable, interval, timeout time.Duration, logger log.Logger) *scrapeLoop {
// if the URL parameter have a seconds parameter, then the collection will
// take at least scrape_duration - 1 second, as the HTTP request will block
// until the profile is collected.
if t.Params().Has("seconds") {
timeout += interval - time.Second
}

return &scrapeLoop{
Target: t,
logger: logger,
Expand Down
7 changes: 6 additions & 1 deletion component/pyroscope/scrape/scrape_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ func TestScrapePool(t *testing.T) {
args.ScrapeInterval = 2 * time.Second
p.reload(args)
for _, ta := range p.activeTargets {
require.Equal(t, 1*time.Second, ta.timeout)
if paramsSeconds := ta.params.Get("seconds"); paramsSeconds != "" {
// if the param is set timeout includes interval - 1s
require.Equal(t, 2*time.Second, ta.timeout)
} else {
require.Equal(t, 1*time.Second, ta.timeout)
}
require.Equal(t, 2*time.Second, ta.interval)
}
}
Expand Down
24 changes: 17 additions & 7 deletions component/pyroscope/scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,30 +142,40 @@ func TestUnmarshalConfig(t *testing.T) {
return r
},
},
"invalid cpu timeout": {
"invalid cpu scrape_interval": {
in: `
targets = []
forward_to = null
scrape_timeout = "1s"
scrape_interval = "0.5s"
`,
expectedErr: "process_cpu scrape_timeout must be at least 2 seconds",
expectedErr: "scrape_interval must be at least 2 seconds when using delta profiling",
},
"invalid timeout/interval": {
"allow short scrape_intervals without delta": {
in: `
targets = []
forward_to = null
scrape_timeout = "4s"
scrape_interval = "5s"
scrape_interval = "0.5s"
profiling_config {
profile.process_cpu {
enabled = false
}
}
`,
expectedErr: "scrape_timeout must be greater than scrape_interval",
expected: func() Arguments {
r := NewDefaultArguments()
r.Targets = make([]discovery.Target, 0)
r.ScrapeInterval = 500 * time.Millisecond
r.ProfilingConfig.ProcessCPU.Enabled = false
return r
},
},
"invalid HTTPClientConfig": {
in: `
targets = []
forward_to = null
scrape_timeout = "5s"
scrape_interval = "1s"
scrape_interval = "2s"
bearer_token = "token"
bearer_token_file = "/path/to/file.token"
`,
Expand Down
9 changes: 5 additions & 4 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
}

uid := agentseed.Get().UID
for _, cfg := range c.ClientConfigs {
if cfg.Headers == nil {
cfg.Headers = map[string]string{}
for i := range c.ClientConfigs {
// ClientConfigs is a slice of struct, so we set values with the index
if c.ClientConfigs[i].Headers == nil {
c.ClientConfigs[i].Headers = map[string]string{}
}
cfg.Headers[agentseed.HeaderName] = uid
c.ClientConfigs[i].Headers[agentseed.HeaderName] = uid
}

clientMetrics := client.NewMetrics(i.reg)
Expand Down
28 changes: 25 additions & 3 deletions pkg/metrics/instance/instance_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -73,8 +74,15 @@ remote_write: []
require.NoError(t, err)

instCtx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := inst.Run(instCtx)
require.NoError(t, err)
}()
Expand Down Expand Up @@ -142,8 +150,15 @@ remote_write: []
require.NoError(t, err)

instCtx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := inst.Run(instCtx)
require.NoError(t, err)
}()
Expand Down Expand Up @@ -193,8 +208,15 @@ remote_write: []
require.NoError(t, err)

instCtx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := inst.Run(instCtx)
require.NoError(t, err)
}()
Expand Down

0 comments on commit a793764

Please sign in to comment.