Skip to content

Commit

Permalink
loki.process: add decolorise stage support (#5416)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored Oct 10, 2023
1 parent fa2fc38 commit 0d6ad63
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 46 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Features

- Added a new `stage.decolorize` stage to `loki.process` component which
allows to strip ANSI color codes from the log lines. (@thampiotr)

### Bugfixes

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
Expand Down
39 changes: 39 additions & 0 deletions component/loki/process/stages/decolorize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package stages

// NOTE: This code is copied from Promtail (07cbef92268aecc0f20d1791a6df390c2df5c072) with changes kept to the minimum.

import (
"github.com/grafana/loki/pkg/logql/log"
)

type DecolorizeConfig struct{}

type decolorizeStage struct{}

func newDecolorizeStage(_ DecolorizeConfig) (Stage, error) {
return &decolorizeStage{}, nil
}

// Run implements Stage
func (m *decolorizeStage) Run(in chan Entry) chan Entry {
decolorizer, _ := log.NewDecolorizer()
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
decolorizedLine, _ := decolorizer.Process(
e.Timestamp.Unix(),
[]byte(e.Entry.Line),
nil,
)
e.Entry.Line = string(decolorizedLine)
out <- e
}
}()
return out
}

// Name implements Stage
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}
53 changes: 53 additions & 0 deletions component/loki/process/stages/decolorize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package stages

// NOTE: This code is copied from Promtail (07cbef92268aecc0f20d1791a6df390c2df5c072) with changes kept to the minimum.

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

util_log "github.com/grafana/loki/pkg/util/log"
)

var testDecolorizePipeline = `
stage.decolorize {}
`

func TestPipeline_Decolorize(t *testing.T) {
t.Parallel()

tests := map[string]struct {
config string
entry string
expectedEntry string
}{
"successfully run pipeline on non-colored text": {
testDecolorizePipeline,
"sample text",
"sample text",
},
"successfully run pipeline on colored text": {
testDecolorizePipeline,
"\033[0;32mgreen\033[0m \033[0;31mred\033[0m",
"green red",
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0]
assert.Equal(t, testData.expectedEntry, out.Line)
})
}
}
2 changes: 1 addition & 1 deletion component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var _ Stage = (*cri)(nil)

// Name implement the Stage interface.
func (c *cri) Name() string {
return "cri"
return StageTypeCRI
}

// implements Stage interface
Expand Down
32 changes: 17 additions & 15 deletions component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@ import (
// We define these as pointers types so we can use reflection to check that
// exactly one is set.
type StageConfig struct {
//TODO(thampiotr): sync these with new stages
CRIConfig *CRIConfig `river:"cri,block,optional"`
DecolorizeConfig *DecolorizeConfig `river:"decolorize,block,optional"`
DockerConfig *DockerConfig `river:"docker,block,optional"`
DropConfig *DropConfig `river:"drop,block,optional"`
GeoIPConfig *GeoIPConfig `river:"geoip,block,optional"`
JSONConfig *JSONConfig `river:"json,block,optional"`
LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"`
LabelsConfig *LabelsConfig `river:"labels,block,optional"`
StructuredMetadata *LabelsConfig `river:"structured_metadata,block,optional"`
LabelAllowConfig *LabelAllowConfig `river:"label_keep,block,optional"`
LabelDropConfig *LabelDropConfig `river:"label_drop,block,optional"`
StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"`
DockerConfig *DockerConfig `river:"docker,block,optional"`
CRIConfig *CRIConfig `river:"cri,block,optional"`
RegexConfig *RegexConfig `river:"regex,block,optional"`
TimestampConfig *TimestampConfig `river:"timestamp,block,optional"`
OutputConfig *OutputConfig `river:"output,block,optional"`
ReplaceConfig *ReplaceConfig `river:"replace,block,optional"`
MultilineConfig *MultilineConfig `river:"multiline,block,optional"`
LabelsConfig *LabelsConfig `river:"labels,block,optional"`
LimitConfig *LimitConfig `river:"limit,block,optional"`
LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"`
MatchConfig *MatchConfig `river:"match,block,optional"`
DropConfig *DropConfig `river:"drop,block,optional"`
MetricsConfig *MetricsConfig `river:"metrics,block,optional"`
MultilineConfig *MultilineConfig `river:"multiline,block,optional"`
OutputConfig *OutputConfig `river:"output,block,optional"`
PackConfig *PackConfig `river:"pack,block,optional"`
RegexConfig *RegexConfig `river:"regex,block,optional"`
ReplaceConfig *ReplaceConfig `river:"replace,block,optional"`
StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"`
StructuredMetadata *LabelsConfig `river:"structured_metadata,block,optional"`
TemplateConfig *TemplateConfig `river:"template,block,optional"`
TenantConfig *TenantConfig `river:"tenant,block,optional"`
LimitConfig *LimitConfig `river:"limit,block,optional"`
MetricsConfig *MetricsConfig `river:"metrics,block,optional"`
GeoIPConfig *GeoIPConfig `river:"geoip,block,optional"`
TimestampConfig *TimestampConfig `river:"timestamp,block,optional"`
}

var rateLimiter *rate.Limiter
Expand Down
54 changes: 32 additions & 22 deletions component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package stages
// new code without being able to slowly review, examine and test them.

import (
"fmt"
"os"
"runtime"
"time"
Expand All @@ -18,29 +19,34 @@ import (

// TODO(@tpaschalis) Let's use this as the list of stages we need to port over.
const (
StageTypeJSON = "json"
StageTypeLogfmt = "logfmt"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeCRI = "cri"
StageTypeDecolorize = "decolorize"
StageTypeDocker = "docker"
StageTypeDrop = "drop"
//TODO(thampiotr): Add support for eventlogmessage stage
StageTypeEventLogMessage = "eventlogmessage"
StageTypeGeoIP = "geoip"
StageTypeJSON = "json"
StageTypeLabel = "labels"
StageTypeLabelAllow = "labelallow"
StageTypeLabelDrop = "labeldrop"
StageTypeLimit = "limit"
StageTypeLogfmt = "logfmt"
StageTypeMatch = "match"
StageTypeMetric = "metrics"
StageTypeMultiline = "multiline"
StageTypeOutput = "output"
StageTypePack = "pack"
StageTypePipeline = "pipeline"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
//TODO(thampiotr): Add support for sampling stage
StageTypeSampling = "sampling"
StageTypeStaticLabels = "static_labels"
StageTypeStructuredMetadata = "structured_metadata"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeLimit = "limit"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
StageTypeStaticLabels = "static_labels"
StageTypeGeoIP = "geoip"
StageTypeTimestamp = "timestamp"
)

// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
Expand Down Expand Up @@ -222,9 +228,13 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
if err != nil {
return nil, err
}

case cfg.DecolorizeConfig != nil:
s, err = newDecolorizeStage(*cfg.DecolorizeConfig)
if err != nil {
return nil, err
}
default:
panic("unreachable; should have decoded into one of the StageConfig fields")
panic(fmt.Sprintf("unreachable; should have decoded into one of the StageConfig fields: %+v", cfg))
}
return s, nil
}
5 changes: 2 additions & 3 deletions converter/internal/promtailconvert/internal/build/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,8 @@ func convertEventLogMessage(diags *diag.Diagnostics) (stages.StageConfig, bool)
return stages.StageConfig{}, false
}

func convertDecolorize(diags *diag.Diagnostics) (stages.StageConfig, bool) {
diags.Add(diag.SeverityLevelError, "pipeline_stages.decolorize is not supported")
return stages.StageConfig{}, false
func convertDecolorize(_ *diag.Diagnostics) (stages.StageConfig, bool) {
return stages.StageConfig{DecolorizeConfig: &stages.DecolorizeConfig{}}, true
}

func convertStaticLabels(cfg interface{}, diags *diag.Diagnostics) (stages.StageConfig, bool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ loki.process "example" {
source = "internet"
db_type = "mmdb"
}

stage.decolorize { }
}

loki.source.file "example" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ scrape_configs:
db: /usr/share/GeoIP/GeoLite2-City.mmdb
source: internet
db_type: mmdb
- decolorize: { }
kubernetes_sd_configs:
- role: pod
kubeconfig_file: /home/toby/.kube/config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
(Error) pipeline_stages.sampling is currently not supported: map[rate:100]
(Error) pipeline_stages.decolorize is not supported
(Error) pipeline_stages.eventlogmessage is not supported
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ scrape_configs:
pipeline_stages:
- sampling:
rate: 100
- decolorize: { }
- eventlogmessage: { }
kubernetes_sd_configs:
- role: pod
Expand Down
33 changes: 30 additions & 3 deletions docs/sources/flow/reference/components/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ loki.process "LABEL" {
The following blocks are supported inside the definition of `loki.process`:

| Hierarchy | Block | Description | Required |
|---------------------------|-------------------------------|------------------------------------------------------| -------- |
|---------------------------|-------------------------------|------------------------------------------------------|----------|
| stage.cri | [stage.cri][] | Configures a pre-defined CRI-format pipeline. | no |
| stage.decolorize | [stage.decolorize][] | Strips ANSI color codes from log lines. | no |
| stage.docker | [stage.docker][] | Configures a pre-defined Docker log format pipeline. | no |
| stage.drop | [stage.drop][] | Configures a `drop` processing stage. | no |
| stage.json | [stage.json][] | Configures a JSON processing stage. | no |
| stage.label_drop | [stage.label_drop][] | Configures a `label_drop` processing stage. | no |
| stage.label_keep | [stage.label_keep][] | Configures a `label_keep` processing stage. | no |
| stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no |
| stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no |
| stage.structured_metadata | [stage.structured_metadata][] | Configures a structured metadata processing stage. | no |
| stage.limit | [stage.limit][] | Configures a `limit` processing stage. | no |
| stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. | no |
| stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. | no |
| stage.match | [stage.match][] | Configures a `match` processing stage. | no |
| stage.metrics | [stage.metrics][] | Configures a `metrics` stage. | no |
| stage.multiline | [stage.multiline][] | Configures a `multiline` processing stage. | no |
Expand All @@ -80,6 +81,7 @@ A user can provide any number of these stage blocks nested inside
file.

[stage.cri]: #stagecri-block
[stage.decolorize]: #stagedecolorize-block
[stage.docker]: #stagedocker-block
[stage.drop]: #stagedrop-block
[stage.json]: #stagejson-block
Expand Down Expand Up @@ -141,6 +143,31 @@ stream: stdout
timestamp: 2019-04-30T02:12:41.8443515
```

### stage.decolorize block

The `stage.decolorize` strips ANSI color codes from the log lines, thus making
it easier to parse logs further.

The `stage.decolorize` block does not support any arguments or inner blocks, so
it is always empty.

```river
stage.decolorize {}
```

`stage.decolorize` turns each line having a color code into a non-colored one,
for example:

```
[2022-11-04 22:17:57.811] \033[0;32http\033[0m: GET /_health (0 ms) 204
```

is turned into

```
[2022-11-04 22:17:57.811] http: GET /_health (0 ms) 204
```

### stage.docker block

The `stage.docker` inner block enables a predefined pipeline which reads log lines in
Expand Down

0 comments on commit 0d6ad63

Please sign in to comment.