Skip to content

Commit

Permalink
Sync loki.source.file with promtail (#5245)
Browse files Browse the repository at this point in the history
* Sync loki.source.file with promtail

* changelog
  • Loading branch information
thampiotr authored Sep 19, 2023
1 parent 4bd9763 commit 23dfab9
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ Main (unreleased)

- Fixed a bug where converting `YACE` cloudwatch config to river skipped converting static jobs. (@berler)

- Fix `loki.source.file` race condition in cleaning up metrics when stopping to tail files. (@thampiotr)

v0.36.1 (2023-09-06)
--------------------

Expand Down
6 changes: 3 additions & 3 deletions component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package file

// This code is copied from Promtail. decompressor implements the reader
// interface and is used to read compressed log files. It uses the Go stdlib's
// compress/* packages for decoding.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// Decompressor implements the reader interface and is used to read compressed log files.
// It uses the Go stdlib's compress/* packages for decoding.

import (
"bufio"
Expand Down
6 changes: 3 additions & 3 deletions component/loki/source/file/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package file

// This code is copied from Promtail. The metrics struct provides a common set
// of metrics that are reused between all implementations of the reader
// interface.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// The metrics struct provides a common set of metrics that are reused between all
// implementations of the reader interface.

import "github.com/prometheus/client_golang/prometheus"

Expand Down
4 changes: 2 additions & 2 deletions component/loki/source/file/reader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package file

// This code is copied from Promtail to accommodate the tailer and decompressor
// implementations as readers.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code accommodates the tailer and decompressor implementations as readers.

// reader contains the set of methods the loki.source.file component uses.
type reader interface {
Expand Down
19 changes: 11 additions & 8 deletions component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package file

// This code is copied from Promtail. tailer implements the reader interface by
// using the github.com/grafana/tail package to tail files.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// tailer implements the reader interface by using the github.com/grafana/tail package to tail files.

import (
"fmt"
Expand Down Expand Up @@ -119,6 +119,8 @@ func (t *tailer) updatePosition() {
defer func() {
positionWait.Stop()
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
// NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics.
t.cleanupMetrics()
close(t.posdone)
}()

Expand Down Expand Up @@ -154,10 +156,11 @@ func (t *tailer) readLines() {
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
t.cleanupMetrics()
t.running.Store(false)
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
close(t.done)
// Shut down the position marker thread
close(t.posquit)
}()
entries := t.handler.Chan()
for {
Expand Down Expand Up @@ -211,12 +214,14 @@ func (t *tailer) MarkPositionAndSize() error {
}
return err
}
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))

pos, err := t.tail.Tell()
if err != nil {
return err
}

// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, t.labels, pos)

Expand All @@ -227,10 +232,6 @@ func (t *tailer) Stop() {
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
// we wrap the stop in a sync.Once.
t.stopOnce.Do(func() {
// Shut down the position marker thread
close(t.posquit)
<-t.posdone

// Save the current position before shutting down tailer
err := t.MarkPositionAndSize()
if err != nil {
Expand All @@ -244,6 +245,8 @@ func (t *tailer) Stop() {
}
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-t.done
// Wait for the position marker thread to exit
<-t.posdone
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
t.handler.Stop()
})
Expand Down

0 comments on commit 23dfab9

Please sign in to comment.