diff --git a/CHANGELOG.md b/CHANGELOG.md index 434c816524de..5ad6e4e0214e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,8 @@ Main (unreleased) - Fixed a bug where converting `YACE` cloudwatch config to river skipped converting static jobs. (@berler) +- Fix `loki.source.file` race condition in cleaning up metrics when stopping to tail files. (@thampiotr) + v0.36.1 (2023-09-06) -------------------- diff --git a/component/loki/source/file/decompresser.go b/component/loki/source/file/decompresser.go index 687218fde7eb..d9fe39dae7bc 100644 --- a/component/loki/source/file/decompresser.go +++ b/component/loki/source/file/decompresser.go @@ -1,8 +1,8 @@ package file -// This code is copied from Promtail. decompressor implements the reader -// interface and is used to read compressed log files. It uses the Go stdlib's -// compress/* packages for decoding. +// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// Decompressor implements the reader interface and is used to read compressed log files. +// It uses the Go stdlib's compress/* packages for decoding. import ( "bufio" diff --git a/component/loki/source/file/metrics.go b/component/loki/source/file/metrics.go index 004eb7fd23a3..4f8be70fb706 100644 --- a/component/loki/source/file/metrics.go +++ b/component/loki/source/file/metrics.go @@ -1,8 +1,8 @@ package file -// This code is copied from Promtail. The metrics struct provides a common set -// of metrics that are reused between all implementations of the reader -// interface. +// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// The metrics struct provides a common set of metrics that are reused between all +// implementations of the reader interface. import "github.com/prometheus/client_golang/prometheus" diff --git a/component/loki/source/file/reader.go b/component/loki/source/file/reader.go index b0be091395b8..04016dc6c112 100644 --- a/component/loki/source/file/reader.go +++ b/component/loki/source/file/reader.go @@ -1,7 +1,7 @@ package file -// This code is copied from Promtail to accommodate the tailer and decompressor -// implementations as readers. +// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// This code accommodates the tailer and decompressor implementations as readers. // reader contains the set of methods the loki.source.file component uses. type reader interface { diff --git a/component/loki/source/file/tailer.go b/component/loki/source/file/tailer.go index 52038f51267e..780e5c2bfb58 100644 --- a/component/loki/source/file/tailer.go +++ b/component/loki/source/file/tailer.go @@ -1,7 +1,7 @@ package file -// This code is copied from Promtail. tailer implements the reader interface by -// using the github.com/grafana/tail package to tail files. +// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5. +// tailer implements the reader interface by using the github.com/grafana/tail package to tail files. import ( "fmt" @@ -119,6 +119,8 @@ func (t *tailer) updatePosition() { defer func() { positionWait.Stop() level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path) + // NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics. + t.cleanupMetrics() close(t.posdone) }() @@ -154,10 +156,11 @@ func (t *tailer) readLines() { // This function runs in a goroutine, if it exits this tailer will never do any more tailing. // Clean everything up. defer func() { - t.cleanupMetrics() t.running.Store(false) level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) close(t.done) + // Shut down the position marker thread + close(t.posquit) }() entries := t.handler.Chan() for { @@ -211,12 +214,14 @@ func (t *tailer) MarkPositionAndSize() error { } return err } - t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) pos, err := t.tail.Tell() if err != nil { return err } + + // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. + t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size)) t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos)) t.positions.Put(t.path, t.labels, pos) @@ -227,10 +232,6 @@ func (t *tailer) Stop() { // stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once // we wrap the stop in a sync.Once. t.stopOnce.Do(func() { - // Shut down the position marker thread - close(t.posquit) - <-t.posdone - // Save the current position before shutting down tailer err := t.MarkPositionAndSize() if err != nil { @@ -244,6 +245,8 @@ func (t *tailer) Stop() { } // Wait for readLines() to consume all the remaining messages and exit when the channel is closed <-t.done + // Wait for the position marker thread to exit + <-t.posdone level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) t.handler.Stop() })