From 4b72f08160ed9ee8c03e066987a94e3196de58e9 Mon Sep 17 00:00:00 2001
From: William Dumont
Date: Wed, 20 Sep 2023 10:48:22 +0200
Subject: [PATCH] loki.source.file: support starting log tailers at the end of file (#5165)

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
---
 CHANGELOG.md                                  |  3 +
 component/loki/source/file/file.go            |  2 +
 component/loki/source/file/tailer.go          | 72 +++++++++++++++-
 component/loki/source/file/tailer_test.go     | 80 ++++++++++++++++++
 .../reference/components/loki.source.file.md  | 84 +++++++++++--------
 5 files changed, 206 insertions(+), 35 deletions(-)
 create mode 100644 component/loki/source/file/tailer_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63ffba2d61b8..e54e549a9fdc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -74,6 +74,9 @@ Main (unreleased)
 - `loki.source.kafka` component now exposes internal label `__meta_kafka_offset` to
   indicate offset of consumed message. (@hainenber)
 
+- Add a `tail_from_end` attribute in `loki.source.file` to optionally start tailing a file from the end if a cached position is not found.
+  This is useful when you want to tail a large file without reading its entire content. (@wildum)
+
 - Flow: improve river config validation step in `prometheus.scrape` by comparing `scrape_timeout` with `scrape_interval`. (@wildum)
 
 - Add support for `windows_certificate_filter` under http tls config block. (@mattdurham)
diff --git a/component/loki/source/file/file.go b/component/loki/source/file/file.go
index 5a45d7e0664b..0c8d1b8d2c4f 100644
--- a/component/loki/source/file/file.go
+++ b/component/loki/source/file/file.go
@@ -42,6 +42,7 @@ type Arguments struct {
 	Encoding            string              `river:"encoding,attr,optional"`
 	DecompressionConfig DecompressionConfig `river:"decompression,block,optional"`
 	FileWatch           FileWatch           `river:"file_watch,block,optional"`
+	TailFromEnd         bool                `river:"tail_from_end,attr,optional"`
 }
 
 type FileWatch struct {
@@ -348,6 +349,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
 		labels.String(),
 		c.args.Encoding,
 		pollOptions,
+		c.args.TailFromEnd,
 	)
 	if err != nil {
 		level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path)
diff --git a/component/loki/source/file/tailer.go b/component/loki/source/file/tailer.go
index 780e5c2bfb58..b7f5a731567f 100644
--- a/component/loki/source/file/tailer.go
+++ b/component/loki/source/file/tailer.go
@@ -4,7 +4,9 @@ package file
 
 // tailer implements the reader interface by using the github.com/grafana/tail package to tail files.
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"os"
 	"sync"
 	"time"
@@ -45,7 +47,8 @@ type tailer struct {
 	decoder *encoding.Decoder
 }
 
-func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encoding string, pollOptions watch.PollingFileWatcherOptions) (*tailer, error) {
+func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string,
+	labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {
 	// Simple check to make sure the file we are tailing doesn't
 	// have a position already saved which is past the end of the file.
 	fi, err := os.Stat(path)
@@ -61,6 +64,17 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
 		positions.Remove(path, labels)
 	}
 
+	// If no cached position is found and the tailFromEnd option is enabled, start from the position of the last line.
+	if pos == 0 && tailFromEnd {
+		pos, err = getLastLinePosition(path)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to get a position from the end of the file, defaulting to start of file", "err", err)
+		} else {
+			positions.Put(path, labels, pos)
+			level.Info(logger).Log("msg", "retrieved and stored the position of the last line")
+		}
+	}
+
 	tail, err := tail.TailFile(path, tail.Config{
 		Follow: true,
 		Poll:   true,
@@ -108,6 +122,62 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
 	return tailer, nil
 }
 
+// getLastLinePosition returns the offset of the start of the last line in the file at the given path.
+// It will read chunks of bytes starting from the end of the file to return the position of the last '\n' + 1.
+// If it cannot find any '\n' it will return 0.
+func getLastLinePosition(path string) (int64, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		return 0, err
+	}
+	defer file.Close()
+
+	const chunkSize = 1024
+
+	buf := make([]byte, chunkSize)
+	fi, err := file.Stat()
+	if err != nil {
+		return 0, err
+	}
+
+	if fi.Size() == 0 {
+		return 0, nil
+	}
+
+	var pos int64 = fi.Size() - chunkSize
+	if pos < 0 {
+		pos = 0
+	}
+
+	for {
+		_, err = file.Seek(pos, io.SeekStart)
+		if err != nil {
+			return 0, err
+		}
+
+		bytesRead, err := file.Read(buf)
+		if err != nil {
+			return 0, err
+		}
+
+		idx := bytes.LastIndexByte(buf[:bytesRead], '\n')
+		// newline found
+		if idx != -1 {
+			return pos + int64(idx) + 1, nil
+		}
+
+		// no newline found in the entire file
+		if pos == 0 {
+			return 0, nil
+		}
+
+		pos -= chunkSize
+		if pos < 0 {
+			pos = 0
+		}
+	}
+}
+
 // updatePosition is run in a goroutine and checks the current size of the file
 // and saves it to the positions file at a regular interval. If there is ever
 // an error it stops the tailer and exits, the tailer will be re-opened by the
diff --git a/component/loki/source/file/tailer_test.go b/component/loki/source/file/tailer_test.go
new file mode 100644
index 000000000000..b023ea631fd1
--- /dev/null
+++ b/component/loki/source/file/tailer_test.go
@@ -0,0 +1,80 @@
+package file
+
+import (
+	"bytes"
+	"os"
+	"testing"
+)
+
+func createTempFileWithContent(t *testing.T, content []byte) string {
+	t.Helper()
+	tmpfile, err := os.CreateTemp("", "testfile")
+	if err != nil {
+		t.Fatalf("Failed to create temp file: %v", err)
+	}
+
+	_, err = tmpfile.Write(content)
+	if err != nil {
+		tmpfile.Close()
+		t.Fatalf("Failed to write to temp file: %v", err)
+	}
+
+	tmpfile.Close()
+	return tmpfile.Name()
+}
+
+func TestGetLastLinePosition(t *testing.T) {
+	tests := []struct {
+		name     string
+		content  []byte
+		expected int64
+	}{
+		{
+			name:     "File ending with newline",
+			content:  []byte("Hello, World!\n"),
+			expected: 14, // Position after last '\n'
+		},
+		{
+			name:     "Newline in the middle",
+			content:  []byte("Hello\nWorld"),
+			expected: 6, // Position after the '\n' in "Hello\n"
+		},
+		{
+			name:     "File not ending with newline",
+			content:  []byte("Hello, World!"),
+			expected: 0,
+		},
+		{
+			name:     "File bigger than chunkSize without newline",
+			content:  bytes.Repeat([]byte("A"), 1025),
+			expected: 0,
+		},
+		{
+			name:     "File bigger than chunkSize with newline in between",
+			content:  append([]byte("Hello\n"), bytes.Repeat([]byte("A"), 1025)...),
+			expected: 6, // Position after the "Hello\n"
+		},
+		{
+			name:     "Empty file",
+			content:  []byte(""),
+			expected: 0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			filename := createTempFileWithContent(t, tt.content)
+			defer os.Remove(filename)
+
+			got, err := getLastLinePosition(filename)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+				return
+			}
+
+			if got != tt.expected {
+				t.Errorf("for content %q, expected position %d but got %d", tt.content, tt.expected, got)
+			}
+		})
+	}
+}
diff --git a/docs/sources/flow/reference/components/loki.source.file.md b/docs/sources/flow/reference/components/loki.source.file.md
index d972cc543274..a9b257c53291 100644
--- a/docs/sources/flow/reference/components/loki.source.file.md
+++ b/docs/sources/flow/reference/components/loki.source.file.md
@@ -1,8 +1,8 @@
 ---
 aliases:
-- /docs/grafana-cloud/agent/flow/reference/components/loki.source.file/
-- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/loki.source.file/
-- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/loki.source.file/
+  - /docs/grafana-cloud/agent/flow/reference/components/loki.source.file/
+  - /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/loki.source.file/
+  - /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/loki.source.file/
 canonical: https://grafana.com/docs/agent/latest/flow/reference/components/loki.source.file/
 title: loki.source.file
 ---
@@ -29,51 +29,57 @@ loki.source.file "LABEL" {
 ```
 
 ## Arguments
+
 The component starts a new reader for each of the given `targets` and fans out
 log entries to the list of receivers passed in `forward_to`.
 
 `loki.source.file` supports the following arguments:
 
- Name | Type | Description | Default | Required
---------------|----------------------|------------------------------------------------------------|---------|----------
- `targets` | `list(map(string))` | List of files to read from. | | yes
- `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes
- `encoding` | `string` | The encoding to convert from when reading files. | `""` | no
+| Name | Type | Description | Default | Required |
+| --------------- | -------------------- | ----------------------------------------------------------------------------------- | ------- | -------- |
+| `targets` | `list(map(string))` | List of files to read from. | | yes |
+| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
+| `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
+| `tail_from_end` | `bool` | Whether a log file should be tailed from the end if a stored position is not found. | `false` | no |
 
 The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
 defaults to UTF-8.
 
+You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
+When set to `true`, only new logs are read and the existing ones are ignored.
+
 ## Blocks
 
 The following blocks are supported inside the definition of `loki.source.file`:
 
- Hierarchy | Name | Description | Required
-----------------|--------------------|-------------------------------------------------------------------|----------
- decompresssion | [decompresssion][] | Configure reading logs from compressed files. | no
- file_watch | [file_watch][] | Configure how often files should be polled from disk for changes. | no
+| Hierarchy | Name | Description | Required |
+| ------------- | ----------------- | ------------------------------------------------------------------ | -------- |
+| decompression | [decompression][] | Configure reading logs from compressed files. | no |
+| file_watch | [file_watch][] | Configure how often files should be polled from disk for changes. | no |
 
 [decompression]: #decompression-block
 [file_watch]: #file_watch-block
 
 ### decompression block
 
-The `decompression` block contains configuration for reading logs from
+The `decompression` block contains configuration for reading logs from
 compressed files. The following arguments are supported:
 
- Name | Type | Description | Default | Required
------------------|------------|-----------------------------------------------------------------|---------|----------
- `enabled` | `bool` | Whether decompression is enabled. | | yes
- `initial_delay` | `duration` | Time to wait before starting to read from new compressed files. | 0 | no
- `format` | `string` | Compression format. | | yes
+| Name | Type | Description | Default | Required |
+| --------------- | ---------- | --------------------------------------------------------------- | ------- | -------- |
+| `enabled` | `bool` | Whether decompression is enabled. | | yes |
+| `initial_delay` | `duration` | Time to wait before starting to read from new compressed files. | 0 | no |
+| `format` | `string` | Compression format. | | yes |
 
 If you compress a file under a folder being scraped, `loki.source.file` might
 try to ingest your file before you finish compressing it. To avoid this, pick an
 `initial_delay` that is long enough for the compression to finish.
 
 Currently supported compression formats are:
-* `gz` - for gzip
-* `z` - for zlib
-* `bz2` - for bzip2
+
+- `gz` - for gzip
+- `z` - for zlib
+- `bz2` - for bzip2
 
 The component can only support one compression format at a time. In order to
 handle multiple formats, you will need to create multiple components.
 
@@ -83,10 +89,10 @@
 The `file_watch` block configures how often log files are polled from disk for
 changes. The following arguments are supported:
 
- Name | Type | Description | Default | Required
-----------------------|------------|-------------------------------------------|---------|----------
- `min_poll_frequency` | `duration` | Minimum frequency to poll for files. | 250ms | no
- `max_poll_frequency` | `duration` | Maximum frequency to poll for files. | 250ms | no
+| Name | Type | Description | Default | Required |
+| -------------------- | ---------- | ------------------------------------ | ------- | -------- |
+| `min_poll_frequency` | `duration` | Minimum frequency to poll for files. | 250ms | no |
+| `max_poll_frequency` | `duration` | Maximum frequency to poll for files. | 250ms | no |
 
 If no file changes are detected, the poll frequency doubles until a file change
 is detected or the poll frequency reaches the `max_poll_frequency`.
 
@@ -104,26 +110,33 @@ configuration.
 
 ## Debug information
 
 `loki.source.file` exposes some target-level debug information per reader:
-* The tailed path.
-* Whether the reader is currently running.
-* What is the last recorded read offset in the positions file.
+
+- The tailed path.
+- Whether the reader is currently running.
+- What is the last recorded read offset in the positions file.
 
 ## Debug metrics
-* `loki_source_file_read_bytes_total` (gauge): Number of bytes read.
-* `loki_source_file_file_bytes_total` (gauge): Number of bytes total.
-* `loki_source_file_read_lines_total` (counter): Number of lines read.
-* `loki_source_file_encoding_failures_total` (counter): Number of encoding failures.
-* `loki_source_file_files_active_total` (gauge): Number of active files.
+
+- `loki_source_file_read_bytes_total` (gauge): Number of bytes read.
+- `loki_source_file_file_bytes_total` (gauge): Number of bytes total.
+- `loki_source_file_read_lines_total` (counter): Number of lines read.
+- `loki_source_file_encoding_failures_total` (counter): Number of encoding failures.
+- `loki_source_file_files_active_total` (gauge): Number of active files.
 
 ## Component behavior
+
+If the decompression feature is deactivated, the component will continuously monitor and 'tail' the files.
+In this mode, upon reaching the end of a file, the component remains active, awaiting and reading new entries in real time as they are appended.
+
 Each element in the list of `targets` is a set of key-value pairs called _labels_.
 The set of targets can either be _static_, or dynamically provided periodically
 by a service discovery component. The special label `__path__` _must always_ be
 present and must point to the absolute path of the file to read from.
+
-The `__path__` value is available as the `filename` label to each log entry
+The `__path__` value is available as the `filename` label to each log entry
 the component reads. All other labels starting with a double underscore are
 considered _internal_ and are removed from the log entries before they're
 passed to other `loki.*` components.
@@ -140,6 +153,7 @@ beginning.
 
 ## Examples
 
 ### Static targets
+
 This example collects log entries from the files specified in the targets
 argument and forwards them to a `loki.write` component to be written to Loki.
@@ -161,6 +175,7 @@ loki.write "local" {
 ```
 
 ### File globbing
+
 This example collects log entries from the files matching the `*.log` pattern
 using the `local.file_match` component. When files appear or disappear, the list
 of targets will be updated accordingly.
@@ -186,6 +201,7 @@ loki.write "local" {
 ```
 
 ### Decompression
+
 This example collects log entries from the compressed files matching the `*.gz`
 pattern using the `local.file_match` component and the decompression configuration
 on the `loki.source.file` component.
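
As a usage sketch to complement the documentation changes above (this snippet is not part of the patch, and the file path and Loki endpoint are illustrative), the new `tail_from_end` attribute can be enabled like this:

```river
// Discover the files to tail; the glob below is only an example.
local.file_match "logs" {
  path_targets = [{"__path__" = "/var/log/app/*.log"}]
}

// Tail the discovered files, starting at the end of each file when no
// position has been stored yet, and forward the entries to loki.write.
loki.source.file "tmpfiles" {
  targets       = local.file_match.logs.targets
  forward_to    = [loki.write.local.receiver]
  tail_from_end = true
}

loki.write "local" {
  endpoint {
    url = "loki:3100/api/v1/push"
  }
}
```

With `tail_from_end = true`, a reader that finds no stored position for a file starts at the end of that file and only picks up new lines; once a position has been stored, later restarts resume from the stored offset as before.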