Skip to content

Commit

Permalink
feat: add lines skipped metric to pattern ingesters
Browse files Browse the repository at this point in the history
Reasons for skipping:
- too few tokens
- too many tokens
- line too long
  • Loading branch information
trevorwhitney committed Nov 18, 2024
1 parent 321976e commit 8de23a8
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 6 deletions.
7 changes: 6 additions & 1 deletion pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,17 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state, d.metrics.LinesSkipped)
return d.train(d.tokens, d.state, ts)
}

func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
d.metrics.LinesSkipped.WithLabelValues(TooFewTokens).Inc()
return nil
}
if len(tokens) > 50 {
d.metrics.LinesSkipped.WithLabelValues(TooManyTokens).Inc()
return nil
}
if d.metrics != nil {
Expand Down
22 changes: 17 additions & 5 deletions pkg/pattern/drain/line_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (

"github.com/buger/jsonparser"
gologfmt "github.com/go-logfmt/logfmt"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
)

type LineTokenizer interface {
Tokenize(line string, tokens []string, state interface{}) ([]string, interface{})
Tokenize(line string, tokens []string, state interface{}, linesDropped *prometheus.CounterVec) ([]string, interface{})
Join(tokens []string, state interface{}) string
Clone(tokens []string, state interface{}) ([]string, interface{})
}
Expand Down Expand Up @@ -56,7 +57,7 @@ func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer {
}
}

func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}, linesDropped *prometheus.CounterVec) ([]string, interface{}) {
if len(line) > p.maxLineLength {
return nil, nil
}
Expand Down Expand Up @@ -209,8 +210,14 @@ func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer {
}
}

func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
func (t *logfmtTokenizer) Tokenize(
line string,
tokens []string,
_ interface{},
linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
if len(line) > t.maxLineLength {
linesDropped.WithLabelValues(LineTooLong).Inc()
return nil, nil
}

Expand Down Expand Up @@ -277,7 +284,12 @@ func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []s
}
}

func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
func (t *jsonTokenizer) Tokenize(
line string,
tokens []string,
state interface{},
linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
var found []byte
for _, key := range t.fieldsToTokenize {
msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
Expand All @@ -297,7 +309,7 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}
return nil, nil
}

return t.punctuationTokenizer.Tokenize(foundLine, tokens, state)
return t.punctuationTokenizer.Tokenize(foundLine, tokens, state, linesDropped)
}

func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
Expand Down
4 changes: 4 additions & 0 deletions pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (
FormatLogfmt = "logfmt"
FormatJSON = "json"
FormatUnknown = "unknown"
TooFewTokens = "too_few_tokens"
TooManyTokens = "too_many_tokens"
LineTooLong = "line_too_long"
)

var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$")
Expand All @@ -31,6 +34,7 @@ type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsPrunedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
LinesSkipped *prometheus.CounterVec
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
}
7 changes: 7 additions & 0 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
patternsDiscardedTotal *prometheus.CounterVec
patternsDetectedTotal *prometheus.CounterVec
linesSkipped *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
samples *prometheus.CounterVec
Expand All @@ -34,6 +35,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}, []string{"tenant", "format"}),
linesSkipped: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_dropped_total",
Help: "The total number of log lines skipped for pattern recognition.",
}, []string{"tenant", "reason"}),
tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Expand Down
6 changes: 6 additions & 0 deletions pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
Expand All @@ -37,6 +38,10 @@ func newStream(
drainCfg *drain.Config,
drainLimits drain.Limits,
) (*stream, error) {
linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID})
if err != nil {
return nil, err
}
return &stream{
fp: fp,
labels: labels,
Expand All @@ -47,6 +52,7 @@ func newStream(
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"),
PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
LinesSkipped: linesSkipped,
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
}),
Expand Down

0 comments on commit 8de23a8

Please sign in to comment.