Skip to content

Commit

Permalink
loki.source.file: support starting log tailers at the end of file (#5165
Browse files Browse the repository at this point in the history
)

Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
wildum and clayton-cornell authored Sep 20, 2023
1 parent d91ae42 commit 4b72f08
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 35 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ Main (unreleased)
- `loki.source.kafka` component now exposes internal label `__meta_kafka_offset`
to indicate offset of consumed message. (@hainenber)

- Add a`tail_from_end` attribute in `loki.source.file` to have the option to start tailing a file from the end if a cached position is not found.
This is valuable when you want to tail a large file without reading its entire content. (@wildum)

- Flow: improve river config validation step in `prometheus.scrape` by comparing `scrape_timeout` with `scrape_interval`. (@wildum)

- Add support for `windows_certificate_filter` under http tls config block. (@mattdurham)
Expand Down
2 changes: 2 additions & 0 deletions component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Arguments struct {
Encoding string `river:"encoding,attr,optional"`
DecompressionConfig DecompressionConfig `river:"decompression,block,optional"`
FileWatch FileWatch `river:"file_watch,block,optional"`
TailFromEnd bool `river:"tail_from_end,attr,optional"`
}

type FileWatch struct {
Expand Down Expand Up @@ -348,6 +349,7 @@ func (c *Component) startTailing(path string, labels model.LabelSet, handler lok
labels.String(),
c.args.Encoding,
pollOptions,
c.args.TailFromEnd,
)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path)
Expand Down
72 changes: 71 additions & 1 deletion component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package file
// tailer implements the reader interface by using the github.com/grafana/tail package to tail files.

import (
"bytes"
"fmt"
"io"
"os"
"sync"
"time"
Expand Down Expand Up @@ -45,7 +47,8 @@ type tailer struct {
decoder *encoding.Decoder
}

func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encoding string, pollOptions watch.PollingFileWatcherOptions) (*tailer, error) {
func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string,
labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {
// Simple check to make sure the file we are tailing doesn't
// have a position already saved which is past the end of the file.
fi, err := os.Stat(path)
Expand All @@ -61,6 +64,17 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
positions.Remove(path, labels)
}

// If no cached position is found and the tailFromEnd option is enabled.
if pos == 0 && tailFromEnd {
pos, err = getLastLinePosition(path)
if err != nil {
level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", err)
} else {
positions.Put(path, labels, pos)
level.Info(logger).Log("msg", "retrieved and stored the position of the last line")
}
}

tail, err := tail.TailFile(path, tail.Config{
Follow: true,
Poll: true,
Expand Down Expand Up @@ -108,6 +122,62 @@ func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, p
return tailer, nil
}

// getLastLinePosition returns the offset of the start of the last line in the file at the given path.
// It will read chunks of bytes starting from the end of the file to return the position of the last '\n' + 1.
// If it cannot find any '\n' it will return 0.
func getLastLinePosition(path string) (int64, error) {
file, err := os.Open(path)
if err != nil {
return 0, err
}
defer file.Close()

const chunkSize = 1024

buf := make([]byte, chunkSize)
fi, err := file.Stat()
if err != nil {
return 0, err
}

if fi.Size() == 0 {
return 0, nil
}

var pos int64 = fi.Size() - chunkSize
if pos < 0 {
pos = 0
}

for {
_, err = file.Seek(pos, io.SeekStart)
if err != nil {
return 0, err
}

bytesRead, err := file.Read(buf)
if err != nil {
return 0, err
}

idx := bytes.LastIndexByte(buf[:bytesRead], '\n')
// newline found
if idx != -1 {
return pos + int64(idx) + 1, nil
}

// no newline found in the entire file
if pos == 0 {
return 0, nil
}

pos -= chunkSize
if pos < 0 {
pos = 0
}
}
}

// updatePosition is run in a goroutine and checks the current size of the file
// and saves it to the positions file at a regular interval. If there is ever
// an error it stops the tailer and exits, the tailer will be re-opened by the
Expand Down
80 changes: 80 additions & 0 deletions component/loki/source/file/tailer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package file

import (
"bytes"
"os"
"testing"
)

func createTempFileWithContent(t *testing.T, content []byte) string {
t.Helper()
tmpfile, err := os.CreateTemp("", "testfile")
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}

_, err = tmpfile.Write(content)
if err != nil {
tmpfile.Close()
t.Fatalf("Failed to write to temp file: %v", err)
}

tmpfile.Close()
return tmpfile.Name()
}

func TestGetLastLinePosition(t *testing.T) {
tests := []struct {
name string
content []byte
expected int64
}{
{
name: "File ending with newline",
content: []byte("Hello, World!\n"),
expected: 14, // Position after last '\n'
},
{
name: "Newline in the middle",
content: []byte("Hello\nWorld"),
expected: 6, // Position after the '\n' in "Hello\n"
},
{
name: "File not ending with newline",
content: []byte("Hello, World!"),
expected: 0,
},
{
name: "File bigger than chunkSize without newline",
content: bytes.Repeat([]byte("A"), 1025),
expected: 0,
},
{
name: "File bigger than chunkSize with newline in between",
content: append([]byte("Hello\n"), bytes.Repeat([]byte("A"), 1025)...),
expected: 6, // Position after the "Hello\n"
},
{
name: "Empty file",
content: []byte(""),
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filename := createTempFileWithContent(t, tt.content)
defer os.Remove(filename)

got, err := getLastLinePosition(filename)
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}

if got != tt.expected {
t.Errorf("for content %q, expected position %d but got %d", tt.content, tt.expected, got)
}
})
}
}
84 changes: 50 additions & 34 deletions docs/sources/flow/reference/components/loki.source.file.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
aliases:
- /docs/grafana-cloud/agent/flow/reference/components/loki.source.file/
- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/loki.source.file/
- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/loki.source.file/
- /docs/grafana-cloud/agent/flow/reference/components/loki.source.file/
- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/loki.source.file/
- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/loki.source.file/
canonical: https://grafana.com/docs/agent/latest/flow/reference/components/loki.source.file/
title: loki.source.file
---
Expand All @@ -29,51 +29,57 @@ loki.source.file "LABEL" {
```

## Arguments

The component starts a new reader for each of the given `targets` and fans out
log entries to the list of receivers passed in `forward_to`.

`loki.source.file` supports the following arguments:

Name | Type | Description | Default | Required
--------------|----------------------|------------------------------------------------------------|---------|----------
`targets` | `list(map(string))` | List of files to read from. | | yes
`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes
`encoding` | `string` | The encoding to convert from when reading files. | `""` | no
| Name | Type | Description | Default | Required |
| --------------- | -------------------- | ----------------------------------------------------------------------------------- | ------- | -------- |
| `targets` | `list(map(string))` | List of files to read from. | | yes |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
| `tail_from_end` | `bool` | Whether a log file should be tailed from the end if a stored position is not found. | `false` | no |

The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
defaults to UTF-8.

You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
When set to true, only new logs will be read, ignoring the existing ones.

## Blocks

The following blocks are supported inside the definition of `loki.source.file`:

Hierarchy | Name | Description | Required
----------------|--------------------|-------------------------------------------------------------------|----------
decompresssion | [decompresssion][] | Configure reading logs from compressed files. | no
file_watch | [file_watch][] | Configure how often files should be polled from disk for changes. | no
| Hierarchy | Name | Description | Required |
| -------------- | ------------------ | ----------------------------------------------------------------- | -------- |
| decompresssion | [decompresssion][] | Configure reading logs from compressed files. | no |
| file_watch | [file_watch][] | Configure how often files should be polled from disk for changes. | no |

[decompresssion]: #decompresssion-block
[file_watch]: #file_watch-block

### decompresssion block

The `decompression` block contains configuration for reading logs from
The `decompression` block contains configuration for reading logs from
compressed files. The following arguments are supported:

Name | Type | Description | Default | Required
-----------------|------------|-----------------------------------------------------------------|---------|----------
`enabled` | `bool` | Whether decompression is enabled. | | yes
`initial_delay` | `duration` | Time to wait before starting to read from new compressed files. | 0 | no
`format` | `string` | Compression format. | | yes
| Name | Type | Description | Default | Required |
| --------------- | ---------- | --------------------------------------------------------------- | ------- | -------- |
| `enabled` | `bool` | Whether decompression is enabled. | | yes |
| `initial_delay` | `duration` | Time to wait before starting to read from new compressed files. | 0 | no |
| `format` | `string` | Compression format. | | yes |

If you compress a file under a folder being scraped, `loki.source.file` might
try to ingest your file before you finish compressing it. To avoid it, pick
an `initial_delay` that is enough to avoid it.

Currently supported compression formats are:
* `gz` - for gzip
* `z` - for zlib
* `bz2` - for bzip2

- `gz` - for gzip
- `z` - for zlib
- `bz2` - for bzip2

The component can only support one compression format at a time, in order to
handle multiple formats, you will need to create multiple components.
Expand All @@ -83,10 +89,10 @@ handle multiple formats, you will need to create multiple components.
The `file_watch` block configures how often log files are polled from disk for changes.
The following arguments are supported:

Name | Type | Description | Default | Required
----------------------|------------|-------------------------------------------|---------|----------
`min_poll_frequency` | `duration` | Minimum frequency to poll for files. | 250ms | no
`max_poll_frequency` | `duration` | Maximum frequency to poll for files. | 250ms | no
| Name | Type | Description | Default | Required |
| -------------------- | ---------- | ------------------------------------ | ------- | -------- |
| `min_poll_frequency` | `duration` | Minimum frequency to poll for files. | 250ms | no |
| `max_poll_frequency` | `duration` | Maximum frequency to poll for files. | 250ms | no |

If no file changes are detected, the poll frequency doubles until a file change is detected or the poll frequency reaches the `max_poll_frequency`.

Expand All @@ -104,26 +110,33 @@ configuration.
## Debug information

`loki.source.file` exposes some target-level debug information per reader:
* The tailed path.
* Whether the reader is currently running.
* What is the last recorded read offset in the positions file.

- The tailed path.
- Whether the reader is currently running.
- What is the last recorded read offset in the positions file.

## Debug metrics
* `loki_source_file_read_bytes_total` (gauge): Number of bytes read.
* `loki_source_file_file_bytes_total` (gauge): Number of bytes total.
* `loki_source_file_read_lines_total` (counter): Number of lines read.
* `loki_source_file_encoding_failures_total` (counter): Number of encoding failures.
* `loki_source_file_files_active_total` (gauge): Number of active files.

- `loki_source_file_read_bytes_total` (gauge): Number of bytes read.
- `loki_source_file_file_bytes_total` (gauge): Number of bytes total.
- `loki_source_file_read_lines_total` (counter): Number of lines read.
- `loki_source_file_encoding_failures_total` (counter): Number of encoding failures.
- `loki_source_file_files_active_total` (gauge): Number of active files.

## Component behavior

If the decompression feature is deactivated, the component will continuously monitor and 'tail' the files.
In this mode, upon reaching the end of a file, the component remains active, awaiting and reading new entries in real-time as they are appended.

Each element in the list of `targets` as a set of key-value pairs called
_labels_.
The set of targets can either be _static_, or dynamically provided periodically
by a service discovery component. The special label `__path__` _must always_ be
present and must point to the absolute path of the file to read from.

<!-- TODO(@tpaschalis) refer to local.file_match -->

The `__path__` value is available as the `filename` label to each log entry
The `__path__` value is available as the `filename` label to each log entry
the component reads. All other labels starting with a double underscore are
considered _internal_ and are removed from the log entries before they're
passed to other `loki.*` components.
Expand All @@ -140,6 +153,7 @@ beginning.
## Examples

### Static targets

This example collects log entries from the files specified in the targets
argument and forwards them to a `loki.write` component to be written to Loki.

Expand All @@ -161,6 +175,7 @@ loki.write "local" {
```

### File globbing

This example collects log entries from the files matching `*.log` pattern
using `local.file_match` component. When files appear or disappear, the list of
targets will be updated accordingly.
Expand All @@ -186,6 +201,7 @@ loki.write "local" {
```

### Decompression

This example collects log entries from the compressed files matching `*.gz`
pattern using `local.file_match` component and the decompression configuration
on the `loki.source.file` component.
Expand Down

0 comments on commit 4b72f08

Please sign in to comment.