diff --git a/CHANGELOG.md b/CHANGELOG.md index e817031cdefc..1ef9e663f508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ Main (unreleased) - Increased the alert interval and renamed the `ClusterSplitBrain` alert to `ClusterNodeCountMismatch` in the Grafana Agent Mixin to better match the alert conditions. (@thampiotr) +- Not restart tailers in `loki.source.kubernetes` component by above-average time deltas if K8s version is >= 1.29.1 (@hainenber) + - Add conversion from static to flow mode for `loki.source.windowsevent` via `legacy_bookmark_path`. (@mattdurham) - Add ability to convert static mode positions file to `loki.source.file` compatible via `legacy_positions_file` argument. (@mattdurham) diff --git a/go.mod b/go.mod index 4dbe3378793d..ffa16b0cd690 100644 --- a/go.mod +++ b/go.mod @@ -300,7 +300,7 @@ require ( github.com/aws/smithy-go v1.20.1 // indirect github.com/beevik/ntp v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/blang/semver/v4 v4.0.0 // indirect + github.com/blang/semver/v4 v4.0.0 github.com/boynux/squid-exporter v1.10.5-0.20230618153315-c1fae094e18e github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer.go b/internal/component/loki/source/kubernetes/kubetail/tailer.go index 4e30ab511af4..27c8527dec42 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/grafana/agent/internal/component/common/loki" "github.com/grafana/agent/internal/flow/logging/level" @@ -170,6 +171,15 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { return err } + k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion() + if err != nil { + return err + } + k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion) + if err != nil { + return err + } + // Create a new rolling average calculator to determine the average delta // time between log entries. // @@ -180,41 +190,48 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { // The computed average will never be less than the minimum of 2s. calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime) - go func() { - rolledFileTicker := time.NewTicker(1 * time.Second) - defer func() { - rolledFileTicker.Stop() - _ = stream.Close() - }() - for { - select { - case <-ctx.Done(): - return - case <-rolledFileTicker.C: - // Versions of Kubernetes which do not contain - // kubernetes/kubernetes#115702 will fail to detect rolled log files - // and stop sending logs to us. - // - // To work around this, we use a rolling average to determine how - // frequent we usually expect to see entries. If 3x the normal delta has - // elapsed, we'll restart the tailer. - // - // False positives here are acceptable, but false negatives mean that - // we'll have a larger spike of missing logs until we detect a rolled - // file. - avg := calc.GetAverage() - last := calc.GetLast() - if last.IsZero() { - continue - } - s := time.Since(last) - if s > avg*3 { - level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + // Versions of Kubernetes which do not contain + // kubernetes/kubernetes#115702 (<= v1.29.1) will fail to detect rotated log files + // and stop sending logs to us. + // + // To work around this, we use a rolling average to determine how + // frequent we usually expect to see entries. If 3x the normal delta has + // elapsed, we'll restart the tailer. + // + // False positives here are acceptable, but false negatives mean that + // we'll have a larger spike of missing logs until we detect a rolled + // file. + if k8sComparableServerVersion.LT(semver.Version{Major: 1, Minor: 29, Patch: 0}) { + go func() { + rolledFileTicker := time.NewTicker(1 * time.Second) + defer func() { + rolledFileTicker.Stop() + _ = stream.Close() + }() + for { + select { + case <-ctx.Done(): return + case <-rolledFileTicker.C: + avg := calc.GetAverage() + last := calc.GetLast() + if last.IsZero() { + continue + } + s := time.Since(last) + if s > avg*3 { + level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + return + } } } - } - }() + }() + } else { + go func() { + <-ctx.Done() + _ = stream.Close() + }() + } level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)