Skip to content

Commit

Permalink
Fix issue with unnecessary config reload in static mode (#6977)
Browse files Browse the repository at this point in the history
* Fix issue with unnecessary config reload

* Update changelog

* Simplify byte slice to string conversion

* Avoid race conditions when checking logs in tests
  • Loading branch information
ptodev authored Jul 11, 2024
1 parent e325e8a commit 8dbd241
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ v0.41.1 (2024-06-07)

- Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine)

### Bugfixes

- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode.

v0.41.0 (2024-05-31)
--------------------
Expand Down
26 changes: 2 additions & 24 deletions cmd/grafana-agent-service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os/exec"
"path/filepath"
"runtime"
"sync"
"testing"

"github.com/go-kit/log"
Expand Down Expand Up @@ -84,7 +83,7 @@ func Test_serviceManager(t *testing.T) {
t.Run("can forward to stdout", func(t *testing.T) {
listenHost := getListenHost(t)

var buf syncBuffer
var buf util.SyncBuffer

mgr := newServiceManager(l, serviceManagerConfig{
Path: serviceBinary,
Expand Down Expand Up @@ -112,7 +111,7 @@ func Test_serviceManager(t *testing.T) {
t.Run("can forward to stderr", func(t *testing.T) {
listenHost := getListenHost(t)

var buf syncBuffer
var buf util.SyncBuffer

mgr := newServiceManager(l, serviceManagerConfig{
Path: serviceBinary,
Expand Down Expand Up @@ -186,24 +185,3 @@ func makeServiceRequest(host string, path string, body []byte) ([]byte, error) {
}
return io.ReadAll(resp.Body)
}

// syncBuffer wraps around a bytes.Buffer and makes it safe to use from
// multiple goroutines.
type syncBuffer struct {
mut sync.RWMutex
buf bytes.Buffer
}

func (sb *syncBuffer) Bytes() []byte {
sb.mut.RLock()
defer sb.mut.RUnlock()

return sb.buf.Bytes()
}

func (sb *syncBuffer) Write(p []byte) (n int, err error) {
sb.mut.Lock()
defer sb.mut.Unlock()

return sb.buf.Write(p)
}
34 changes: 34 additions & 0 deletions internal/util/syncbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package util

import (
"bytes"
"sync"
)

// SyncBuffer wraps around a bytes.Buffer and makes it safe to use from
// multiple goroutines.
type SyncBuffer struct {
mut sync.RWMutex
buf bytes.Buffer
}

func (sb *SyncBuffer) Bytes() []byte {
sb.mut.RLock()
defer sb.mut.RUnlock()

return sb.buf.Bytes()
}

func (sb *SyncBuffer) String() string {
sb.mut.RLock()
defer sb.mut.RUnlock()

return sb.buf.String()
}

func (sb *SyncBuffer) Write(p []byte) (n int, err error) {
sb.mut.Lock()
defer sb.mut.Unlock()

return sb.buf.Write(p)
}
13 changes: 11 additions & 2 deletions static/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/tracing"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)

func init() {
Expand Down Expand Up @@ -121,6 +122,8 @@ type Instance struct {
log log.Logger
reg *util.Unregisterer

previousConfig string

promtail *promtail.Promtail
}

Expand Down Expand Up @@ -155,14 +158,20 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
defer i.mut.Unlock()

// No-op if the configs haven't changed.
if util.CompareYAML(c, i.cfg) {
newConfigByteArr, err := yaml.Marshal(c)
if err != nil {
return fmt.Errorf("failed to marshal new logs instance config: %w", err)
}
newConfig := string(newConfigByteArr)
if newConfig == i.previousConfig {
level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail")
return nil
}
i.previousConfig = newConfig
i.cfg = c

positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile)
err := os.MkdirAll(positionsDir, 0775)
err = os.MkdirAll(positionsDir, 0775)
if err != nil {
level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err)
}
Expand Down
31 changes: 30 additions & 1 deletion static/logs/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package logs

import (
"bytes"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -30,6 +31,12 @@ func TestLogs_NilConfig(t *testing.T) {
defer l.Stop()
}

func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
logLine := `level=debug component=logs logs_config=default msg="instance config hasn't changed, not recreating Promtail"`
actualOccurances := strings.Count(logs, logLine)
require.Equal(t, expectedOccurances, actualOccurances)
}

func TestLogs(t *testing.T) {
//
// Create a temporary file to tail
Expand Down Expand Up @@ -87,7 +94,8 @@ configs:
dec.SetStrict(true)
require.NoError(t, dec.Decode(&cfg))
require.NoError(t, cfg.ApplyDefaults())
logger := log.NewSyncLogger(log.NewNopLogger())
logBuffer := bytes.Buffer{}
logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer))
l, err := New(prometheus.NewRegistry(), &cfg, logger, false)
require.NoError(t, err)
defer l.Stop()
Expand All @@ -103,6 +111,23 @@ configs:
require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line)
}

// The config did change.
// We expect the config reload log line to not be printed.
checkConfigReloadLog(t, logBuffer.String(), 0)

//
// Apply the same config and try reloading.
// Recreate the config struct to make sure it's clean.
//
var sameCfg Config
dec = yaml.NewDecoder(strings.NewReader(cfgText))
dec.SetStrict(true)
require.NoError(t, dec.Decode(&sameCfg))
require.NoError(t, sameCfg.ApplyDefaults())
require.NoError(t, l.ApplyConfig(&sameCfg, false))

checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Apply a new config and write a new line.
//
Expand Down Expand Up @@ -138,6 +163,10 @@ configs:
require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line)
}

// The config did change this time.
// We expect the config reload log line to not be printed again.
checkConfigReloadLog(t, logBuffer.String(), 1)

t.Run("update to nil", func(t *testing.T) {
// Applying a nil config should remove all instances.
err := l.ApplyConfig(nil, false)
Expand Down
12 changes: 11 additions & 1 deletion static/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/metrics/cluster"
Expand Down Expand Up @@ -150,6 +151,8 @@ type Agent struct {
actor chan func()

initialBootDone atomic.Bool

previousConfig string
}

// New creates and starts a new Agent.
Expand Down Expand Up @@ -227,9 +230,16 @@ func (a *Agent) ApplyConfig(cfg Config) error {
a.mut.Lock()
defer a.mut.Unlock()

if util.CompareYAML(a.cfg, cfg) {
newConfigByteArr, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("failed to marshal new config: %w", err)
}
newConfig := string(newConfigByteArr)
if newConfig == a.previousConfig {
level.Debug(a.logger).Log("msg", "not recreating metrics instance because config hasn't changed")
return nil
}
a.previousConfig = newConfig

if a.stopped {
return fmt.Errorf("agent stopped")
Expand Down
138 changes: 138 additions & 0 deletions static/metrics/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/metrics/instance"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,6 +115,142 @@ configs:
require.Greater(t, int64(scrapeConfig.ScrapeInterval), int64(0))
}

func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
logLine := `level=debug agent=prometheus msg="not recreating metrics instance because config hasn't changed"`
actualOccurances := strings.Count(logs, logLine)
require.Equal(t, expectedOccurances, actualOccurances)
}

func TestConfigReload(t *testing.T) {
cfgText := `
wal_directory: /tmp/wal
configs:
- name: instance_a
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var cfg Config

err := yaml.Unmarshal([]byte(cfgText), &cfg)
require.NoError(t, err)
err = cfg.ApplyDefaults()
require.NoError(t, err)

fact := newFakeInstanceFactory()

logBuffer := util.SyncBuffer{}
logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer))

reg := prometheus.NewRegistry()

a, err := newAgent(reg, cfg, logger, fact.factory)
require.NoError(t, err)

util.Eventually(t, func(t require.TestingT) {
require.NotNil(t, fact.created)
require.Equal(t, 1, int(fact.created.Load()))
require.Equal(t, 1, len(a.mm.ListInstances()))
})

t.Run("instances should be running", func(t *testing.T) {
for _, mi := range fact.Mocks() {
// Each instance should have wait called on it
util.Eventually(t, func(t require.TestingT) {
require.True(t, mi.running.Load())
})
}
})

util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config has changed (it used to be ""). The log line won't be printed.
checkConfigReloadLog(t, logBuffer.String(), 0)

//
// Try the same config.
//
var sameCfg Config

err = yaml.Unmarshal([]byte(cfgText), &sameCfg)
require.NoError(t, err)
err = sameCfg.ApplyDefaults()
require.NoError(t, err)

a.ApplyConfig(sameCfg)

util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config did not change. The log line should be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Try a different config.
//
cfgText = `
wal_directory: /tmp/wal
configs:
- name: instance_b
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var differentCfg Config

err = yaml.Unmarshal([]byte(cfgText), &differentCfg)
require.NoError(t, err)
err = differentCfg.ApplyDefaults()
require.NoError(t, err)

a.ApplyConfig(differentCfg)

util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 2
agent_metrics_configs_changed_total{event="deleted"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config has changed. The log line won't be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)

for _, mi := range fact.Mocks() {
util.Eventually(t, func(t require.TestingT) {
require.Equal(t, 1, int(mi.startedCount.Load()))
})
}

a.Stop()
}

func TestAgent(t *testing.T) {
// Launch two instances
cfg := Config{
Expand Down
Loading

0 comments on commit 8dbd241

Please sign in to comment.