fix(prom/scrape): recreate internal scraper after reloaded #5541

Closed
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -74,6 +74,9 @@ v0.37.2 (2023-10-16)
config not being included in the river output. (@erikbaranowski)

- Fix issue with default values in `discovery.nomad`. (@marctc)

- Fix issue where two scrape options of the `prometheus.scrape` component were not
  applied after the agent was reloaded. (@hainenber)

### Enhancements

60 changes: 38 additions & 22 deletions component/prometheus/scrape/scrape.go
@@ -140,28 +140,12 @@

 // New creates a new prometheus.scrape component.
 func New(o component.Options, args Arguments) (*Component, error) {
-	data, err := o.GetServiceData(http.ServiceName)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get information about HTTP server: %w", err)
-	}
-	httpData := data.(http.Data)
-
-	data, err = o.GetServiceData(cluster.ServiceName)
+	data, err := o.GetServiceData(cluster.ServiceName)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get information about cluster: %w", err)
 	}
 	clusterData := data.(cluster.Cluster)
-
-	flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer)
-	scrapeOptions := &scrape.Options{
-		ExtraMetrics: args.ExtraMetrics,
-		HTTPClientOptions: []config_util.HTTPClientOption{
-			config_util.WithDialContextFunc(httpData.DialFunc),
-		},
-		EnableProtobufNegotiation: args.EnableProtobufNegotiation,
-	}
-	scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable)
-
 	targetsGauge := client_prometheus.NewGauge(client_prometheus.GaugeOpts{
 		Name: "agent_prometheus_scrape_targets_gauge",
 		Help: "Number of targets this component is configured to scrape"})
@@ -174,11 +158,19 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		opts:          o,
 		cluster:       clusterData,
 		reloadTargets: make(chan struct{}, 1),
-		scraper:       scraper,
-		appendable:    flowAppendable,
 		targetsGauge:  targetsGauge,
 	}
 
+	// Update created component with prometheus.Fanout and prometheus.ScrapeManager
+	data, err = o.GetServiceData(http.ServiceName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get information about HTTP server: %w", err)
+	}
+	httpData := data.(http.Data)
+	flowAppendable, scraper := c.createPromScrapeResources(httpData, args)
+	c.appendable = flowAppendable
+	c.scraper = scraper
Comment on lines +164 to +172
Member: Is this necessary, if the same code will run in Update on line 175?

Contributor Author: I was trying to retain the previous setup. Now that you mention it, it does look redundant. Let's see whether the upcoming unit test invalidates this step, and I'll remove it.


// Call to Update() to set the receivers and targets once at the start.
if err := c.Update(args); err != nil {
return nil, err
@@ -236,13 +228,22 @@ func (c *Component) Update(args component.Arguments) error {
 	defer c.mut.Unlock()
 	c.args = newArgs
 
+	// Update scraper with scrape options
+	data, err := c.opts.GetServiceData(http.ServiceName)
+	if err != nil {
+		return fmt.Errorf("failed to get information about HTTP server: %w", err)
+	}
+	newFlowAppendables, newScraper := c.createPromScrapeResources(data.(http.Data), newArgs)
+	c.appendable = newFlowAppendables
Member: I don't think we have to recreate the appendables; the call to UpdateChildren on line 240 should be sufficient, and we can return to having only one instance of c.appendable for the lifetime of the component.

Contributor Author: Sounds great, -1 work for me then 😄

+	c.scraper = newScraper
Member: There's an issue here: the Run method runs the original c.scraper, and will never terminate that scraper or run the new one that's created. This means the changes here will never truly apply on subsequent updates.

Contributor Author: I totally missed this one 🫣

But I do think we can rewrite the scraper's Run goroutine into a for-select loop to handle scraper restarts.

Let me try my luck on this.


 	c.appendable.UpdateChildren(newArgs.ForwardTo)
 
 	sc := getPromScrapeConfigs(c.opts.ID, newArgs)
-	err := c.scraper.ApplyConfig(&config.Config{
+
+	if err := c.scraper.ApplyConfig(&config.Config{
 		ScrapeConfigs: []*config.ScrapeConfig{sc},
-	})
-	if err != nil {
+	}); err != nil {
 		return fmt.Errorf("error applying scrape configs: %w", err)
 	}
 	level.Debug(c.opts.Logger).Log("msg", "scrape config was updated")
@@ -377,6 +378,21 @@ func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Targe
 	return map[string][]*targetgroup.Group{jobName: {promGroup}}
 }
 
+// createPromScrapeResources creates the component's prometheus.Fanout and prometheus.scrape manager.
+func (c *Component) createPromScrapeResources(httpData http.Data, args Arguments) (*prometheus.Fanout, *scrape.Manager) {
+	flowAppendable := prometheus.NewFanout(args.ForwardTo, c.opts.ID, c.opts.Registerer)
+	scrapeOptions := &scrape.Options{
+		ExtraMetrics: args.ExtraMetrics,
+		HTTPClientOptions: []config_util.HTTPClientOption{
+			config_util.WithDialContextFunc(httpData.DialFunc),
+		},
+		EnableProtobufNegotiation: args.EnableProtobufNegotiation,
+	}
+	scraper := scrape.NewManager(scrapeOptions, c.opts.Logger, flowAppendable)
+
+	return flowAppendable, scraper
+}

func convertLabelSet(tg discovery.Target) model.LabelSet {
lset := make(model.LabelSet, len(tg))
for k, v := range tg {