From 666ddcf6892786e4ab5a47e32b10531599944cad Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Fri, 27 Oct 2023 20:32:50 +0200 Subject: [PATCH] Bug(LogQL): Fix mismatch results on `scalar` and `vector` binary ops (#10997) **What this PR does / why we need it**: [According to the LogQL docs,](https://grafana.com/docs/loki/latest/query/#comparison-operators) binary operators between a `scalar` and a `vector` should behave as follows (whether it's `scalar` op `vector` or `vector` op `scalar`): ``` Between a vector and a scalar, these operators are applied to the value of every data sample in the vector ``` PromQL (which LogQL is inspired by) [also works that way](https://prometheus.io/docs/prometheus/latest/querying/operators/). Currently LogQL violates this behaviour. It returns whatever is on the `left` (even if it's a scalar). See the linked issue for more details. Changes: 1. Fixed `MergeBinOp` by making it aware of when to return left or right (depending on which one originally came from the vector) 2. Added tests in both `engine_test.go` and `evaluator_test.go` to lock in this behaviour 3. Added appropriate comments to the types and functions. **Which issue(s) this PR fixes**: Fixes https://github.com/grafana/loki/issues/10741 **Special notes for your reviewer**: **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. --------- Signed-off-by: Kaviraj --- pkg/logql/engine_test.go | 81 +++++++++++++++++++++++++++ pkg/logql/evaluator.go | 3 +- pkg/logql/evaluator_test.go | 103 +++++++++++++++++++++++++++++++++-- pkg/logql/syntax/ast.go | 23 ++++++-- pkg/logql/syntax/ast_test.go | 9 +-- 5 files changed, 203 insertions(+), 16 deletions(-) diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 94f4687c35279..ef7d5e0538e3d 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -675,6 +675,25 @@ func TestEngine_LogsInstantQuery(t *testing.T) { {T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")}, }, }, + { + // should return same results as `count_over_time({app="foo"}[1m]) > 1`. + // https://grafana.com/docs/loki/latest/query/#comparison-operators + // Between a vector and a scalar, these operators are + // applied to the value of every data sample in the vector + `1 < count_over_time({app="foo"}[1m])`, + time.Unix(60, 0), + logproto.FORWARD, + 0, + [][]logproto.Series{ + {newSeries(testSize, identity, `{app="foo"}`)}, + }, + []SelectSampleParams{ + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}}, + }, + promql.Vector{ + {T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")}, + }, + }, { `count_over_time({app="foo"}[1m]) > count_over_time({app="bar"}[1m])`, time.Unix(60, 0), @@ -2107,6 +2126,34 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, }, + { + // should return same results as `bytes_over_time({app="foo"}[30s]) > 1`. 
+ // https://grafana.com/docs/loki/latest/query/#comparison-operators + // Between a vector and a scalar, these operators are + // applied to the value of every data sample in the vector + `1 < bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, + [][]logproto.Series{ + {logproto.Series{ + Labels: `{app="foo"}`, + Samples: []logproto.Sample{ + {Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes + {Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.}, + {Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.}, + {Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.}, + {Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes + }, + }}, + }, + []SelectSampleParams{ + {&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.FromStrings("app", "foo"), + Floats: []promql.FPoint{{T: 60 * 1000, F: 5.}, {T: 105 * 1000, F: 4.}, {T: 120 * 1000, F: 4.}}, + }, + }, + }, { `bytes_over_time({app="foo"}[30s]) > bool 1`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, [][]logproto.Series{ @@ -2137,6 +2184,40 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, }, + { + // should return same results as `bytes_over_time({app="foo"}[30s]) > bool 1`. 
+ // https://grafana.com/docs/loki/latest/query/#comparison-operators + // Between a vector and a scalar, these operators are + // applied to the value of every data sample in the vector + `1 < bool bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, + [][]logproto.Series{ + {logproto.Series{ + Labels: `{app="foo"}`, + Samples: []logproto.Sample{ + {Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes + {Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.}, + {Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.}, + {Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.}, + {Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes + }, + }}, + }, + []SelectSampleParams{ + {&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.FromStrings("app", "foo"), + Floats: []promql.FPoint{ + {T: 60000, F: 1}, + {T: 75000, F: 0}, + {T: 90000, F: 0}, + {T: 105000, F: 1}, + {T: 120000, F: 1}, + }, + }, + }, + }, { // tests combining two streams + unwrap `sum(rate({job="foo"} | logfmt | bar > 0 | unwrap bazz [30s]))`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, 0, logproto.FORWARD, 10, diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 1e5a5f19768f1..07a056e4ffe58 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -809,7 +809,7 @@ func vectorBinop(op string, opts *syntax.BinOpOptions, lhs, rhs promql.Vector, l ls, rs = rs, ls } } - merged, err := syntax.MergeBinOp(op, ls, rs, filter, syntax.IsComparisonOperator(op)) + merged, err := syntax.MergeBinOp(op, ls, rs, false, filter, syntax.IsComparisonOperator(op)) if err != nil { return nil, err } @@ -973,6 +973,7 @@ func (e *LiteralStepEvaluator) Next() (bool, int64, StepResult) { e.op, left, right, + !e.inverted, !e.returnBool, 
syntax.IsComparisonOperator(e.op), ) diff --git a/pkg/logql/evaluator_test.go b/pkg/logql/evaluator_test.go index bc600d6707bf2..1bec3d9c67d68 100644 --- a/pkg/logql/evaluator_test.go +++ b/pkg/logql/evaluator_test.go @@ -3,8 +3,11 @@ package logql import ( "math" "testing" + "time" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logql/syntax" @@ -20,6 +23,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) { }, false, false, + false, ) require.NoError(t, err) @@ -33,6 +37,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) { }, false, false, + false, ) require.NoError(t, err) require.Equal(t, true, math.IsNaN(binOp.F)) @@ -220,14 +225,14 @@ func TestEvaluator_mergeBinOpComparisons(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { // comparing a binop should yield the unfiltered (non-nil variant) regardless // of whether this is a vector-vector comparison or not. 
- op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false) + op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, false) require.NoError(t, err) require.Equal(t, tc.expected, op) - op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true) + op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, true) require.NoError(t, err) require.Equal(t, tc.expected, op2) - op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true) + op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, false, true) require.NoError(t, err) require.Nil(t, op3) @@ -235,16 +240,16 @@ func TestEvaluator_mergeBinOpComparisons(t *testing.T) { if tc.expected.F == 0 { // ensure zeroed predicates are filtered out - op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, false) + op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, false) require.NoError(t, err) require.Nil(t, op) - op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, true) + op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, true) require.NoError(t, err) require.Nil(t, op2) // for vector-vector comparisons, ensure that nil right hand sides // translate into nil results - op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, true, true) + op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true, true) require.NoError(t, err) require.Nil(t, op3) @@ -280,6 +285,52 @@ func TestEmptyNestedEvaluator(t *testing.T) { } +func TestLiteralStepEvaluator(t *testing.T) { + cases := []struct { + name string + expr *LiteralStepEvaluator + expected []float64 + }{ + { + name: "vector op scalar", + // e.g: sum(count_over_time({app="foo"}[1m])) > 20 + expr: &LiteralStepEvaluator{ + nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}), + val: 20, + inverted: true, // set to true, because literal expression is not on left. 
+ op: ">", + }, + expected: []float64{21, 22, 23}, + }, + { + name: "scalar op vector", + // e.g: 20 < sum(count_over_time({app="foo"}[1m])) + expr: &LiteralStepEvaluator{ + nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}), + val: 20, + inverted: false, + op: "<", + }, + expected: []float64{21, 22, 23}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ok, _, got := tc.expr.Next() + require.True(t, ok) + vecs := got.SampleVector() + gotSamples := make([]float64, 0, len(vecs)) + + for _, v := range vecs { + gotSamples = append(gotSamples, v.F) + } + + assert.Equal(t, tc.expected, gotSamples) + }) + } +} + type emptyEvaluator struct{} func (*emptyEvaluator) Next() (ok bool, ts int64, r StepResult) { @@ -295,3 +346,43 @@ func (*emptyEvaluator) Error() error { } func (*emptyEvaluator) Explain(Node) {} + +// returnVectorEvaluator returns elements of vector +// passed in, everytime it's `Next()` is called. Used for testing. +type returnVectorEvaluator struct { + vec promql.Vector +} + +func (e *returnVectorEvaluator) Next() (ok bool, ts int64, r StepResult) { + return true, 0, SampleVector(e.vec) +} + +func (*returnVectorEvaluator) Close() error { + return nil +} + +func (*returnVectorEvaluator) Error() error { + return nil +} + +func (*returnVectorEvaluator) Explain(Node) { + +} + +func newReturnVectorEvaluator(vec []float64) *returnVectorEvaluator { + testTime := time.Now().Unix() + + pvec := make([]promql.Sample, 0, len(vec)) + + for _, v := range vec { + pvec = append(pvec, promql.Sample{ + T: testTime, + F: v, + Metric: labels.FromStrings("foo", "bar"), + }) + } + + return &returnVectorEvaluator{ + vec: pvec, + } +} diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index 2deda9ff612cc..aa4aa7fa80617 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -1631,6 +1631,7 @@ func reduceBinOp(op string, left, right float64) *LiteralExpr { &promql.Sample{F: right}, false, false, + false, ) if err != nil 
{ return &LiteralExpr{err: err} @@ -1638,7 +1639,13 @@ func reduceBinOp(op string, left, right float64) *LiteralExpr { return &LiteralExpr{Val: merged.F} } -func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorComparison bool) (*promql.Sample, error) { +// MergeBinOp performs `op` on `left` and `right` arguments and return the `promql.Sample` value. +// In case of vector and scalar arguments, MergeBinOp assumes `left` is always vector. +// pass `swap=true` otherwise. +// This matters because, either it's (vector op scalar) or (scalar op vector), the return sample value should +// always be sample value of vector argument. +// https://github.com/grafana/loki/issues/10741 +func MergeBinOp(op string, left, right *promql.Sample, swap, filter, isVectorComparison bool) (*promql.Sample, error) { var merger func(left, right *promql.Sample) *promql.Sample switch op { @@ -1826,11 +1833,17 @@ func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorCompariso } if filter { - // if a filter-enabled vector-wise comparison has returned non-nil, - // ensure we return the left hand side's value (2) instead of the - // comparison operator's result (1: the truthy answer) + // if a filter is enabled vector-wise comparison has returned non-nil, + // ensure we return the vector hand side's sample value, instead of the + // comparison operator's result (1: the truthy answer. 
a.k.a bool) + + retSample := left + if swap { + retSample = right + } + if res != nil { - return left, nil + return retSample, nil } } return res, nil diff --git a/pkg/logql/syntax/ast_test.go b/pkg/logql/syntax/ast_test.go index 0ffbc3851e407..e1570e07e8c1f 100644 --- a/pkg/logql/syntax/ast_test.go +++ b/pkg/logql/syntax/ast_test.go @@ -177,10 +177,10 @@ func Test_SampleExpr_String(t *testing.T) { ) `, `((( - sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60) - or on () - ((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60)) - or on () + sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60) + or on () + ((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", 
colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60)) + or on () ((sum by(typename,pool,commandname,colo) (sum_over_time({_namespace_="appspace", _schema_="appspace-1h", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60) / 60))`, `{app="foo"} | logfmt code="response.code", IPAddress="host"`, } { @@ -670,6 +670,7 @@ func Test_MergeBinOpVectors_Filter(t *testing.T) { OpTypeGT, &promql.Sample{F: 2}, &promql.Sample{F: 0}, + false, true, true, )