Skip to content

Commit

Permalink
Sync cri stage in loki.process with promtail. (#5057)
Browse files Browse the repository at this point in the history
* Sync cri stage in loki.process with promtail.

* Clarify partial line settings in the docs

* converter: properly convert new settings in cri stage

This commit properly converts the new settings in the CRI stage, which
previously weren't available.

This also changes the MaxPartialLineSize field to the underlying uint64
type rather than a flagext.ByteSize:

* flagext is a Loki package, and we're trying to drop our Loki
  dependency.

* flagext.ByteSize is a type represented by a uint64, but River can't
  directly decode a uint64 into a flagext.ByteSize because a uint64 is
  not directly assignable to it in Go. (It needs to implement
  encoding.TextUnmarshaler for this to work properly).

* Promtail does not document that it's possible to pass a string for
  max_partial_line_size, so using flagext.ByteSize is unnecessary from
  the documentation's perspective.

---------

Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
ptodev and rfratto authored Sep 20, 2023
1 parent 4b72f08 commit d4ec620
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Main (unreleased)

- Add optional `nil_to_zero` config flag for `YACE` which can be set in the `static`, `discovery`, or `metric` config blocks. (@berler)

- The `cri` stage in `loki.process` can now be configured to limit line size.

### Enhancements

- Clustering: allow advertise interfaces to be configurable, with the possibility to select all available interfaces. (@wildum)
Expand Down
80 changes: 64 additions & 16 deletions component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ package stages
// new code without being able to slowly review, examine and test them.

import (
"fmt"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/river"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

const (
RFC3339Nano = "RFC3339Nano"
MaxPartialLinesSize = 100 // MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format.lines.
RFC3339Nano = "RFC3339Nano"
)

// DockerConfig is an empty struct that is used to enable a pre-defined
Expand All @@ -24,7 +25,37 @@ type DockerConfig struct{}

// CRIConfig is an empty struct that is used to enable a pre-defined pipeline
// for decoding entries that are using the CRI logging format.
type CRIConfig struct{}
type CRIConfig struct {
MaxPartialLines int `river:"max_partial_lines,attr,optional"`
MaxPartialLineSize uint64 `river:"max_partial_line_size,attr,optional"`
MaxPartialLineSizeTruncate bool `river:"max_partial_line_size_truncate,attr,optional"`
}

var (
_ river.Defaulter = (*CRIConfig)(nil)
_ river.Validator = (*CRIConfig)(nil)
)

// DefaultCRIConfig contains the default CRIConfig values.
var DefaultCRIConfig = CRIConfig{
MaxPartialLines: 100,
MaxPartialLineSize: 0,
MaxPartialLineSizeTruncate: false,
}

// SetToDefault implements river.Defaulter.
func (args *CRIConfig) SetToDefault() {
*args = DefaultCRIConfig
}

// Validate implements river.Validator.
func (args *CRIConfig) Validate() error {
if args.MaxPartialLines <= 0 {
return fmt.Errorf("max_partial_lines must be greater than 0")
}

return nil
}

// NewDocker creates a predefined pipeline for parsing entries in the Docker
// json log format.
Expand Down Expand Up @@ -61,17 +92,19 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro

type cri struct {
// bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`)
partialLines map[model.Fingerprint]Entry
maxPartialLines int
base *Pipeline
partialLines map[model.Fingerprint]Entry
cfg CRIConfig
base *Pipeline
}

var _ Stage = (*cri)(nil)

// Name implement the Stage interface.
func (c *cri) Name() string {
return "cri"
}

// Run implements Stage interface
// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)

Expand All @@ -80,25 +113,30 @@ func (c *cri) Run(entry chan Entry) chan Entry {

// We received partial-line (tag: "P")
if e.Extracted["flags"] == "P" {
if len(c.partialLines) > c.maxPartialLines {
if len(c.partialLines) >= c.cfg.MaxPartialLines {
// Merge existing partialLines
entries := make([]Entry, 0, len(c.partialLines))
for _, v := range c.partialLines {
entries = append(entries, v)
}

level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize)
level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", c.cfg.MaxPartialLines)

c.partialLines = make(map[model.Fingerprint]Entry)
c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines)
c.ensureTruncateIfRequired(&e)
c.partialLines[fingerprint] = e

return entries, false
}

prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
var builder strings.Builder
builder.WriteString(prev.Line)
builder.WriteString(e.Line)
e.Line = builder.String()
}
c.ensureTruncateIfRequired(&e)
c.partialLines[fingerprint] = e

return []Entry{e}, true // it's a partial-line so skip it.
Expand All @@ -109,7 +147,11 @@ func (c *cri) Run(entry chan Entry) chan Entry {
// 2. Else just return the full line.
prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
var builder strings.Builder
builder.WriteString(prev.Line)
builder.WriteString(e.Line)
e.Line = builder.String()
c.ensureTruncateIfRequired(&e)
delete(c.partialLines, fingerprint)
}
return []Entry{e}, false
Expand All @@ -118,9 +160,15 @@ func (c *cri) Run(entry chan Entry) chan Entry {
return in
}

func (c *cri) ensureTruncateIfRequired(e *Entry) {
if c.cfg.MaxPartialLineSizeTruncate && len(e.Line) > int(c.cfg.MaxPartialLineSize) {
e.Line = e.Line[:c.cfg.MaxPartialLineSize]
}
}

// NewCRI creates a predefined pipeline for parsing entries in the CRI log
// format.
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error) {
base := []StageConfig{
{
RegexConfig: &RegexConfig{
Expand Down Expand Up @@ -156,9 +204,9 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
}

c := cri{
maxPartialLines: MaxPartialLinesSize,
base: p,
cfg: config,
base: p,
}
c.partialLines = make(map[model.Fingerprint]Entry)
c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines)
return &c, nil
}
53 changes: 37 additions & 16 deletions component/loki/process/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,27 @@ type testEntry struct {

func TestCRI_tags(t *testing.T) {
cases := []struct {
name string
lines []string
expected []string
maxPartialLines int
entries []testEntry
err error
name string
lines []string
expected []string
maxPartialLines int
maxPartialLineSize uint64
maxPartialLineSizeTruncate bool
entries []testEntry
err error
}{
{
name: "tag F",
name: "tag F",
maxPartialLines: 100,
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}},
},
expected: []string{"some full line", "log"},
},
{
name: "tag P multi-stream",
name: "tag P multi-stream",
maxPartialLines: 100,
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar2"}},
Expand All @@ -141,7 +145,7 @@ func TestCRI_tags(t *testing.T) {
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}},
},
maxPartialLines: 2,
maxPartialLines: 3,
expected: []string{
"partial line 1 partial line 3 ",
"partial line 2 ",
Expand All @@ -167,20 +171,36 @@ func TestCRI_tags(t *testing.T) {
"another full log",
},
},
{
name: "tag P multi-stream with truncation",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial", labels: model.LabelSet{"foo": "bar2"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F full", labels: model.LabelSet{"foo": "bar2"}},
},
maxPartialLines: 100,
maxPartialLineSizeTruncate: true,
maxPartialLineSize: 11,
expected: []string{
"partial lin",
"partialfull",
},
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer)
cfg := CRIConfig{
MaxPartialLines: tt.maxPartialLines,
MaxPartialLineSize: tt.maxPartialLineSize,
MaxPartialLineSizeTruncate: tt.maxPartialLineSizeTruncate,
}
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
require.NoError(t, err)

got := make([]string, 0)

// tweak `maxPartialLines`
if tt.maxPartialLines != 0 {
p.(*cri).maxPartialLines = tt.maxPartialLines
}

for _, entry := range tt.entries {
out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now()))
if len(out) > 0 {
Expand Down Expand Up @@ -258,7 +278,8 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer)
cfg := DefaultCRIConfig
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
return nil, err
}
case cfg.CRIConfig != nil:
s, err = NewCRI(logger, registerer)
s, err = NewCRI(logger, *cfg.CRIConfig, registerer)
if err != nil {
return nil, err
}
Expand Down
22 changes: 19 additions & 3 deletions converter/internal/promtailconvert/internal/build/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func convertStage(st interface{}, diags *diag.Diagnostics) (stages.StageConfig,
case promtailstages.StageTypeDocker:
return convertDocker()
case promtailstages.StageTypeCRI:
return convertCRI()
return convertCRI(iCfg, diags)
case promtailstages.StageTypeMatch:
return convertMatch(iCfg, diags)
case promtailstages.StageTypeTemplate:
Expand Down Expand Up @@ -352,8 +352,24 @@ func convertMatch(cfg interface{}, diags *diag.Diagnostics) (stages.StageConfig,
}}, true
}

func convertCRI() (stages.StageConfig, bool) {
return stages.StageConfig{CRIConfig: &stages.CRIConfig{}}, true
func convertCRI(cfg interface{}, diags *diag.Diagnostics) (stages.StageConfig, bool) {
pCRI := &promtailstages.CriConfig{}
if err := mapstructure.Decode(cfg, pCRI); err != nil {
addInvalidStageError(diags, cfg, err)
return stages.StageConfig{}, false
}

// Copied logic from Promtail: if MaxPartialLines is 0, default it to
// MaxPartialLinesSize.
if pCRI.MaxPartialLines == 0 {
pCRI.MaxPartialLines = promtailstages.MaxPartialLinesSize
}

return stages.StageConfig{CRIConfig: &stages.CRIConfig{
MaxPartialLines: pCRI.MaxPartialLines,
MaxPartialLineSize: uint64(pCRI.MaxPartialLineSize),
MaxPartialLineSizeTruncate: pCRI.MaxPartialLineSizeTruncate,
}}, true
}

func convertDocker() (stages.StageConfig, bool) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
discovery.kubernetes "example" {
role = "pod"
kubeconfig_file = "/home/toby/.kube/config"
}

local.file_match "example" {
path_targets = discovery.kubernetes.example.targets
}

loki.process "example" {
forward_to = [loki.write.default.receiver]

stage.cri { }
}

loki.source.file "example" {
targets = local.file_match.example.targets
forward_to = [loki.process.example.receiver]
}

loki.write "default" {
endpoint {
url = "http://localhost/loki/api/v1/push"
}
external_labels = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
clients:
- url: http://localhost/loki/api/v1/push
scrape_configs:
- job_name: example
pipeline_stages:
- cri: { }
kubernetes_sd_configs:
- role: pod
kubeconfig_file: /home/toby/.kube/config

tracing: { enabled: false }
server: { register_instrumentation: false }
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ loki.process "example" {

stage.docker { }

stage.cri { }
stage.cri {
max_partial_lines = 223
max_partial_line_size = 26214
max_partial_line_size_truncate = true
}

stage.label_drop {
values = ["foo", "bar", "baz"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ scrape_configs:
- job_name: example
pipeline_stages:
- docker: { }
- cri: { }
- cri:
max_partial_lines: 223
max_partial_line_size: 26214
max_partial_line_size_truncate: true
- labeldrop:
- foo
- bar
Expand Down
Loading

0 comments on commit d4ec620

Please sign in to comment.