Skip to content

Commit

Permalink
feat: add pattern match line filter (#12398)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Apr 2, 2024
1 parent a331746 commit 36c703d
Show file tree
Hide file tree
Showing 15 changed files with 827 additions and 488 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* [11970](https://github.com/grafana/loki/pull/11897) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.
* [12318](https://github.com/grafana/loki/pull/12318) **DylanGuedes** Memcached: Add mTLS support.
* [12392](https://github.com/grafana/loki/pull/12392) **sandeepsukhani** Detect name of service emitting logs and add it as a label.
* [12398](https://github.com/grafana/loki/pull/12398) **kolesnikovae** LogQL: Introduces pattern match filter operators.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
Expand Down
45 changes: 45 additions & 0 deletions pkg/logql/log/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logql/log/pattern"
"github.com/grafana/loki/pkg/util"
)

Expand All @@ -23,6 +24,8 @@ const (
LineMatchNotEqual
LineMatchRegexp
LineMatchNotRegexp
LineMatchPattern
LineMatchNotPattern
)

func (t LineMatchType) String() string {
Expand All @@ -35,6 +38,10 @@ func (t LineMatchType) String() string {
return "|~"
case LineMatchNotRegexp:
return "!~"
case LineMatchPattern:
return "|>"
case LineMatchNotPattern:
return "!>"
default:
return ""
}
Expand Down Expand Up @@ -553,6 +560,10 @@ func NewFilter(match string, mt LineMatchType) (Filterer, error) {
return newContainsFilter([]byte(match), false), nil
case LineMatchNotEqual:
return NewNotFilter(newContainsFilter([]byte(match), false)), nil
case LineMatchPattern:
return newPatternFilterer([]byte(match), true)
case LineMatchNotPattern:
return newPatternFilterer([]byte(match), false)
default:
return nil, fmt.Errorf("unknown matcher: %v", match)
}
Expand Down Expand Up @@ -783,3 +794,37 @@ func (s *RegexSimplifier) simplifyConcatAlternate(reg *syntax.Regexp, literal []
}
return nil, false
}

// patternFilter is a line filter backed by a parsed pattern expression.
type patternFilter struct {
// matcher is the parsed pattern; Filter delegates to its Test method.
matcher *pattern.Matcher
// pattern is the raw pattern text the filter was created from; Matches
// passes it to the supplied Checker.
pattern []byte
}

// newPatternFilterer builds a line filter from the pattern expression p.
//
// The expression is parsed with pattern.ParseLineFilter and any parse error
// is returned unchanged. When match is true the pattern filter itself is
// returned; otherwise it is wrapped with NewNotFilter so the result is
// negated.
func newPatternFilterer(p []byte, match bool) (MatcherFilterer, error) {
	parsed, err := pattern.ParseLineFilter(p)
	if err != nil {
		return nil, err
	}
	pf := &patternFilter{matcher: parsed, pattern: p}
	if match {
		return pf, nil
	}
	return NewNotFilter(pf), nil
}

// Filter reports whether line passes the underlying pattern matcher's Test.
func (f *patternFilter) Filter(line []byte) bool {
	matched := f.matcher.Test(line)
	return matched
}

// Matches hands the raw pattern text to test and returns its verdict.
// Both boolean arguments of test.Test are passed as false.
// NOTE(review): the meaning of those two flags is defined by the Checker
// interface, which is declared elsewhere — confirm against it.
func (f *patternFilter) Matches(test Checker) bool {
	raw := f.pattern
	return test.Test(raw, false, false)
}

// ToStage wraps the filter in a StageFunc. The stage returns the line
// unmodified together with the result of Filter for that line; the
// timestamp and labels builder arguments are ignored.
func (f *patternFilter) ToStage() Stage {
	keep := func(_ int64, line []byte, _ *LabelsBuilder) ([]byte, bool) {
		ok := f.Filter(line)
		return line, ok
	}
	return StageFunc{process: keep}
}
2 changes: 1 addition & 1 deletion pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
func (l *LogfmtParser) RequiredLabelNames() []string { return []string{} }

type PatternParser struct {
matcher pattern.Matcher
matcher *pattern.Matcher
names []string
}

Expand Down
33 changes: 24 additions & 9 deletions pkg/logql/log/pattern/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ func (e expr) validate() error {
return ErrNoCapture
}
// Consecutive captures are not allowed.
if err := e.validateNoConsecutiveCaptures(); err != nil {
return err
}
caps := e.captures()
uniq := map[string]struct{}{}
for _, c := range caps {
if _, ok := uniq[c]; ok {
return fmt.Errorf("duplicate capture name (%s): %w", c, ErrInvalidExpr)
}
uniq[c] = struct{}{}
}
return nil
}

func (e expr) validateNoConsecutiveCaptures() error {
for i, n := range e {
if i+1 >= len(e) {
break
Expand All @@ -30,21 +45,21 @@ func (e expr) validate() error {
}
}
}
return nil
}

caps := e.captures()
uniq := map[string]struct{}{}
for _, c := range caps {
if _, ok := uniq[c]; ok {
return fmt.Errorf("duplicate capture name (%s): %w", c, ErrInvalidExpr)
func (e expr) validateNoNamedCaptures() error {
for i, n := range e {
if c, ok := e[i].(capture); ok && !c.isUnnamed() {
return fmt.Errorf("%w: found '%s'", ErrCaptureNotAllowed, n.String())
}
uniq[c] = struct{}{}
}
return nil
}

func (e expr) captures() (captures []string) {
for _, n := range e {
if c, ok := n.(capture); ok && !c.isUnamed() {
if c, ok := n.(capture); ok && !c.isUnnamed() {
captures = append(captures, c.Name())
}
}
Expand All @@ -65,8 +80,8 @@ func (c capture) Name() string {
return string(c)
}

func (c capture) isUnamed() bool {
return string(c) == underscore
func (c capture) isUnnamed() bool {
return len(c) == 1 && c[0] == underscore[0]
}

type literals []byte
Expand Down
6 changes: 5 additions & 1 deletion pkg/logql/log/pattern/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ func init() {
}

func parseExpr(input string) (expr, error) {
return parseExprBytes([]byte(input))
}

func parseExprBytes(input []byte) (expr, error) {
l := newLexer()
l.setData([]byte(input))
l.setData(input)
e := exprNewParser().Parse(l)
if e != 0 || len(l.errs) > 0 {
return nil, l.errs[0]
Expand Down
93 changes: 78 additions & 15 deletions pkg/logql/log/pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,67 @@ import (
)

var (
ErrNoCapture = errors.New("at least one capture is required")
ErrInvalidExpr = errors.New("invalid expression")
ErrNoCapture = errors.New("at least one capture is required")
ErrCaptureNotAllowed = errors.New("named captures are not allowed")
ErrInvalidExpr = errors.New("invalid expression")
)

type Matcher interface {
Matches(in []byte) [][]byte
Names() []string
}

type matcher struct {
type Matcher struct {
e expr

captures [][]byte
names []string
}

func New(in string) (Matcher, error) {
func New(in string) (*Matcher, error) {
e, err := parseExpr(in)
if err != nil {
return nil, err
}
if err := e.validate(); err != nil {
return nil, err
}
return &matcher{
return &Matcher{
e: e,
captures: make([][]byte, 0, e.captureCount()),
names: e.captures(),
}, nil
}

// ParseLineFilter parses in as a pattern expression for use as a line filter
// and returns a Matcher holding the parsed expression.
//
// An empty input yields a zero-value Matcher and no error. Otherwise the
// expression must parse and must pass both validateNoConsecutiveCaptures and
// validateNoNamedCaptures; the first failure is returned as the error.
// The returned Matcher has only its expression set (no capture buffers or
// names).
func ParseLineFilter(in []byte) (*Matcher, error) {
	if len(in) == 0 {
		return &Matcher{}, nil
	}
	parsed, err := parseExprBytes(in)
	if err != nil {
		return nil, err
	}
	if verr := parsed.validateNoConsecutiveCaptures(); verr != nil {
		return nil, verr
	}
	if verr := parsed.validateNoNamedCaptures(); verr != nil {
		return nil, verr
	}
	return &Matcher{e: parsed}, nil
}

// ParseLiterals parses in as a pattern expression and returns its literal
// nodes in the order they appear, skipping every other node kind.
//
// Only parsing is performed; none of the expression validations are applied.
// A parse error is returned unchanged with a nil slice. On success the
// result is non-nil even when the expression contains no literals.
func ParseLiterals(in string) ([][]byte, error) {
	nodes, err := parseExpr(in)
	if err != nil {
		return nil, err
	}
	out := make([][]byte, 0, len(nodes))
	for i := range nodes {
		lit, isLiteral := nodes[i].(literals)
		if !isLiteral {
			continue
		}
		out = append(out, lit)
	}
	return out, nil
}

// Matches matches the given line with the provided pattern.
// Matches invalidates the previous returned captures array.
func (m *matcher) Matches(in []byte) [][]byte {
func (m *Matcher) Matches(in []byte) [][]byte {
if len(in) == 0 {
return nil
}
Expand All @@ -62,7 +89,7 @@ func (m *matcher) Matches(in []byte) [][]byte {
// from now we have capture - literals - capture ... (literals)?
for len(expr) != 0 {
if len(expr) == 1 { // we're ending on a capture.
if !(expr[0].(capture)).isUnamed() {
if !(expr[0].(capture)).isUnnamed() {
captures = append(captures, in)
}
return captures
Expand All @@ -73,13 +100,13 @@ func (m *matcher) Matches(in []byte) [][]byte {
i := bytes.Index(in, ls)
if i == -1 {
// if a capture is missed we return up to the end as the capture.
if !capt.isUnamed() {
if !capt.isUnnamed() {
captures = append(captures, in)
}
return captures
}

if capt.isUnamed() {
if capt.isUnnamed() {
in = in[len(ls)+i:]
continue
}
Expand All @@ -90,6 +117,42 @@ func (m *matcher) Matches(in []byte) [][]byte {
return captures
}

func (m *matcher) Names() []string {
func (m *Matcher) Names() []string {
return m.names
}

// Test reports whether the line in matches the pattern as a whole, without
// extracting any captures.
//
// The pattern is anchored at both ends of the line:
//   - a pattern that starts with a literal only matches lines that start
//     with that literal ("foo <_>" does not match "xfoo bar");
//   - a pattern that ends with a literal only matches when nothing is left
//     after it ("<_> bar" does not match "foo bar baz");
//   - every capture must consume at least one byte (" bar " does not match
//     "<_> bar <_>", but matches "<_>bar<_>").
//
// An empty line matches only an empty pattern, and vice versa.
//
// Literals after the first node are located left to right at their first
// occurrence past the current offset; there is no backtracking.
func (m *Matcher) Test(in []byte) bool {
	if len(in) == 0 || len(m.e) == 0 {
		// An empty line can only match an empty pattern.
		return len(in) == 0 && len(m.e) == 0
	}
	var off int
	for i := 0; i < len(m.e); i++ {
		lit, ok := m.e[i].(literals)
		if !ok {
			continue
		}
		if i == 0 {
			// No capture precedes a leading literal, so nothing may absorb
			// a prefix of the line: the literal has to sit at offset 0.
			// Searching with bytes.Index here would let "foo" match "xfoo".
			if !bytes.HasPrefix(in, lit) {
				return false
			}
			off = len(lit)
			continue
		}
		j := bytes.Index(in[off:], lit)
		if j == -1 {
			return false
		}
		if j == 0 {
			// This means we either have repetitive literals, or an empty
			// capture. Either way, the line does not match the pattern.
			return false
		}
		off += j + len(lit)
	}
	// If we end up on a literal, we only consider the test successful if
	// the remaining input is empty. Otherwise, if we end up on a capture,
	// the remainder (the captured text) must not be empty.
	//
	// For example, "foo bar baz" does not match "<_> bar", but it matches
	// "<_> baz" and "foo <_>".
	_, reqRem := m.e[len(m.e)-1].(capture)
	hasRem := off != len(in)
	return reqRem == hasRem
}
Loading

0 comments on commit 36c703d

Please sign in to comment.