Bug(LogQL): Fix mismatch results on scalar and vector binary ops (grafana#10997)

**What this PR does / why we need it**:

[According to the LogQL
doc](https://grafana.com/docs/loki/latest/query/#comparison-operators),
binary operators between a `scalar` and a `vector` should behave as follows,
whether the expression is `scalar` op `vector` or `vector` op `scalar`:

```
Between a vector and a scalar, these operators are applied to the value of every data sample in the vector
```
PromQL (which inspired LogQL) [also works that
way](https://prometheus.io/docs/prometheus/latest/querying/operators/).

Currently LogQL violates this behaviour: it returns whatever is on the
`left`, even if that is the scalar. See the linked issue for more
details.
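
The documented behaviour can be illustrated with a minimal Go sketch. It is not Loki's implementation: the `sample` type and both helper functions are hypothetical stand-ins, and it assumes a filtering (non-`bool`) comparison. The point is that both operand orders keep the vector's sample values.

```go
package main

import "fmt"

// sample is a hypothetical, simplified stand-in for promql.Sample.
type sample struct {
	labels string
	value  float64
}

// vectorGreaterThanScalar models `vector > scalar`: every sample whose value
// is greater than the scalar is kept, with its own value.
func vectorGreaterThanScalar(vec []sample, scalar float64) []sample {
	out := make([]sample, 0, len(vec))
	for _, s := range vec {
		if s.value > scalar {
			out = append(out, s)
		}
	}
	return out
}

// scalarLessThanVector models `scalar < vector`. The scalar is on the left,
// but the result still carries the vector's sample values, not the scalar.
func scalarLessThanVector(scalar float64, vec []sample) []sample {
	out := make([]sample, 0, len(vec))
	for _, s := range vec {
		if scalar < s.value {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	vec := []sample{
		{labels: `{app="foo"}`, value: 60},
		{labels: `{app="bar"}`, value: 1},
	}
	// Both expressions select the same single sample (value 60).
	fmt.Println(vectorGreaterThanScalar(vec, 1))
	fmt.Println(scalarLessThanVector(1, vec))
}
```

Under these assumptions, `1 < vector` and `vector > 1` are interchangeable, which is the equivalence the new test cases assert.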

Changes:
1. Fixed `MergeBinOp` by making it aware of when to return the left or the right
sample (depending on which one originally comes from the vector)
2. Added tests in both `engine_test.go` and `evaluator_test.go` to lock in
this behaviour
3. Added appropriate comments to the types and functions.
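
The idea behind change 1 can be sketched as follows. This is a simplified model, not the actual `MergeBinOp`: the `sample` type and the `mergeCompare` helper are hypothetical, and only filtered `>` and `<` comparisons are handled.

```go
package main

import "fmt"

// sample is a hypothetical, simplified stand-in for promql.Sample.
type sample struct {
	F float64
}

// mergeCompare evaluates `left op right` as a filtering comparison.
// It returns nil when the comparison is false. When it is true, it returns
// the operand that came from the vector: `left` by default, or `right`
// when swap is true (the scalar-on-the-left case).
func mergeCompare(op string, left, right *sample, swap bool) *sample {
	if left == nil || right == nil {
		return nil
	}

	var matched bool
	switch op {
	case ">":
		matched = left.F > right.F
	case "<":
		matched = left.F < right.F
	default:
		return nil // operators outside this sketch are not modelled
	}
	if !matched {
		return nil
	}

	if swap {
		return right
	}
	return left
}

func main() {
	vec := &sample{F: 60} // sample originating from the vector
	lit := &sample{F: 1}  // sample built from the scalar literal

	// vector > scalar: the vector is on the left, so no swap is needed.
	fmt.Println(mergeCompare(">", vec, lit, false).F)

	// scalar < vector: the vector is on the right, so swap must be true.
	fmt.Println(mergeCompare("<", lit, vec, true).F)

	// Without the swap, the scalar's value leaks into the result,
	// which is the behaviour this commit fixes.
	fmt.Println(mergeCompare("<", lit, vec, false).F)
}
```

A boolean flag keeps the change small: the caller already knows which operand is the literal, so it only has to tell the merge step which side to return.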

**Which issue(s) this PR fixes**:
Fixes grafana#10741

**Special notes for your reviewer**:


**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory. <!--
TODO(salvacorts): Add example PR -->

---------

Signed-off-by: Kaviraj <[email protected]>
kavirajk authored and rhnasc committed Apr 12, 2024
1 parent f518185 commit 666ddcf
Showing 5 changed files with 203 additions and 16 deletions.
81 changes: 81 additions & 0 deletions pkg/logql/engine_test.go
@@ -675,6 +675,25 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")},
},
},
{
// should return same results as `count_over_time({app="foo"}[1m]) > 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < count_over_time({app="foo"}[1m])`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
},
promql.Vector{
{T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")},
},
},
{
`count_over_time({app="foo"}[1m]) > count_over_time({app="bar"}[1m])`,
time.Unix(60, 0),
@@ -2107,6 +2126,34 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
// should return same results as `bytes_over_time({app="foo"}[30s]) > 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
{logproto.Series{
Labels: `{app="foo"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes
{Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.},
{Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.},
{Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.},
{Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes
},
}},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.FromStrings("app", "foo"),
Floats: []promql.FPoint{{T: 60 * 1000, F: 5.}, {T: 105 * 1000, F: 4.}, {T: 120 * 1000, F: 4.}},
},
},
},
{
`bytes_over_time({app="foo"}[30s]) > bool 1`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
@@ -2137,6 +2184,40 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
// should return same results as `bytes_over_time({app="foo"}[30s]) > bool 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < bool bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
{logproto.Series{
Labels: `{app="foo"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes
{Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.},
{Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.},
{Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.},
{Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes
},
}},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.FromStrings("app", "foo"),
Floats: []promql.FPoint{
{T: 60000, F: 1},
{T: 75000, F: 0},
{T: 90000, F: 0},
{T: 105000, F: 1},
{T: 120000, F: 1},
},
},
},
},
{
// tests combining two streams + unwrap
`sum(rate({job="foo"} | logfmt | bar > 0 | unwrap bazz [30s]))`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, 0, logproto.FORWARD, 10,
3 changes: 2 additions & 1 deletion pkg/logql/evaluator.go
@@ -809,7 +809,7 @@ func vectorBinop(op string, opts *syntax.BinOpOptions, lhs, rhs promql.Vector, l
ls, rs = rs, ls
}
}
-merged, err := syntax.MergeBinOp(op, ls, rs, filter, syntax.IsComparisonOperator(op))
+merged, err := syntax.MergeBinOp(op, ls, rs, false, filter, syntax.IsComparisonOperator(op))
if err != nil {
return nil, err
}
@@ -973,6 +973,7 @@ func (e *LiteralStepEvaluator) Next() (bool, int64, StepResult) {
e.op,
left,
right,
+!e.inverted,
!e.returnBool,
syntax.IsComparisonOperator(e.op),
)
103 changes: 97 additions & 6 deletions pkg/logql/evaluator_test.go
@@ -3,8 +3,11 @@ package logql
import (
"math"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logql/syntax"
@@ -20,6 +23,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
false,
false,
false,
)
require.NoError(t, err)

@@ -33,6 +37,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
false,
false,
false,
)
require.NoError(t, err)
require.Equal(t, true, math.IsNaN(binOp.F))
@@ -220,31 +225,31 @@ func TestEvaluator_mergeBinOpComparisons(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
// comparing a binop should yield the unfiltered (non-nil variant) regardless
// of whether this is a vector-vector comparison or not.
-op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false)
+op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, false)
require.NoError(t, err)
require.Equal(t, tc.expected, op)
-op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true)
+op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, true)
require.NoError(t, err)
require.Equal(t, tc.expected, op2)

-op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true)
+op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, false, true)
require.NoError(t, err)
require.Nil(t, op3)

// test filtered variants
if tc.expected.F == 0 {
// ensure zeroed predicates are filtered out

-op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, false)
+op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, false)
require.NoError(t, err)
require.Nil(t, op)
-op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, true)
+op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, true)
require.NoError(t, err)
require.Nil(t, op2)

// for vector-vector comparisons, ensure that nil right hand sides
// translate into nil results
-op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, true, true)
+op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true, true)
require.NoError(t, err)
require.Nil(t, op3)

@@ -280,6 +285,52 @@ func TestEmptyNestedEvaluator(t *testing.T) {

}

func TestLiteralStepEvaluator(t *testing.T) {
cases := []struct {
name string
expr *LiteralStepEvaluator
expected []float64
}{
{
name: "vector op scalar",
// e.g: sum(count_over_time({app="foo"}[1m])) > 20
expr: &LiteralStepEvaluator{
nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}),
val: 20,
inverted: true, // set to true, because literal expression is not on left.
op: ">",
},
expected: []float64{21, 22, 23},
},
{
name: "scalar op vector",
// e.g: 20 < sum(count_over_time({app="foo"}[1m]))
expr: &LiteralStepEvaluator{
nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}),
val: 20,
inverted: false,
op: "<",
},
expected: []float64{21, 22, 23},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ok, _, got := tc.expr.Next()
require.True(t, ok)
vecs := got.SampleVector()
gotSamples := make([]float64, 0, len(vecs))

for _, v := range vecs {
gotSamples = append(gotSamples, v.F)
}

assert.Equal(t, tc.expected, gotSamples)
})
}
}

type emptyEvaluator struct{}

func (*emptyEvaluator) Next() (ok bool, ts int64, r StepResult) {
@@ -295,3 +346,43 @@ func (*emptyEvaluator) Error() error {
}

func (*emptyEvaluator) Explain(Node) {}

// returnVectorEvaluator returns the vector passed in
// every time its `Next()` is called. Used for testing.
type returnVectorEvaluator struct {
vec promql.Vector
}

func (e *returnVectorEvaluator) Next() (ok bool, ts int64, r StepResult) {
return true, 0, SampleVector(e.vec)
}

func (*returnVectorEvaluator) Close() error {
return nil
}

func (*returnVectorEvaluator) Error() error {
return nil
}

func (*returnVectorEvaluator) Explain(Node) {

}

func newReturnVectorEvaluator(vec []float64) *returnVectorEvaluator {
testTime := time.Now().Unix()

pvec := make([]promql.Sample, 0, len(vec))

for _, v := range vec {
pvec = append(pvec, promql.Sample{
T: testTime,
F: v,
Metric: labels.FromStrings("foo", "bar"),
})
}

return &returnVectorEvaluator{
vec: pvec,
}
}
23 changes: 18 additions & 5 deletions pkg/logql/syntax/ast.go
@@ -1631,14 +1631,21 @@ func reduceBinOp(op string, left, right float64) *LiteralExpr {
&promql.Sample{F: right},
false,
false,
false,
)
if err != nil {
return &LiteralExpr{err: err}
}
return &LiteralExpr{Val: merged.F}
}

-func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorComparison bool) (*promql.Sample, error) {
+// MergeBinOp performs `op` on the `left` and `right` arguments and returns the resulting `promql.Sample`.
+// For vector and scalar arguments, MergeBinOp assumes `left` is the vector;
+// pass `swap=true` if the vector is on the right instead.
+// This matters because, whether the expression is (vector op scalar) or (scalar op vector),
+// the returned sample must always be the sample of the vector argument.
+// https://github.com/grafana/loki/issues/10741
+func MergeBinOp(op string, left, right *promql.Sample, swap, filter, isVectorComparison bool) (*promql.Sample, error) {
var merger func(left, right *promql.Sample) *promql.Sample

switch op {
@@ -1826,11 +1833,17 @@ func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorCompariso
}

if filter {
-// if a filter-enabled vector-wise comparison has returned non-nil,
-// ensure we return the left hand side's value (2) instead of the
-// comparison operator's result (1: the truthy answer)
+// if a filter-enabled vector-wise comparison has returned non-nil,
+// ensure we return the vector side's sample value instead of the
+// comparison operator's result (1: the truthy answer, a.k.a. bool)
+
+retSample := left
+if swap {
+retSample = right
+}
+
if res != nil {
-return left, nil
+return retSample, nil
}
}
return res, nil
9 changes: 5 additions & 4 deletions pkg/logql/syntax/ast_test.go
@@ -177,10 +177,10 @@ func Test_SampleExpr_String(t *testing.T) {
)
`,
`(((
sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60)
or on ()
((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60))
or on ()
sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60)
or on ()
((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60))
or on ()
((sum by(typename,pool,commandname,colo) (sum_over_time({_namespace_="appspace", _schema_="appspace-1h", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60) / 60))`,
`{app="foo"} | logfmt code="response.code", IPAddress="host"`,
} {
@@ -670,6 +670,7 @@ func Test_MergeBinOpVectors_Filter(t *testing.T) {
OpTypeGT,
&promql.Sample{F: 2},
&promql.Sample{F: 0},
false,
true,
true,
)