From d4ec620d5ba8e01affeaabc434fe993d40bc99cc Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 20 Sep 2023 10:54:19 +0100 Subject: [PATCH] Sync cri stage in loki.process with promtail. (#5057) * Sync cri stage in loki.process with promtail. * Clarify partial line settings in the docs * converter: properly convert new settings in cri stage This commit properly converts the new settings in the CRI stage, which previously weren't available. This also changes the MaxPartialLineSize field to the underlying uint64 type rather than a flagext.ByteSize: * flagext is a Loki package, and we're trying to drop our Loki dependency. * flagext.ByteSize is a type represented by a uint64, but River can't directly decode a uint64 into a flagext.ByteSize because a uint64 is not directly assignable to it in Go. (It needs to implement encoding.TextUnmarshaler for this to work properly). * Promtail does not document that it's possible to pass a string for max_partial_line_size, so using flagext.ByteSize is unnecessary from the documentation's perspective. --------- Co-authored-by: Robert Fratto --- CHANGELOG.md | 2 + component/loki/process/stages/extensions.go | 80 +++++++++++++++---- .../loki/process/stages/extensions_test.go | 53 ++++++++---- component/loki/process/stages/stage.go | 2 +- .../promtailconvert/internal/build/stages.go | 22 ++++- .../testdata/pipeline_stages_cri_empty.river | 26 ++++++ .../testdata/pipeline_stages_cri_empty.yaml | 12 +++ .../testdata/pipeline_stages_part2.river | 6 +- .../testdata/pipeline_stages_part2.yaml | 5 +- .../flow/reference/components/loki.process.md | 20 +++-- 10 files changed, 184 insertions(+), 44 deletions(-) create mode 100644 converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.river create mode 100644 converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index e54e549a9fdc..af001630dc05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ Main (unreleased) - Add optional `nil_to_zero` config flag for `YACE` which can be set in the `static`, `discovery`, or `metric` config blocks. (@berler) +- The `cri` stage in `loki.process` can now be configured to limit line size. + ### Enhancements - Clustering: allow advertise interfaces to be configurable, with the possibility to select all available interfaces. (@wildum) diff --git a/component/loki/process/stages/extensions.go b/component/loki/process/stages/extensions.go index 30480333612f..87435e67fb92 100644 --- a/component/loki/process/stages/extensions.go +++ b/component/loki/process/stages/extensions.go @@ -5,17 +5,18 @@ package stages // new code without being able to slowly review, examine and test them. import ( + "fmt" "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/river" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) const ( - RFC3339Nano = "RFC3339Nano" - MaxPartialLinesSize = 100 // MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format.lines. + RFC3339Nano = "RFC3339Nano" ) // DockerConfig is an empty struct that is used to enable a pre-defined @@ -24,7 +25,37 @@ type DockerConfig struct{} // CRIConfig is an empty struct that is used to enable a pre-defined pipeline // for decoding entries that are using the CRI logging format. 
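The note above about `flagext.ByteSize` hinges on River's decoding rules: River can only turn a string such as `"64KB"` into a custom numeric type if that type implements `encoding.TextUnmarshaler`. A minimal sketch of what that would have required — the `byteSize` type and the `go-humanize` parser are illustrative, not part of this patch:

```go
package stages

import (
	"encoding"

	"github.com/dustin/go-humanize"
)

// byteSize is a hypothetical stand-in for flagext.ByteSize: a plain uint64
// that River could decode from strings like "64KB" because it implements
// encoding.TextUnmarshaler.
type byteSize uint64

var _ encoding.TextUnmarshaler = (*byteSize)(nil)

// UnmarshalText parses human-readable sizes ("16KB", "1MiB") into a byte count.
func (b *byteSize) UnmarshalText(text []byte) error {
	n, err := humanize.ParseBytes(string(text))
	if err != nil {
		return err
	}
	*b = byteSize(n)
	return nil
}
```

Keeping `MaxPartialLineSize` as a plain `uint64` sidesteps both the extra type and the Loki `flagext` dependency; River then accepts only plain byte counts, which matches the behaviour Promtail documents.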
-type CRIConfig struct{} +type CRIConfig struct { + MaxPartialLines int `river:"max_partial_lines,attr,optional"` + MaxPartialLineSize uint64 `river:"max_partial_line_size,attr,optional"` + MaxPartialLineSizeTruncate bool `river:"max_partial_line_size_truncate,attr,optional"` +} + +var ( + _ river.Defaulter = (*CRIConfig)(nil) + _ river.Validator = (*CRIConfig)(nil) +) + +// DefaultCRIConfig contains the default CRIConfig values. +var DefaultCRIConfig = CRIConfig{ + MaxPartialLines: 100, + MaxPartialLineSize: 0, + MaxPartialLineSizeTruncate: false, +} + +// SetToDefault implements river.Defaulter. +func (args *CRIConfig) SetToDefault() { + *args = DefaultCRIConfig +} + +// Validate implements river.Validator. +func (args *CRIConfig) Validate() error { + if args.MaxPartialLines <= 0 { + return fmt.Errorf("max_partial_lines must be greater than 0") + } + + return nil +} // NewDocker creates a predefined pipeline for parsing entries in the Docker // json log format. @@ -61,17 +92,19 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro type cri struct { // bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`) - partialLines map[model.Fingerprint]Entry - maxPartialLines int - base *Pipeline + partialLines map[model.Fingerprint]Entry + cfg CRIConfig + base *Pipeline } +var _ Stage = (*cri)(nil) + // Name implement the Stage interface. func (c *cri) Name() string { return "cri" } -// Run implements Stage interface +// implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) @@ -80,16 +113,17 @@ func (c *cri) Run(entry chan Entry) chan Entry { // We received partial-line (tag: "P") if e.Extracted["flags"] == "P" { - if len(c.partialLines) > c.maxPartialLines { + if len(c.partialLines) >= c.cfg.MaxPartialLines { // Merge existing partialLines entries := make([]Entry, 0, len(c.partialLines)) for _, v := range c.partialLines { entries = append(entries, v) } - level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize) + level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", c.cfg.MaxPartialLines) - c.partialLines = make(map[model.Fingerprint]Entry) + c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines) + c.ensureTruncateIfRequired(&e) c.partialLines[fingerprint] = e return entries, false @@ -97,8 +131,12 @@ func (c *cri) Run(entry chan Entry) chan Entry { prev, ok := c.partialLines[fingerprint] if ok { - e.Line = strings.Join([]string{prev.Line, e.Line}, "") + var builder strings.Builder + builder.WriteString(prev.Line) + builder.WriteString(e.Line) + e.Line = builder.String() } + c.ensureTruncateIfRequired(&e) c.partialLines[fingerprint] = e return []Entry{e}, true // it's a partial-line so skip it. @@ -109,7 +147,11 @@ func (c *cri) Run(entry chan Entry) chan Entry { // 2. Else just return the full line. 
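As a reading aid for the `Run` logic above and the merge that follows, here is a deliberately simplified, self-contained model of the partial-line bookkeeping. The `partialBuffer` name and string-only storage are illustrative; the real stage buffers whole `Entry` values keyed by `model.Fingerprint` and additionally applies the truncation settings via `ensureTruncateIfRequired`:

```go
package main

import "fmt"

// partialBuffer models the cri stage's partial-line handling: "P" lines are
// buffered per stream fingerprint, "F" lines complete and emit them.
type partialBuffer struct {
	max   int               // counterpart of max_partial_lines
	lines map[uint64]string // fingerprint -> accumulated partial line
}

// add processes one CRI line and returns any lines ready to be emitted.
func (b *partialBuffer) add(fingerprint uint64, flag, line string) []string {
	if flag == "P" {
		if len(b.lines) >= b.max {
			// Upper bound reached: flush every buffered partial as its own
			// line and start over with the current one.
			flushed := make([]string, 0, len(b.lines))
			for _, l := range b.lines {
				flushed = append(flushed, l)
			}
			b.lines = map[uint64]string{fingerprint: line}
			return flushed
		}
		b.lines[fingerprint] += line // append to any previous partial
		return nil                   // partial lines are held back
	}
	// "F": prepend whatever was buffered for this stream, then emit.
	full := b.lines[fingerprint] + line
	delete(b.lines, fingerprint)
	return []string{full}
}

func main() {
	b := &partialBuffer{max: 100, lines: map[uint64]string{}}
	b.add(1, "P", "partial line 1 ")
	b.add(2, "P", "partial line 2 ")
	fmt.Println(b.add(1, "F", "log finished")) // [partial line 1 log finished]
	fmt.Println(b.add(2, "F", "full"))         // [partial line 2 full]
}
```

The behaviour in `main` mirrors the "tag P multi-stream" test case further down: partial lines from different label sets are kept apart, and each is completed by the matching `F` line.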
prev, ok := c.partialLines[fingerprint] if ok { - e.Line = strings.Join([]string{prev.Line, e.Line}, "") + var builder strings.Builder + builder.WriteString(prev.Line) + builder.WriteString(e.Line) + e.Line = builder.String() + c.ensureTruncateIfRequired(&e) delete(c.partialLines, fingerprint) } return []Entry{e}, false @@ -118,9 +160,15 @@ func (c *cri) Run(entry chan Entry) chan Entry { return in } +func (c *cri) ensureTruncateIfRequired(e *Entry) { + if c.cfg.MaxPartialLineSizeTruncate && len(e.Line) > int(c.cfg.MaxPartialLineSize) { + e.Line = e.Line[:c.cfg.MaxPartialLineSize] + } +} + // NewCRI creates a predefined pipeline for parsing entries in the CRI log // format. -func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { +func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error) { base := []StageConfig{ { RegexConfig: &RegexConfig{ @@ -156,9 +204,9 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) } c := cri{ - maxPartialLines: MaxPartialLinesSize, - base: p, + cfg: config, + base: p, } - c.partialLines = make(map[model.Fingerprint]Entry) + c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines) return &c, nil } diff --git a/component/loki/process/stages/extensions_test.go b/component/loki/process/stages/extensions_test.go index 9eaa61717489..5cf71add03f2 100644 --- a/component/loki/process/stages/extensions_test.go +++ b/component/loki/process/stages/extensions_test.go @@ -100,15 +100,18 @@ type testEntry struct { func TestCRI_tags(t *testing.T) { cases := []struct { - name string - lines []string - expected []string - maxPartialLines int - entries []testEntry - err error + name string + lines []string + expected []string + maxPartialLines int + maxPartialLineSize uint64 + maxPartialLineSizeTruncate bool + entries []testEntry + err error }{ { - name: "tag F", + name: "tag F", + maxPartialLines: 100, entries: []testEntry{ {line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}}, {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}}, @@ -116,7 +119,8 @@ func TestCRI_tags(t *testing.T) { expected: []string{"some full line", "log"}, }, { - name: "tag P multi-stream", + name: "tag P multi-stream", + maxPartialLines: 100, entries: []testEntry{ {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}}, {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar2"}}, @@ -141,7 +145,7 @@ func TestCRI_tags(t *testing.T) { {line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}}, {line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}}, }, - maxPartialLines: 2, + maxPartialLines: 3, expected: []string{ "partial line 1 partial line 3 ", "partial line 2 ", @@ -167,20 +171,36 @@ func TestCRI_tags(t *testing.T) { "another full log", }, }, + { + name: "tag P multi-stream with truncation", + entries: []testEntry{ + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial", labels: model.LabelSet{"foo": "bar2"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}}, + {line: 
"2019-05-07T18:57:55.904275087+00:00 stdout F full", labels: model.LabelSet{"foo": "bar2"}}, + }, + maxPartialLines: 100, + maxPartialLineSizeTruncate: true, + maxPartialLineSize: 11, + expected: []string{ + "partial lin", + "partialfull", + }, + }, } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer) + cfg := CRIConfig{ + MaxPartialLines: tt.maxPartialLines, + MaxPartialLineSize: tt.maxPartialLineSize, + MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate, + } + p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer) require.NoError(t, err) got := make([]string, 0) - // tweak `maxPartialLines` - if tt.maxPartialLines != 0 { - p.(*cri).maxPartialLines = tt.maxPartialLines - } - for _, entry := range tt.entries { out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now())) if len(out) > 0 { @@ -258,7 +278,8 @@ func TestNewCri(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer) + cfg := DefaultCRIConfig + p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer) if err != nil { t.Fatalf("failed to create CRI parser: %s", err) } diff --git a/component/loki/process/stages/stage.go b/component/loki/process/stages/stage.go index 0173ef0a36a3..94300eb34c32 100644 --- a/component/loki/process/stages/stage.go +++ b/component/loki/process/stages/stage.go @@ -121,7 +121,7 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh return nil, err } case cfg.CRIConfig != nil: - s, err = NewCRI(logger, registerer) + s, err = NewCRI(logger, *cfg.CRIConfig, registerer) if err != nil { return nil, err } diff --git a/converter/internal/promtailconvert/internal/build/stages.go b/converter/internal/promtailconvert/internal/build/stages.go index 30477e76e933..79ee12a1e55c 100644 --- a/converter/internal/promtailconvert/internal/build/stages.go +++ b/converter/internal/promtailconvert/internal/build/stages.go @@ -59,7 +59,7 @@ func convertStage(st interface{}, diags *diag.Diagnostics) (stages.StageConfig, case promtailstages.StageTypeDocker: return convertDocker() case promtailstages.StageTypeCRI: - return convertCRI() + return convertCRI(iCfg, diags) case promtailstages.StageTypeMatch: return convertMatch(iCfg, diags) case promtailstages.StageTypeTemplate: @@ -352,8 +352,24 @@ func convertMatch(cfg interface{}, diags *diag.Diagnostics) (stages.StageConfig, }}, true } -func convertCRI() (stages.StageConfig, bool) { - return stages.StageConfig{CRIConfig: &stages.CRIConfig{}}, true +func convertCRI(cfg interface{}, diags *diag.Diagnostics) (stages.StageConfig, bool) { + pCRI := &promtailstages.CriConfig{} + if err := mapstructure.Decode(cfg, pCRI); err != nil { + addInvalidStageError(diags, cfg, err) + return stages.StageConfig{}, false + } + + // Copied logic from Promtail: if MaxPartialLines is 0, default it to + // MaxPartialLinesSize. 
+ if pCRI.MaxPartialLines == 0 { + pCRI.MaxPartialLines = promtailstages.MaxPartialLinesSize + } + + return stages.StageConfig{CRIConfig: &stages.CRIConfig{ + MaxPartialLines: pCRI.MaxPartialLines, + MaxPartialLineSize: uint64(pCRI.MaxPartialLineSize), + MaxPartialLineSizeTruncate: pCRI.MaxPartialLineSizeTruncate, + }}, true } func convertDocker() (stages.StageConfig, bool) { diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.river b/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.river new file mode 100644 index 000000000000..a8890b1a3908 --- /dev/null +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.river @@ -0,0 +1,26 @@ +discovery.kubernetes "example" { + role = "pod" + kubeconfig_file = "/home/toby/.kube/config" +} + +local.file_match "example" { + path_targets = discovery.kubernetes.example.targets +} + +loki.process "example" { + forward_to = [loki.write.default.receiver] + + stage.cri { } +} + +loki.source.file "example" { + targets = local.file_match.example.targets + forward_to = [loki.process.example.receiver] +} + +loki.write "default" { + endpoint { + url = "http://localhost/loki/api/v1/push" + } + external_labels = {} +} diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.yaml b/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.yaml new file mode 100644 index 000000000000..402af8c349a9 --- /dev/null +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_cri_empty.yaml @@ -0,0 +1,12 @@ +clients: + - url: http://localhost/loki/api/v1/push +scrape_configs: + - job_name: example + pipeline_stages: + - cri: { } + kubernetes_sd_configs: + - role: pod + kubeconfig_file: /home/toby/.kube/config + +tracing: { enabled: false } +server: { register_instrumentation: false } \ No newline at end of file diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river index 638e5e6b90b3..9b5a68542244 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river @@ -12,7 +12,11 @@ loki.process "example" { stage.docker { } - stage.cri { } + stage.cri { + max_partial_lines = 223 + max_partial_line_size = 26214 + max_partial_line_size_truncate = true + } stage.label_drop { values = ["foo", "bar", "baz"] diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml index ad5e695846f8..2093da78c425 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml @@ -4,7 +4,10 @@ scrape_configs: - job_name: example pipeline_stages: - docker: { } - - cri: { } + - cri: + max_partial_lines: 223 + max_partial_line_size: 26214 + max_partial_line_size_truncate: true - labeldrop: - foo - bar diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index c559f3739559..2d458fc1adb4 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -107,8 +107,16 @@ file. The `stage.cri` inner block enables a predefined pipeline which reads log lines using the CRI logging format. 
-The `stage.cri` block does not support any arguments or inner blocks, so it is always -empty. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| -------------------------------- | ---------- | -------------------------------------------------------------------- | -------------- | -------- | +| `max_partial_lines` | `number` | Maximum number of partial lines to hold in memory. | `100` | no | +| `max_partial_line_size` | `number` | Maximum number of characters which a partial line can have. | `0` | no | +| `max_partial_line_size_truncate` | `bool` | Truncate partial lines that are longer than `max_partial_line_size`. | `false` | no | + +`max_partial_line_size` is only taken into account if +`max_partial_line_size_truncate` is set to `true`. ```river stage.cri {} @@ -369,11 +377,11 @@ The following arguments are supported: | Name | Type | Description | Default | Required | | --------------------- | -------- | -------------------------------------------------------------------------------- | ------- | -------- | -| `rate` | `int` | The maximum rate of lines per second that the stage forwards. | | yes | -| `burst` | `int` | The cap in the quantity of burst lines that the stage forwards. | | yes | +| `rate` | `number` | The maximum rate of lines per second that the stage forwards. | | yes | +| `burst` | `number` | The maximum number of burst lines that the stage forwards. | | yes | | `by_label_name` | `string` | The label to use when rate-limiting on a label name. | `""` | no | | `drop` | `bool` | Whether to discard or backpressure lines that exceed the rate limit. | `false` | no | -| `max_distinct_labels` | `int` | The number of unique values to keep track of when rate-limiting `by_label_name`. | `10000` | no | +| `max_distinct_labels` | `number` | The number of unique values to keep track of when rate-limiting `by_label_name`. | `10000` | no | The rate limiting is implemented as a "token bucket" of size `burst`, initially full and refilled at `rate` tokens per second. Each received log entry consumes one token from the bucket. When `drop` is set to true, incoming entries @@ -740,7 +748,7 @@ The following arguments are supported: | --------------- | ---------- | -------------------------------------------------- | ------- | -------- | | `firstline` | `string` | Name from extracted data to use for the log entry. | | yes | | `max_wait_time` | `duration` | The maximum time to wait for a multiline block. | `"3s"` | no | -| `max_lines` | `int` | The maximum number of lines a block can have. | `128` | no | +| `max_lines` | `number` | The maximum number of lines a block can have. | `128` | no | A new block is identified by the RE2 regular expression passed in `firstline`.
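To tie the `stage.cri` arguments table back to the Go side of this patch, here is a small sketch of decoding the block body with `river.Unmarshal`. It assumes, as the component tests in this repository do, that unmarshalling applies the `river.Defaulter` and `river.Validator` hooks that `CRIConfig` now implements; the `26214` value is the one used in the `pipeline_stages_part2` testdata.

```go
package main

import (
	"fmt"

	"github.com/grafana/agent/component/loki/process/stages"
	"github.com/grafana/river"
)

func main() {
	// Attributes that are omitted fall back to DefaultCRIConfig:
	// max_partial_lines = 100, no size limit, no truncation.
	var cfg stages.CRIConfig
	err := river.Unmarshal([]byte(`
		max_partial_line_size          = 26214
		max_partial_line_size_truncate = true
	`), &cfg)
	fmt.Println(err, cfg.MaxPartialLines, cfg.MaxPartialLineSize, cfg.MaxPartialLineSizeTruncate)
	// Expected: <nil> 100 26214 true

	// Validate rejects a non-positive max_partial_lines.
	var bad stages.CRIConfig
	err = river.Unmarshal([]byte(`max_partial_lines = 0`), &bad)
	fmt.Println(err != nil)
	// Expected: true
}
```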
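The `stage.limit` section above describes the rate limiter as a token bucket of size `burst`, refilled at `rate` tokens per second, with `drop` choosing between discarding entries and applying backpressure. A sketch of that behaviour using `golang.org/x/time/rate` (illustrative only; the stage's actual implementation is not shown in this patch):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// allowOne models one log entry hitting the limit stage. With drop=true,
// entries over the limit are discarded; otherwise the caller blocks until a
// token frees up, which backpressures the sending component.
func allowOne(limiter *rate.Limiter, drop bool) bool {
	if drop {
		return limiter.Allow() // false => the entry is dropped
	}
	_ = limiter.Wait(context.Background()) // block until a token is available
	return true
}

func main() {
	// rate = 5 lines per second, burst = 10 lines.
	limiter := rate.NewLimiter(rate.Limit(5), 10)

	dropped := 0
	for i := 0; i < 25; i++ {
		if !allowOne(limiter, true) {
			dropped++
		}
	}
	// The bucket starts full with `burst` tokens, so roughly the first 10
	// entries pass and the remainder are dropped (refill during this tight
	// loop is negligible).
	fmt.Println("dropped:", dropped)
}
```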