From 332016ac345895c0a1ab84e101ed4815b02ac8dc Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Fri, 27 Sep 2024 20:08:43 +0530 Subject: [PATCH] feat: Apply patterns line length limit to json message key --- pkg/pattern/drain/drain.go | 9 ++---- pkg/pattern/drain/line_tokenizer.go | 39 ++++++++++++++++++------ pkg/pattern/drain/line_tokenizer_test.go | 35 ++++++++++++++++----- 3 files changed, 59 insertions(+), 24 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index e6706e0517bb0..9e6062432cc6a 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -153,11 +153,11 @@ func New(config *Config, format string, metrics *Metrics) *Drain { var tokenizer LineTokenizer switch format { case FormatJSON: - tokenizer = newJSONTokenizer(config.ParamString) + tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength) case FormatLogfmt: - tokenizer = newLogfmtTokenizer(config.ParamString) + tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength) default: - tokenizer = newPunctuationTokenizer() + tokenizer = newPunctuationTokenizer(config.MaxAllowedLineLength) } d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) { @@ -206,9 +206,6 @@ func (d *Drain) Train(content string, ts int64) *LogCluster { if !d.limiter.Allow() { return nil } - if len(content) > d.config.MaxAllowedLineLength { - return nil - } d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state) return d.train(d.tokens, d.state, ts) } diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index e2acd8228bcfa..87b98afaea6de 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -37,9 +37,10 @@ func (spacesTokenizer) Clone(tokens []string, _ interface{}) ([]string, interfac type punctuationTokenizer struct { includeDelimiters [128]rune excludeDelimiters [128]rune + maxLineLength int } -func newPunctuationTokenizer() *punctuationTokenizer { +func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer { var included [128]rune var excluded [128]rune included['='] = 1 @@ -51,10 +52,15 @@ func newPunctuationTokenizer() *punctuationTokenizer { return &punctuationTokenizer{ includeDelimiters: included, excludeDelimiters: excluded, + maxLineLength: maxLineLength, } } func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { + if len(line) > p.maxLineLength { + return nil, nil + } + if cap(tokens) == 0 { tokens = make([]string, 0, 128) } @@ -190,18 +196,24 @@ func (splittingTokenizer) Clone(tokens []string, state interface{}) ([]string, i } type logfmtTokenizer struct { - dec *logfmt.Decoder - varReplace string + dec *logfmt.Decoder + varReplace string + maxLineLength int } -func newLogfmtTokenizer(varReplace string) *logfmtTokenizer { +func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer { return &logfmtTokenizer{ - dec: logfmt.NewDecoder(nil), - varReplace: varReplace, + dec: logfmt.NewDecoder(nil), + varReplace: varReplace, + maxLineLength: maxLineLength, } } func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) { + if len(line) > t.maxLineLength { + return nil, nil + } + if cap(tokens) == 0 { tokens = make([]string, 0, 64) } @@ -251,11 +263,12 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, inter type jsonTokenizer struct { *punctuationTokenizer - varReplace string + varReplace string + maxLineLength int } -func newJSONTokenizer(varReplace string) *jsonTokenizer { - return &jsonTokenizer{newPunctuationTokenizer(), varReplace} +func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer { + return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength} } func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { @@ -272,7 +285,13 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{} return nil, nil } - return t.punctuationTokenizer.Tokenize(unsafeString(found), tokens, state) + foundLine := unsafeString(found) + + if len(foundLine) > t.maxLineLength { + return nil, nil + } + + return t.punctuationTokenizer.Tokenize(foundLine, tokens, state) } func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index 200d1a8f510e6..f825a8d86bbc6 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -115,6 +115,14 @@ var testCases = []TestCase{ typeSplitting: {`!@£$%^&*()`}, }, }, + { + name: "line length greater than max allowed length", + line: `09:17:38.033366 ▶ INFO route ops sending to dest https://graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics: service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full 0 1717060658 userid invalid`, + want: map[string][]string{ + typePunctuation: []string(nil), + typeSplitting: {`09:`, `17:`, `38.033366`, `▶`, `INFO`, ``, `route`, `ops`, `sending`, `to`, `dest`, `https:`, `//graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics:`, ``, `service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full`, `0`, `1717060658`, `userid`, `invalid`}, + }, + }, } func TestTokenizer_Tokenize(t *testing.T) { @@ -124,7 +132,7 @@ func TestTokenizer_Tokenize(t *testing.T) { }{ { name: typePunctuation, - tokenizer: newPunctuationTokenizer(), + tokenizer: newPunctuationTokenizer(360), }, { name: typeSplitting, @@ -149,7 +157,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) { }{ { name: typePunctuation, - tokenizer: newPunctuationTokenizer(), + tokenizer: newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength), }, { name: typeSplitting, @@ -168,7 +176,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) { } func BenchmarkSplittingTokenizer(b *testing.B) { - tokenizer := newPunctuationTokenizer() + tokenizer := newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength) for _, tt := range testCases { tc := tt @@ -213,9 +221,13 @@ func TestLogFmtTokenizer(t *testing.T) { line: `logger=sqlstore.metrics traceID=c933fefbe893411d3be8e1648d6bcf37 t=2024-07-10T16:00:15.564896897Z level=debug msg="query finished" status=success elapsedtime=1.324305ms error=null`, want: []string{"logger", "sqlstore.metrics", "traceID", "<_>", "t", "<_>", "level", "debug", "msg", "query finished", "status", "success", "elapsedtime", "1.324305ms", "", "", "error", "null"}, }, + { + line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095 ip=127.0.0.1 userid=1234456`, + want: []string(nil), + }, } - tokenizer := newLogfmtTokenizer(param) + tokenizer := newLogfmtTokenizer(param, 250) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -268,7 +280,7 @@ func TestLogFmtTokenizerJoin(t *testing.T) { }, } - tokenizer := newLogfmtTokenizer("") + tokenizer := newLogfmtTokenizer("", DefaultConfig().MaxAllowedLineLength) for _, tt := range tests { t.Run("", func(t *testing.T) { @@ -306,16 +318,23 @@ func TestJsonTokenizer(t *testing.T) { want: []string{"successfully", "discovered", "15", "agent", "IP", "addresses"}, pattern: "<_>successfully discovered 15 agent IP addresses<_>", }, + { + line: `{"msg":{"actor":{"alternateId":"foo@grafana.com","displayName":"Foo bar","id":"dq23","type":"User"},"authenticationContext":{"authenticationStep":0,"externalSessionId":"123d"},"client":{"device":"Computer","geographicalContext":{"city":"Berlin","country":"DE","state":"Land Berlin"},"ipAddress":"0.0.0.0","userAgent":{"browser":"CHROME","os":"Mac OS X","rawUserAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"},"zone":"null"},"debugContext":{"debugData":{"authMethodFirstEnrollment":"123","authMethodFirstType":"foo","authMethodFirstVerificationTime":"2024-07-02T11:28:03.219Z","authMethodSecondEnrollment":"var","authMethodSecondType":"ddd","authMethodSecondVerificationTime":"2024-07-03T06:59:09.151Z","authnRequestId":"1","dtHash":"1","logOnlySecurityData":"{\"risk\":{\"level\":\"LOW\"},\"behaviors\":{\"New Geo-Location\":\"NEGATIVE\",\"New Device\":\"NEGATIVE\",\"New IP\":\"NEGATIVE\",\"New State\":\"NEGATIVE\",\"New Country\":\"NEGATIVE\",\"Velocity\":\"NEGATIVE\",\"New City\":\"NEGATIVE\"}}","requestId":"1","threatSuspected":"false","url":"/foo?"}},"displayMessage":"Evaluation of sign-on policy","eventType":"policy.evaluate_sign_on","legacyEventType":"app.oauth2.token.grant.refresh_token_success","outcome":{"reason":"Sign-on policy evaluation resulted in AUTHENTICATED","result":"ALLOW"},"published":"2024-07-03T09:19:59.973Z","request":{"ipChain":[{"geographicalContext":{"city":"Berlin","country":"Germany","geolocation":{"lat":52.5363,"lon":13.4169},"postalCode":"10435","state":"Land Berlin"},"ip":"95.90.234.241","version":"V4"}]},"securityContext":{"asNumber":3209,"asOrg":"kabel deutschland breitband customer 19","domain":"kabel-deutschland.de","isProxy":false,"isp":"vodafone gmbh"},"severity":"INFO","target":[{"alternateId":"Salesforce.com","detailEntry":{"signOnModeEvaluationResult":"AUTHENTICATED","signOnModeType":"SAML_2_0"},"displayName":"Salesforce.com","id":"0oa5sfmj3hz0mTgoW357","type":"AppInstance"},{"alternateId":"unknown","detailEntry":{"policyRuleFactorMode":"2FA"},"displayName":"Catch-all Rule","id":"1","type":"Rule"}],"transaction":{"detail":{},"id":"1","type":"WEB"},"context":[{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"2bb20d8a71fb7adbc1d6239cc9ff4130f26819dc@gmail.com","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}},{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"2bb20d8a71fb7adbc1d6239cc9ff4130f26819dc@gmail.com","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}}],"uuid":"1","version":"0"},"level":"info","type":"received event","time":"2024-07-03T09:19:59Z"}`, + want: []string(nil), + pattern: "", + }, } - tokenizer := newJSONTokenizer(param) + tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, state := tokenizer.Tokenize(tt.line, nil, nil) require.Equal(t, tt.want, got) - pattern := tokenizer.Join(got, state) - require.Equal(t, tt.pattern, pattern) + if len(got) == len(tt.want) && len(tt.want) != 0 { + pattern := tokenizer.Join(got, state) + require.Equal(t, tt.pattern, pattern) + } }) } }