Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug(LogQL): Fix mismatch results on scalar and vector binary ops #10997

Merged
merged 11 commits into from
Oct 27, 2023
81 changes: 81 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,25 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")},
},
},
{
// should return same results as `count_over_time({app="foo"}[1m]) > 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < count_over_time({app="foo"}[1m])`,
time.Unix(60, 0),
logproto.FORWARD,
0,
[][]logproto.Series{
{newSeries(testSize, identity, `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
},
promql.Vector{
{T: 60 * 1000, F: 60, Metric: labels.FromStrings("app", "foo")},
},
},
{
`count_over_time({app="foo"}[1m]) > count_over_time({app="bar"}[1m])`,
time.Unix(60, 0),
Expand Down Expand Up @@ -2107,6 +2126,34 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
// should return same results as `bytes_over_time({app="foo"}[30s]) > 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
{logproto.Series{
Labels: `{app="foo"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes
{Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.},
{Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.},
{Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.},
{Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes
},
}},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.FromStrings("app", "foo"),
Floats: []promql.FPoint{{T: 60 * 1000, F: 5.}, {T: 105 * 1000, F: 4.}, {T: 120 * 1000, F: 4.}},
},
},
},
{
`bytes_over_time({app="foo"}[30s]) > bool 1`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
Expand Down Expand Up @@ -2137,6 +2184,40 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
// should return same results as `bytes_over_time({app="foo"}[30s]) > bool 1`.
// https://grafana.com/docs/loki/latest/query/#comparison-operators
// Between a vector and a scalar, these operators are
// applied to the value of every data sample in the vector
`1 < bool bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Series{
{logproto.Series{
Labels: `{app="foo"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(45, 0).UnixNano(), Hash: 1, Value: 5.}, // 5 bytes
{Timestamp: time.Unix(60, 0).UnixNano(), Hash: 2, Value: 0.},
{Timestamp: time.Unix(75, 0).UnixNano(), Hash: 3, Value: 0.},
{Timestamp: time.Unix(90, 0).UnixNano(), Hash: 4, Value: 0.},
{Timestamp: time.Unix(105, 0).UnixNano(), Hash: 5, Value: 4.}, // 4 bytes
},
}},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(120, 0), Selector: `bytes_over_time({app="foo"}[30s])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.FromStrings("app", "foo"),
Floats: []promql.FPoint{
{T: 60000, F: 1},
{T: 75000, F: 0},
{T: 90000, F: 0},
{T: 105000, F: 1},
{T: 120000, F: 1},
},
},
},
},
{
// tests combining two streams + unwrap
`sum(rate({job="foo"} | logfmt | bar > 0 | unwrap bazz [30s]))`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, 0, logproto.FORWARD, 10,
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func vectorBinop(op string, opts *syntax.BinOpOptions, lhs, rhs promql.Vector, l
ls, rs = rs, ls
}
}
merged, err := syntax.MergeBinOp(op, ls, rs, filter, syntax.IsComparisonOperator(op))
merged, err := syntax.MergeBinOp(op, ls, rs, false, filter, syntax.IsComparisonOperator(op))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -973,6 +973,7 @@ func (e *LiteralStepEvaluator) Next() (bool, int64, StepResult) {
e.op,
left,
right,
!e.inverted,
!e.returnBool,
syntax.IsComparisonOperator(e.op),
)
Expand Down
103 changes: 97 additions & 6 deletions pkg/logql/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package logql
import (
"math"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logql/syntax"
Expand All @@ -20,6 +23,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
false,
false,
false,
)
require.NoError(t, err)

Expand All @@ -33,6 +37,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
false,
false,
false,
)
require.NoError(t, err)
require.Equal(t, true, math.IsNaN(binOp.F))
Expand Down Expand Up @@ -220,31 +225,31 @@ func TestEvaluator_mergeBinOpComparisons(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
// comparing a binop should yield the unfiltered (non-nil variant) regardless
// of whether this is a vector-vector comparison or not.
op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false)
op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, false)
require.NoError(t, err)
require.Equal(t, tc.expected, op)
op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true)
op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, false, true)
require.NoError(t, err)
require.Equal(t, tc.expected, op2)

op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true)
op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, false, true)
require.NoError(t, err)
require.Nil(t, op3)

// test filtered variants
if tc.expected.F == 0 {
// ensure zeroed predicates are filtered out

op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, false)
op, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, false)
require.NoError(t, err)
require.Nil(t, op)
op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, true, true)
op2, err := syntax.MergeBinOp(tc.op, tc.lhs, tc.rhs, false, true, true)
require.NoError(t, err)
require.Nil(t, op2)

// for vector-vector comparisons, ensure that nil right hand sides
// translate into nil results
op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, true, true)
op3, err := syntax.MergeBinOp(tc.op, tc.lhs, nil, false, true, true)
require.NoError(t, err)
require.Nil(t, op3)

Expand Down Expand Up @@ -280,6 +285,52 @@ func TestEmptyNestedEvaluator(t *testing.T) {

}

func TestLiteralStepEvaluator(t *testing.T) {
cases := []struct {
name string
expr *LiteralStepEvaluator
expected []float64
}{
{
name: "vector op scalar",
// e.g: sum(count_over_time({app="foo"}[1m])) > 20
expr: &LiteralStepEvaluator{
nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}),
val: 20,
inverted: true, // set to true, because literal expression is not on left.
op: ">",
},
expected: []float64{21, 22, 23},
},
{
name: "scalar op vector",
// e.g: 20 < sum(count_over_time({app="foo"}[1m]))
expr: &LiteralStepEvaluator{
nextEv: newReturnVectorEvaluator([]float64{20, 21, 22, 23}),
val: 20,
inverted: false,
op: "<",
},
expected: []float64{21, 22, 23},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ok, _, got := tc.expr.Next()
require.True(t, ok)
vecs := got.SampleVector()
gotSamples := make([]float64, 0, len(vecs))

for _, v := range vecs {
gotSamples = append(gotSamples, v.F)
}

assert.Equal(t, tc.expected, gotSamples)
})
}
}

type emptyEvaluator struct{}

func (*emptyEvaluator) Next() (ok bool, ts int64, r StepResult) {
Expand All @@ -295,3 +346,43 @@ func (*emptyEvaluator) Error() error {
}

func (*emptyEvaluator) Explain(Node) {}

// returnVectorEvaluator returns elements of vector
// passed in, everytime it's `Next()` is called. Used for testing.
type returnVectorEvaluator struct {
vec promql.Vector
}

func (e *returnVectorEvaluator) Next() (ok bool, ts int64, r StepResult) {
return true, 0, SampleVector(e.vec)
}

func (*returnVectorEvaluator) Close() error {
return nil
}

func (*returnVectorEvaluator) Error() error {
return nil
}

func (*returnVectorEvaluator) Explain(Node) {

}

func newReturnVectorEvaluator(vec []float64) *returnVectorEvaluator {
testTime := time.Now().Unix()

pvec := make([]promql.Sample, 0, len(vec))

for _, v := range vec {
pvec = append(pvec, promql.Sample{
T: testTime,
F: v,
Metric: labels.FromStrings("foo", "bar"),
})
}

return &returnVectorEvaluator{
vec: pvec,
}
}
23 changes: 18 additions & 5 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,14 +1631,21 @@ func reduceBinOp(op string, left, right float64) *LiteralExpr {
&promql.Sample{F: right},
false,
false,
false,
)
if err != nil {
return &LiteralExpr{err: err}
}
return &LiteralExpr{Val: merged.F}
}

func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorComparison bool) (*promql.Sample, error) {
// MergeBinOp performs `op` on `left` and `right` arguments and return the `promql.Sample` value.
// In case of vector and scalar arguments, MergeBinOp assumes `left` is always vector.
// pass `swap=true` otherwise.
// This matters because, either it's (vector op scalar) or (scalar op vector), the return sample value should
// always be sample value of vector argument.
// https://github.com/grafana/loki/issues/10741
func MergeBinOp(op string, left, right *promql.Sample, swap, filter, isVectorComparison bool) (*promql.Sample, error) {
var merger func(left, right *promql.Sample) *promql.Sample

switch op {
Expand Down Expand Up @@ -1826,11 +1833,17 @@ func MergeBinOp(op string, left, right *promql.Sample, filter, isVectorCompariso
}

if filter {
// if a filter-enabled vector-wise comparison has returned non-nil,
// ensure we return the left hand side's value (2) instead of the
// comparison operator's result (1: the truthy answer)
// if a filter is enabled vector-wise comparison has returned non-nil,
// ensure we return the vector hand side's sample value, instead of the
// comparison operator's result (1: the truthy answer. a.k.a bool)

retSample := left
if swap {
retSample = right
}

if res != nil {
return left, nil
return retSample, nil
}
}
return res, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/logql/syntax/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ func Test_SampleExpr_String(t *testing.T) {
)
`,
`(((
sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60)
or on ()
((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60))
or on ()
sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-1min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60)
or on ()
((sum by(typename,pool,commandname,colo)(sum_over_time({_namespace_="appspace", _schema_="appspace-15min", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 15) / 60))
or on ()
((sum by(typename,pool,commandname,colo) (sum_over_time({_namespace_="appspace", _schema_="appspace-1h", pool=~"r1testlvs", colo=~"slc|lvs|rno", env!~"(pre-production|sandbox)"} | logfmt | status!="0" | ( ( type=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" or typename=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) or status=~"(?i)^(Error|Exception|Fatal|ERRPAGE|ValidationError)$" ) | commandname=~"(?i).*|UNSET" | unwrap sumcount[5m])) / 60) / 60))`,
`{app="foo"} | logfmt code="response.code", IPAddress="host"`,
} {
Expand Down Expand Up @@ -670,6 +670,7 @@ func Test_MergeBinOpVectors_Filter(t *testing.T) {
OpTypeGT,
&promql.Sample{F: 2},
&promql.Sample{F: 0},
false,
true,
true,
)
Expand Down