Skip to content

Commit

Permalink
feat: Apply patterns line length limit to json message key (#14296)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravishankar15 authored Oct 2, 2024
1 parent 1a4436c commit 41fafd8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 24 deletions.
9 changes: 3 additions & 6 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
var tokenizer LineTokenizer
switch format {
case FormatJSON:
tokenizer = newJSONTokenizer(config.ParamString)
tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength)
case FormatLogfmt:
tokenizer = newLogfmtTokenizer(config.ParamString)
tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength)
default:
tokenizer = newPunctuationTokenizer()
tokenizer = newPunctuationTokenizer(config.MaxAllowedLineLength)
}

d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) {
Expand Down Expand Up @@ -206,9 +206,6 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
if len(content) > d.config.MaxAllowedLineLength {
return nil
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
return d.train(d.tokens, d.state, ts)
}
Expand Down
39 changes: 29 additions & 10 deletions pkg/pattern/drain/line_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func (spacesTokenizer) Clone(tokens []string, _ interface{}) ([]string, interfac
type punctuationTokenizer struct {
includeDelimiters [128]rune
excludeDelimiters [128]rune
maxLineLength int
}

func newPunctuationTokenizer() *punctuationTokenizer {
func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer {
var included [128]rune
var excluded [128]rune
included['='] = 1
Expand All @@ -51,10 +52,15 @@ func newPunctuationTokenizer() *punctuationTokenizer {
return &punctuationTokenizer{
includeDelimiters: included,
excludeDelimiters: excluded,
maxLineLength: maxLineLength,
}
}

func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
if len(line) > p.maxLineLength {
return nil, nil
}

if cap(tokens) == 0 {
tokens = make([]string, 0, 128)
}
Expand Down Expand Up @@ -190,18 +196,24 @@ func (splittingTokenizer) Clone(tokens []string, state interface{}) ([]string, i
}

type logfmtTokenizer struct {
dec *logfmt.Decoder
varReplace string
dec *logfmt.Decoder
varReplace string
maxLineLength int
}

func newLogfmtTokenizer(varReplace string) *logfmtTokenizer {
func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer {
return &logfmtTokenizer{
dec: logfmt.NewDecoder(nil),
varReplace: varReplace,
dec: logfmt.NewDecoder(nil),
varReplace: varReplace,
maxLineLength: maxLineLength,
}
}

func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
if len(line) > t.maxLineLength {
return nil, nil
}

if cap(tokens) == 0 {
tokens = make([]string, 0, 64)
}
Expand Down Expand Up @@ -251,11 +263,12 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, inter

type jsonTokenizer struct {
*punctuationTokenizer
varReplace string
varReplace string
maxLineLength int
}

func newJSONTokenizer(varReplace string) *jsonTokenizer {
return &jsonTokenizer{newPunctuationTokenizer(), varReplace}
func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer {
return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength}
}

func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
Expand All @@ -272,7 +285,13 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}
return nil, nil
}

return t.punctuationTokenizer.Tokenize(unsafeString(found), tokens, state)
foundLine := unsafeString(found)

if len(foundLine) > t.maxLineLength {
return nil, nil
}

return t.punctuationTokenizer.Tokenize(foundLine, tokens, state)
}

func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
Expand Down
35 changes: 27 additions & 8 deletions pkg/pattern/drain/line_tokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ var testCases = []TestCase{
typeSplitting: {`!@£$%^&*()`},
},
},
{
name: "line length greater than max allowed length",
line: `09:17:38.033366 ▶ INFO route ops sending to dest https://graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics: service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full 0 1717060658 userid invalid`,
want: map[string][]string{
typePunctuation: []string(nil),
typeSplitting: {`09:`, `17:`, `38.033366`, `▶`, `INFO`, ``, `route`, `ops`, `sending`, `to`, `dest`, `https:`, `//graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics:`, ``, `service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full`, `0`, `1717060658`, `userid`, `invalid`},
},
},
}

func TestTokenizer_Tokenize(t *testing.T) {
Expand All @@ -124,7 +132,7 @@ func TestTokenizer_Tokenize(t *testing.T) {
}{
{
name: typePunctuation,
tokenizer: newPunctuationTokenizer(),
tokenizer: newPunctuationTokenizer(360),
},
{
name: typeSplitting,
Expand All @@ -149,7 +157,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) {
}{
{
name: typePunctuation,
tokenizer: newPunctuationTokenizer(),
tokenizer: newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength),
},
{
name: typeSplitting,
Expand All @@ -168,7 +176,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) {
}

func BenchmarkSplittingTokenizer(b *testing.B) {
tokenizer := newPunctuationTokenizer()
tokenizer := newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength)

for _, tt := range testCases {
tc := tt
Expand Down Expand Up @@ -213,9 +221,13 @@ func TestLogFmtTokenizer(t *testing.T) {
line: `logger=sqlstore.metrics traceID=c933fefbe893411d3be8e1648d6bcf37 t=2024-07-10T16:00:15.564896897Z level=debug msg="query finished" status=success elapsedtime=1.324305ms <REDACTED> error=null`,
want: []string{"logger", "sqlstore.metrics", "traceID", "<_>", "t", "<_>", "level", "debug", "msg", "query finished", "status", "success", "elapsedtime", "1.324305ms", "<REDACTED>", "", "error", "null"},
},
{
line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095 ip=127.0.0.1 userid=1234456`,
want: []string(nil),
},
}

tokenizer := newLogfmtTokenizer(param)
tokenizer := newLogfmtTokenizer(param, 250)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -268,7 +280,7 @@ func TestLogFmtTokenizerJoin(t *testing.T) {
},
}

tokenizer := newLogfmtTokenizer("")
tokenizer := newLogfmtTokenizer("", DefaultConfig().MaxAllowedLineLength)

for _, tt := range tests {
t.Run("", func(t *testing.T) {
Expand Down Expand Up @@ -306,16 +318,23 @@ func TestJsonTokenizer(t *testing.T) {
want: []string{"successfully", "discovered", "15", "agent", "IP", "addresses"},
pattern: "<_>successfully discovered 15 agent IP addresses<_>",
},
{
line: `{"msg":{"actor":{"alternateId":"[email protected]","displayName":"Foo bar","id":"dq23","type":"User"},"authenticationContext":{"authenticationStep":0,"externalSessionId":"123d"},"client":{"device":"Computer","geographicalContext":{"city":"Berlin","country":"DE","state":"Land Berlin"},"ipAddress":"0.0.0.0","userAgent":{"browser":"CHROME","os":"Mac OS X","rawUserAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"},"zone":"null"},"debugContext":{"debugData":{"authMethodFirstEnrollment":"123","authMethodFirstType":"foo","authMethodFirstVerificationTime":"2024-07-02T11:28:03.219Z","authMethodSecondEnrollment":"var","authMethodSecondType":"ddd","authMethodSecondVerificationTime":"2024-07-03T06:59:09.151Z","authnRequestId":"1","dtHash":"1","logOnlySecurityData":"{\"risk\":{\"level\":\"LOW\"},\"behaviors\":{\"New Geo-Location\":\"NEGATIVE\",\"New Device\":\"NEGATIVE\",\"New IP\":\"NEGATIVE\",\"New State\":\"NEGATIVE\",\"New Country\":\"NEGATIVE\",\"Velocity\":\"NEGATIVE\",\"New City\":\"NEGATIVE\"}}","requestId":"1","threatSuspected":"false","url":"/foo?"}},"displayMessage":"Evaluation of sign-on policy","eventType":"policy.evaluate_sign_on","legacyEventType":"app.oauth2.token.grant.refresh_token_success","outcome":{"reason":"Sign-on policy evaluation resulted in AUTHENTICATED","result":"ALLOW"},"published":"2024-07-03T09:19:59.973Z","request":{"ipChain":[{"geographicalContext":{"city":"Berlin","country":"Germany","geolocation":{"lat":52.5363,"lon":13.4169},"postalCode":"10435","state":"Land Berlin"},"ip":"95.90.234.241","version":"V4"}]},"securityContext":{"asNumber":3209,"asOrg":"kabel deutschland breitband customer 19","domain":"kabel-deutschland.de","isProxy":false,"isp":"vodafone gmbh"},"severity":"INFO","target":[{"alternateId":"Salesforce.com","detailEntry":{"signOnModeEvaluationResult":"AUTHENTICATED","signOnModeType":"SAML_2_0"},"displayName":"Salesforce.com","id":"0oa5sfmj3hz0mTgoW357","type":"AppInstance"},{"alternateId":"unknown","detailEntry":{"policyRuleFactorMode":"2FA"},"displayName":"Catch-all Rule","id":"1","type":"Rule"}],"transaction":{"detail":{},"id":"1","type":"WEB"},"context":[{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"[email protected]","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}},{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"[email protected]","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}}],"uuid":"1","version":"0"},"level":"info","type":"received event","time":"2024-07-03T09:19:59Z"}`,
want: []string(nil),
pattern: "",
},
}

tokenizer := newJSONTokenizer(param)
tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, state := tokenizer.Tokenize(tt.line, nil, nil)
require.Equal(t, tt.want, got)
pattern := tokenizer.Join(got, state)
require.Equal(t, tt.pattern, pattern)
if len(got) == len(tt.want) && len(tt.want) != 0 {
pattern := tokenizer.Join(got, state)
require.Equal(t, tt.pattern, pattern)
}
})
}
}

0 comments on commit 41fafd8

Please sign in to comment.