diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b94e274ac9a..fe8b60d47181 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,8 @@ Main (unreleased) - Fix an issue on Windows where uninstalling Alloy did not remove it from the Add/Remove programs list. (@rfratto) +- Fix a bug with the logs pipeline in static mode which prevented it from shutting down cleanly. + ### Other changes - Clustering for Grafana Agent in Flow mode has graduated from beta to stable. diff --git a/go.mod b/go.mod index beba03e40b41..35e8a9cae92e 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb github.com/grafana/go-gelf/v2 v2.0.1 - github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 // k190 branch + github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b // k190 branch github.com/grafana/pyroscope-go/godeltaprof v0.1.7 github.com/grafana/pyroscope/api v0.4.0 github.com/grafana/pyroscope/ebpf v0.4.3 diff --git a/go.sum b/go.sum index fc9326cdba6b..e000d9c1cec0 100644 --- a/go.sum +++ b/go.sum @@ -1045,6 +1045,8 @@ github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a h1:jqM4NNdx github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a/go.mod h1:ZXGGyeTUMenf/H1CDBK9lv3azjswfa0nVzLoQAYmnDc= github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 h1:C191g5Ls8lIf9lkJEoScTQgoVDwUdK4HXKP5XtL+zAM= github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU= +github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b h1:x5JsSnExxRl9kTMNqHebMCv0fn+V1+T16z7Tgz6xYf4= +github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU= github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 h1:6kPX7bngjBgUlHqADwZ6249UtzMaoQW5n0H8bOtnYeM= github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0= github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 h1:I9sRknI5ajd8whPOX0nBDXy5B6xUfhItClMy+6R4oqE= diff --git a/internal/component/loki/process/metric/metricvec.go b/internal/component/loki/process/metric/metricvec.go index d1a8d6939909..d09cc53e35b6 100644 --- a/internal/component/loki/process/metric/metricvec.go +++ b/internal/component/loki/process/metric/metricvec.go @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool { return ok } +func (c *metricVec) DeleteAll() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.metrics = map[model.Fingerprint]prometheus.Metric{} +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/internal/component/loki/process/stages/decolorize.go b/internal/component/loki/process/stages/decolorize.go index 26356ef1552c..33f365934b88 100644 --- a/internal/component/loki/process/stages/decolorize.go +++ b/internal/component/loki/process/stages/decolorize.go @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry { func (m *decolorizeStage) Name() string { return StageTypeDecolorize } + +// Cleanup implements Stage. +func (*decolorizeStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/drop.go b/internal/component/loki/process/stages/drop.go index 96c212ba3c96..81ed0439f370 100644 --- a/internal/component/loki/process/stages/drop.go +++ b/internal/component/loki/process/stages/drop.go @@ -222,3 +222,8 @@ func splitSource(s string) []string { func (m *dropStage) Name() string { return StageTypeDrop } + +// Cleanup implements Stage. +func (*dropStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/eventlogmessage.go b/internal/component/loki/process/stages/eventlogmessage.go index 247b80163911..840248201825 100644 --- a/internal/component/loki/process/stages/eventlogmessage.go +++ b/internal/component/loki/process/stages/eventlogmessage.go @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string { return StageTypeEventLogMessage } +// Cleanup implements Stage. +func (*eventLogMessageStage) Cleanup() { + // no-op +} + // Sanitize a input string to convert it into a valid prometheus label // TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { diff --git a/internal/component/loki/process/stages/extensions.go b/internal/component/loki/process/stages/extensions.go index 3abbfd624346..df7648831df0 100644 --- a/internal/component/loki/process/stages/extensions.go +++ b/internal/component/loki/process/stages/extensions.go @@ -100,6 +100,11 @@ func (c *cri) Name() string { return StageTypeCRI } +// Cleanup implements Stage. +func (*cri) Cleanup() { + // no-op +} + // implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) diff --git a/internal/component/loki/process/stages/geoip.go b/internal/component/loki/process/stages/geoip.go index 505077ecfe2d..f5f777689eb8 100644 --- a/internal/component/loki/process/stages/geoip.go +++ b/internal/component/loki/process/stages/geoip.go @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string { return StageTypeGeoIP } +// Cleanup implements Stage. +func (*geoIPStage) Cleanup() { + // no-op +} + func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) { var ip net.IP if g.cfgs.Source != nil { diff --git a/internal/component/loki/process/stages/json.go b/internal/component/loki/process/stages/json.go index 5f98209b46ae..d96a203fd746 100644 --- a/internal/component/loki/process/stages/json.go +++ b/internal/component/loki/process/stages/json.go @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string func (j *jsonStage) Name() string { return StageTypeJSON } + +// Cleanup implements Stage. +func (*jsonStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/limit.go b/internal/component/loki/process/stages/limit.go index aee76ee519f6..7d7d9c1dec8d 100644 --- a/internal/component/loki/process/stages/limit.go +++ b/internal/component/loki/process/stages/limit.go @@ -130,6 +130,11 @@ func (m *limitStage) Name() string { return StageTypeLimit } +// Cleanup implements Stage. +func (*limitStage) Cleanup() { + // no-op +} + func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec { return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total", "A count of all log lines dropped as a result of a pipeline stage", diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index 4b84bb99a90a..63cb0f30d2a6 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { func (m *matcherStage) Name() string { return StageTypeMatch } + +// Cleanup implements Stage. +func (*matcherStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/metric.go b/internal/component/loki/process/stages/metric.go index 983354abd6de..880b61b085c4 100644 --- a/internal/component/loki/process/stages/metric.go +++ b/internal/component/loki/process/stages/metric.go @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg) } } - return toStage(&metricStage{ + return &metricStage{ logger: logger, cfg: config, metrics: metrics, - }), nil + }, nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -118,6 +118,19 @@ type metricStage struct { metrics map[string]cfgCollector } +func (m *metricStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + for e := range in { + m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + out <- e + } + }() + return out +} + // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for name, cc := range m.metrics { @@ -162,6 +175,20 @@ func (m *metricStage) Name() string { return StageTypeMetric } +// Cleanup implements Stage. +func (m *metricStage) Cleanup() { + for _, cfgCollector := range m.metrics { + switch vec := cfgCollector.collector.(type) { + case *metric.Counters: + vec.DeleteAll() + case *metric.Gauges: + vec.DeleteAll() + case *metric.Histograms: + vec.DeleteAll() + } + } +} + // recordCounter will update a counter metric func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. diff --git a/internal/component/loki/process/stages/metric_test.go b/internal/component/loki/process/stages/metric_test.go index 559c5fc913b8..b270d22440aa 100644 --- a/internal/component/loki/process/stages/metric_test.go +++ b/internal/component/loki/process/stages/metric_test.go @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) { strings.NewReader(expectedMetrics)); err != nil { t.Fatalf("mismatch metrics: %v", err) } + + pl.Cleanup() + + if err := testutil.GatherAndCompare(registry, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } } func TestNegativeGauge(t *testing.T) { diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index 677fe5c09b57..c9cb7cb21394 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { func (m *multilineStage) Name() string { return StageTypeMultiline } + +// Cleanup implements Stage. +func (*multilineStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pack.go b/internal/component/loki/process/stages/pack.go index a5d1633b4011..0a2cf6a0b3a7 100644 --- a/internal/component/loki/process/stages/pack.go +++ b/internal/component/loki/process/stages/pack.go @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry { func (m *packStage) Name() string { return StageTypePack } + +// Cleanup implements Stage. +func (*packStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index e24642c84cba..57a9a58e1104 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string { return StageTypePipeline } +// Cleanup implements Stage. +func (p *Pipeline) Cleanup() { + for _, s := range p.stages { + s.Cleanup() + } +} + // Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { handlerIn := make(chan loki.Entry) @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { return loki.NewEntryHandler(handlerIn, func() { once.Do(func() { close(handlerIn) }) wg.Wait() + p.Cleanup() }) } diff --git a/internal/component/loki/process/stages/sampling.go b/internal/component/loki/process/stages/sampling.go index 53bb6ad2e579..dec5976a6db3 100644 --- a/internal/component/loki/process/stages/sampling.go +++ b/internal/component/loki/process/stages/sampling.go @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 { func (m *samplingStage) Name() string { return StageTypeSampling } + +// Cleanup implements Stage. +func (*samplingStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index 0958bfdf09b2..2cb69c26e48f 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -61,6 +61,7 @@ type Entry struct { type Stage interface { Name() string Run(chan Entry) chan Entry + Cleanup() } func (entry *Entry) copy() *Entry { @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh } return s, nil } + +// Cleanup implements Stage. +func (*stageProcessor) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index abaee55245b8..256bbb6df3c3 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string { return StageTypeStructuredMetadata } +// Cleanup implements Stage. +func (*structuredMetadataStage) Cleanup() { + // no-op +} + func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {