feat: add lines skipped metric to pattern ingesters #14997

Merged · 7 commits · Nov 22, 2024

Changes from all commits
22 changes: 20 additions & 2 deletions pkg/pattern/drain/drain.go
@@ -30,6 +30,7 @@ import (
"unsafe"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
@@ -211,12 +212,29 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
var linesSkipped *prometheus.CounterVec
if d.metrics != nil {
linesSkipped = d.metrics.LinesSkipped
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state, linesSkipped)
if d.tokens == nil && d.state == nil {
return nil
}

return d.train(d.tokens, d.state, ts)
}

func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
if d.metrics != nil && d.metrics.LinesSkipped != nil {
d.metrics.LinesSkipped.WithLabelValues(TooFewTokens).Inc()
}
return nil
}
if len(tokens) > 80 {
Collaborator Author commented:

80 is the value Adaptive Logs uses, so it seems reasonable to do the same. The difference is that Adaptive Logs truncates the tokens slice at 80, whereas we drop the line entirely. My reason for dropping is the integration between pattern ingester patterns and pattern search in Explore Logs: searching by a truncated set of tokens won't yield the same result unless we know the pattern is truncated and insert a wildcard at its end. (The skip conditions are summarized in the sketch after this file's diff.)

if d.metrics != nil && d.metrics.LinesSkipped != nil {
d.metrics.LinesSkipped.WithLabelValues(TooManyTokens).Inc()
}
return nil
}
if d.metrics != nil {
@@ -255,7 +273,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state)
tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state, d.metrics.LinesSkipped)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
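For reference, a hedged summary of the skip conditions this diff adds to drain.go, collapsed into one hypothetical helper (skipReason is not a function in the PR; the constants are the ones added in pkg/pattern/drain/metrics.go below):

func skipReason(line string, tokens []string, maxLineLength int) (string, bool) {
	switch {
	case len(line) > maxLineLength:
		// Checked inside the tokenizers, before any tokens are produced.
		return LineTooLong, true
	case len(tokens) < 4:
		// Too little structure to form a useful pattern.
		return TooFewTokens, true
	case len(tokens) > 80:
		// Dropped rather than truncated; see the author's comment above.
		return TooManyTokens, true
	}
	return "", false
}

Each skipped line increments LinesSkipped with the matching reason label.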
55 changes: 37 additions & 18 deletions pkg/pattern/drain/drain_test.go

Large diffs are not rendered by default.

39 changes: 33 additions & 6 deletions pkg/pattern/drain/line_tokenizer.go
@@ -8,12 +8,13 @@ import (

"github.com/buger/jsonparser"
gologfmt "github.com/go-logfmt/logfmt"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
)

type LineTokenizer interface {
Tokenize(line string, tokens []string, state interface{}) ([]string, interface{})
Tokenize(line string, tokens []string, state interface{}, linesDropped *prometheus.CounterVec) ([]string, interface{})
Join(tokens []string, state interface{}) string
Clone(tokens []string, state interface{}) ([]string, interface{})
}
@@ -56,8 +57,16 @@ func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer {
}
}

func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
func (p *punctuationTokenizer) Tokenize(
line string,
tokens []string,
state interface{},
linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
if len(line) > p.maxLineLength {
if linesDropped != nil {
linesDropped.WithLabelValues(LineTooLong).Inc()
}
return nil, nil
}

@@ -131,7 +140,12 @@ func (p *punctuationTokenizer) Clone(tokens []string, state interface{}) ([]stri

type splittingTokenizer struct{}

func (splittingTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
func (splittingTokenizer) Tokenize(
line string,
tokens []string,
state interface{},
_ *prometheus.CounterVec,
) ([]string, interface{}) {
numEquals := strings.Count(line, "=")
numColons := strings.Count(line, ":")
numSpaces := strings.Count(line, " ")
@@ -209,8 +223,16 @@ func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer {
}
}

func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
func (t *logfmtTokenizer) Tokenize(
line string,
tokens []string,
_ interface{},
linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
if len(line) > t.maxLineLength {
if linesDropped != nil {
linesDropped.WithLabelValues(LineTooLong).Inc()
}
return nil, nil
}

@@ -277,7 +299,12 @@ func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []s
}
}

func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
func (t *jsonTokenizer) Tokenize(
line string,
tokens []string,
state interface{},
linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
var found []byte
for _, key := range t.fieldsToTokenize {
msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
@@ -297,7 +324,7 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}
return nil, nil
}

return t.punctuationTokenizer.Tokenize(foundLine, tokens, state)
return t.punctuationTokenizer.Tokenize(foundLine, tokens, state, linesDropped)
}

func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
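Because the LineTokenizer interface now carries the counter through Tokenize, every implementation must accept it, and callers may pass nil. A minimal sketch of a conforming implementation, assuming the interface and constants from this PR (whitespaceTokenizer itself is hypothetical):

type whitespaceTokenizer struct{ maxLineLength int }

func (w *whitespaceTokenizer) Tokenize(
	line string,
	tokens []string,
	state interface{},
	linesDropped *prometheus.CounterVec,
) ([]string, interface{}) {
	if len(line) > w.maxLineLength {
		// The counter may be nil (the tests pass nil), so guard before use.
		if linesDropped != nil {
			linesDropped.WithLabelValues(LineTooLong).Inc()
		}
		return nil, nil
	}
	return strings.Fields(line), nil
}

func (w *whitespaceTokenizer) Join(tokens []string, _ interface{}) string {
	return strings.Join(tokens, " ")
}

func (w *whitespaceTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
	out := make([]string, len(tokens))
	copy(out, tokens)
	return out, state
}

The nil guard matters because Drain passes a nil counter when it has no metrics configured.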
10 changes: 5 additions & 5 deletions pkg/pattern/drain/line_tokenizer_test.go
@@ -143,7 +143,7 @@ func TestTokenizer_Tokenize(t *testing.T) {
for _, tt := range tests {
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil)
got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil, nil)
require.Equal(t, tc.want[tt.name], got)
})
}
@@ -168,7 +168,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) {
for _, tt := range tests {
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil))
got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil, nil))
require.Equal(t, tc.line, got)
})
}
@@ -184,7 +184,7 @@ func BenchmarkSplittingTokenizer(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
tokenizer.Tokenize(tc.line, nil, nil)
tokenizer.Tokenize(tc.line, nil, nil, nil)
}
})
}
@@ -231,7 +231,7 @@ func TestLogFmtTokenizer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := tokenizer.Tokenize(tt.line, nil, nil)
got, _ := tokenizer.Tokenize(tt.line, nil, nil, nil)
require.Equal(t, tt.want, got)
})
}
@@ -330,7 +330,7 @@ func TestJsonTokenizer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, state := tokenizer.Tokenize(tt.line, nil, nil)
got, state := tokenizer.Tokenize(tt.line, nil, nil, nil)
require.Equal(t, tt.want, got)
if len(got) == len(tt.want) && len(tt.want) != 0 {
pattern := tokenizer.Join(got, state)
4 changes: 4 additions & 0 deletions pkg/pattern/drain/metrics.go
@@ -10,6 +10,9 @@ const (
FormatLogfmt = "logfmt"
FormatJSON = "json"
FormatUnknown = "unknown"
TooFewTokens = "too_few_tokens"
TooManyTokens = "too_many_tokens"
LineTooLong = "line_too_long"
)

var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$")
@@ -31,6 +34,7 @@ type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsPrunedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
LinesSkipped *prometheus.CounterVec
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
}
7 changes: 7 additions & 0 deletions pkg/pattern/metrics.go
@@ -9,6 +9,7 @@ type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
patternsDiscardedTotal *prometheus.CounterVec
patternsDetectedTotal *prometheus.CounterVec
linesSkipped *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
samples *prometheus.CounterVec
@@ -34,6 +35,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}, []string{"tenant", "format"}),
linesSkipped: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_dropped_total",
Help: "The total number of log lines skipped for pattern recognition.",
}, []string{"tenant", "reason"}),
tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
6 changes: 6 additions & 0 deletions pkg/pattern/stream.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
@@ -37,6 +38,10 @@ func newStream(
drainCfg *drain.Config,
drainLimits drain.Limits,
) (*stream, error) {
linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID})
if err != nil {
return nil, err
}
return &stream{
fp: fp,
labels: labels,
@@ -47,6 +52,7 @@
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"),
PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
LinesSkipped: linesSkipped,
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
}),
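To make the wiring in newStream concrete, here is a self-contained sketch of the CurryWith pattern: the tenant label is pinned once per stream, so drain code only ever supplies the reason. The namespace "loki" and tenant "team-a" are illustrative assumptions, not values from the PR:

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	linesSkipped := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki", // assumed; Loki passes this in as metricsNamespace
		Subsystem: "pattern_ingester",
		Name:      "patterns_dropped_total",
		Help:      "The total number of log lines skipped for pattern recognition.",
	}, []string{"tenant", "reason"})
	prometheus.MustRegister(linesSkipped)

	// newStream pins the tenant label once per stream...
	curried, err := linesSkipped.CurryWith(prometheus.Labels{"tenant": "team-a"})
	if err != nil {
		panic(err)
	}

	// ...and drain increments with only the remaining label, producing e.g.
	// loki_pattern_ingester_patterns_dropped_total{tenant="team-a",reason="too_many_tokens"}
	curried.WithLabelValues("too_many_tokens").Inc()
}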