feat: add lines skipped metric to pattern ingesters (#14997)
trevorwhitney authored Nov 22, 2024
1 parent 0f242e7 commit dea5d78
Showing 7 changed files with 112 additions and 31 deletions.
22 changes: 20 additions & 2 deletions pkg/pattern/drain/drain.go
@@ -30,6 +30,7 @@ import (
"unsafe"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
@@ -211,12 +212,29 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
- d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
+ var linesSkipped *prometheus.CounterVec
+ if d.metrics != nil {
+ linesSkipped = d.metrics.LinesSkipped
+ }
+ d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state, linesSkipped)
if d.tokens == nil && d.state == nil {
return nil
}

return d.train(d.tokens, d.state, ts)
}

func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
+ if len(tokens) < 4 {
+ if d.metrics != nil && d.metrics.LinesSkipped != nil {
+ d.metrics.LinesSkipped.WithLabelValues(TooFewTokens).Inc()
+ }
+ return nil
+ }
+ if len(tokens) > 80 {
+ if d.metrics != nil && d.metrics.LinesSkipped != nil {
+ d.metrics.LinesSkipped.WithLabelValues(TooManyTokens).Inc()
+ }
+ return nil
+ }
if d.metrics != nil {
@@ -255,7 +273,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
- tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state)
+ tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state, d.metrics.LinesSkipped)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
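The new guards in `train` only touch the counter when metrics are actually wired up, so `Drain` instances built without metrics keep working. A minimal, self-contained sketch of that nil-guarded pattern (hypothetical names, not code from this commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Hypothetical skip reasons mirroring drain.TooFewTokens / drain.TooManyTokens.
const (
	tooFewTokens  = "too_few_tokens"
	tooManyTokens = "too_many_tokens"
)

// shouldTrain mirrors the guards added to (*Drain).train: pathological
// lines are skipped, and the reason is counted only when a vec is present.
func shouldTrain(tokens []string, linesSkipped *prometheus.CounterVec) bool {
	if len(tokens) < 4 {
		if linesSkipped != nil {
			linesSkipped.WithLabelValues(tooFewTokens).Inc()
		}
		return false
	}
	if len(tokens) > 80 {
		if linesSkipped != nil {
			linesSkipped.WithLabelValues(tooManyTokens).Inc()
		}
		return false
	}
	return true
}

func main() {
	skipped := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "lines_skipped_total"},
		[]string{"reason"},
	)

	shouldTrain([]string{"only", "three", "tokens"}, skipped) // counted
	shouldTrain([]string{"a", "b", "c"}, nil)                 // nil vec: skipped silently

	fmt.Println(testutil.ToFloat64(skipped.WithLabelValues(tooFewTokens))) // 1
}
```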
55 changes: 37 additions & 18 deletions pkg/pattern/drain/drain_test.go

Large diffs are not rendered by default.

39 changes: 33 additions & 6 deletions pkg/pattern/drain/line_tokenizer.go
@@ -8,12 +8,13 @@ import (

"github.com/buger/jsonparser"
gologfmt "github.com/go-logfmt/logfmt"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
)

type LineTokenizer interface {
- Tokenize(line string, tokens []string, state interface{}) ([]string, interface{})
+ Tokenize(line string, tokens []string, state interface{}, linesDropped *prometheus.CounterVec) ([]string, interface{})
Join(tokens []string, state interface{}) string
Clone(tokens []string, state interface{}) ([]string, interface{})
}
@@ -56,8 +57,16 @@ func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer {
}
}

- func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+ func (p *punctuationTokenizer) Tokenize(
+ line string,
+ tokens []string,
+ state interface{},
+ linesDropped *prometheus.CounterVec,
+ ) ([]string, interface{}) {
if len(line) > p.maxLineLength {
+ if linesDropped != nil {
+ linesDropped.WithLabelValues(LineTooLong).Inc()
+ }
return nil, nil
}

@@ -131,7 +140,12 @@ func (p *punctuationTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {

type splittingTokenizer struct{}

- func (splittingTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+ func (splittingTokenizer) Tokenize(
+ line string,
+ tokens []string,
+ state interface{},
+ _ *prometheus.CounterVec,
+ ) ([]string, interface{}) {
numEquals := strings.Count(line, "=")
numColons := strings.Count(line, ":")
numSpaces := strings.Count(line, " ")
@@ -209,8 +223,16 @@ func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer {
}
}

- func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
+ func (t *logfmtTokenizer) Tokenize(
+ line string,
+ tokens []string,
+ _ interface{},
+ linesDropped *prometheus.CounterVec,
+ ) ([]string, interface{}) {
if len(line) > t.maxLineLength {
+ if linesDropped != nil {
+ linesDropped.WithLabelValues(LineTooLong).Inc()
+ }
return nil, nil
}

@@ -277,7 +299,12 @@ func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []string) *jsonTokenizer {
}
}

- func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+ func (t *jsonTokenizer) Tokenize(
+ line string,
+ tokens []string,
+ state interface{},
+ linesDropped *prometheus.CounterVec,
+ ) ([]string, interface{}) {
var found []byte
for _, key := range t.fieldsToTokenize {
msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
@@ -297,7 +324,7 @@
return nil, nil
}

- return t.punctuationTokenizer.Tokenize(foundLine, tokens, state)
+ return t.punctuationTokenizer.Tokenize(foundLine, tokens, state, linesDropped)
}

func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
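All four tokenizers now implement the widened `LineTokenizer` interface, and each must tolerate a nil counter, since some callers pass none (the tests below pass `nil` explicitly). A hedged sketch of what a conforming third-party tokenizer might look like; `whitespaceTokenizer` and its package are hypothetical, only `drain.LineTokenizer` and `drain.LineTooLong` come from the diff:

```go
package tokenizerexample

import (
	"strings"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

// whitespaceTokenizer is a hypothetical implementation of
// drain.LineTokenizer after this commit widened the interface.
type whitespaceTokenizer struct {
	maxLineLength int
}

func (w *whitespaceTokenizer) Tokenize(
	line string,
	tokens []string,
	state interface{},
	linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
	if len(line) > w.maxLineLength {
		// Callers may pass nil (the tests do), so guard before counting.
		if linesDropped != nil {
			linesDropped.WithLabelValues(drain.LineTooLong).Inc()
		}
		return nil, nil
	}
	return strings.Fields(line), state
}

func (w *whitespaceTokenizer) Join(tokens []string, _ interface{}) string {
	return strings.Join(tokens, " ")
}

func (w *whitespaceTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
	out := make([]string, len(tokens))
	copy(out, tokens)
	return out, state
}

// Compile-time check that the sketch satisfies the widened interface.
var _ drain.LineTokenizer = (*whitespaceTokenizer)(nil)
```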
10 changes: 5 additions & 5 deletions pkg/pattern/drain/line_tokenizer_test.go
@@ -143,7 +143,7 @@ func TestTokenizer_Tokenize(t *testing.T) {
for _, tt := range tests {
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
- got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil)
+ got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil, nil)
require.Equal(t, tc.want[tt.name], got)
})
}
@@ -168,7 +168,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) {
for _, tt := range tests {
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
- got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil))
+ got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil, nil))
require.Equal(t, tc.line, got)
})
}
@@ -184,7 +184,7 @@ func BenchmarkSplittingTokenizer(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
- tokenizer.Tokenize(tc.line, nil, nil)
+ tokenizer.Tokenize(tc.line, nil, nil, nil)
}
})
}
@@ -231,7 +231,7 @@ func TestLogFmtTokenizer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, _ := tokenizer.Tokenize(tt.line, nil, nil)
+ got, _ := tokenizer.Tokenize(tt.line, nil, nil, nil)
require.Equal(t, tt.want, got)
})
}
@@ -330,7 +330,7 @@ func TestJsonTokenizer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, state := tokenizer.Tokenize(tt.line, nil, nil)
+ got, state := tokenizer.Tokenize(tt.line, nil, nil, nil)
require.Equal(t, tt.want, got)
if len(got) == len(tt.want) && len(tt.want) != 0 {
pattern := tokenizer.Join(got, state)
4 changes: 4 additions & 0 deletions pkg/pattern/drain/metrics.go
@@ -10,6 +10,9 @@ const (
FormatLogfmt = "logfmt"
FormatJSON = "json"
FormatUnknown = "unknown"
+ TooFewTokens = "too_few_tokens"
+ TooManyTokens = "too_many_tokens"
+ LineTooLong = "line_too_long"
)

var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$")
@@ -31,6 +34,7 @@ type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsPrunedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
+ LinesSkipped *prometheus.CounterVec
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
}
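Note the asymmetry with the neighboring fields: they are pre-bound `prometheus.Counter`/`prometheus.Observer` values, while `LinesSkipped` stays a `*prometheus.CounterVec` so drain can pick the `reason` label at increment time. Since only the three constants above are ever used as label values, cardinality per stream stays bounded. An illustrative fragment (hypothetical helper, not from this commit):

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

// newSkipCounter shows why LinesSkipped is a vec: the reason label is
// chosen at increment time, and only the three exported constants are
// ever used, so the series count stays fixed at three.
func newSkipCounter() *prometheus.CounterVec {
	skipped := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "lines_skipped_total",
		Help: "Log lines skipped during pattern extraction.",
	}, []string{"reason"})

	skipped.WithLabelValues(drain.TooFewTokens).Inc()
	skipped.WithLabelValues(drain.TooManyTokens).Inc()
	skipped.WithLabelValues(drain.LineTooLong).Inc()
	return skipped
}
```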
7 changes: 7 additions & 0 deletions pkg/pattern/metrics.go
@@ -9,6 +9,7 @@ type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
patternsDiscardedTotal *prometheus.CounterVec
patternsDetectedTotal *prometheus.CounterVec
+ linesSkipped *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
samples *prometheus.CounterVec
@@ -34,6 +35,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}, []string{"tenant", "format"}),
+ linesSkipped: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+ Namespace: metricsNamespace,
+ Subsystem: "pattern_ingester",
+ Name: "patterns_dropped_total",
+ Help: "The total number of log lines skipped for pattern recognition.",
+ }, []string{"tenant", "reason"}),
tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
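Worth noting: the Go field is called `linesSkipped`, but the registered series name is `patterns_dropped_total`, so the exposed metric is `<namespace>_pattern_ingester_patterns_dropped_total` with `tenant` and `reason` labels. A hypothetical test sketch of how it surfaces, assuming the package-private constructor is called with namespace `"loki"` (an assumption; the namespace is supplied by the caller):

```go
package pattern

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

func TestLinesSkippedSeriesName(t *testing.T) {
	reg := prometheus.NewRegistry()
	m := newIngesterMetrics(reg, "loki") // "loki" namespace is an assumption

	m.linesSkipped.WithLabelValues("tenant-a", drain.LineTooLong).Inc()

	fams, err := reg.Gather()
	require.NoError(t, err)

	var names []string
	for _, f := range fams {
		names = append(names, f.GetName())
	}
	// Despite the field name linesSkipped, the wire name is patterns_dropped_total.
	require.Contains(t, names, "loki_pattern_ingester_patterns_dropped_total")
}
```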
6 changes: 6 additions & 0 deletions pkg/pattern/stream.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
@@ -37,6 +38,10 @@ func newStream(
drainCfg *drain.Config,
drainLimits drain.Limits,
) (*stream, error) {
+ linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID})
+ if err != nil {
+ return nil, err
+ }
return &stream{
fp: fp,
labels: labels,
@@ -47,6 +52,7 @@
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"),
PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
+ LinesSkipped: linesSkipped,
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
}),
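`CurryWith` pre-binds the `tenant` label so the `drain` package, which knows nothing about tenants, only has to supply `reason`; it returns an error if the curried label was not declared on the vec, which is why `newStream` can now fail. A standalone sketch of the mechanics (names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "patterns_dropped_total",
		Help: "Example counter with a curried tenant label.",
	}, []string{"tenant", "reason"})

	// Pre-bind the tenant label; the curried vec only needs "reason".
	// CurryWith errors if "tenant" were not a declared label, which is
	// why newStream propagates the error.
	curried, err := vec.CurryWith(prometheus.Labels{"tenant": "tenant-a"})
	if err != nil {
		panic(err)
	}

	curried.WithLabelValues("too_few_tokens").Inc()
	// Equivalent to: vec.WithLabelValues("tenant-a", "too_few_tokens").Inc()

	fmt.Println("ok")
}
```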
