Fix metrics queries with unscoped attributes (grafana#4409)
* Fix metrics evaluator incorrectly optimizing away callback when unscoped attrs present, add integration tests

* Fix falsey metrics queries to not return any data

* changelog

* Detect a few more noop queries and exit early from search too
mdisibio authored Dec 4, 2024
1 parent b0a06e8 commit b9321f4
Showing 8 changed files with 313 additions and 0 deletions.
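For context, a minimal sketch of the behavior this commit targets (the example queries are illustrative, not taken from the diff): a trivially-false filter such as { false } should never return metrics data, and an unscoped attribute such as .foo must not have its engine callback optimized away. The sketch assumes the pkg/traceql API added and used later in this diff (Parse, RootExpr.IsNoop).

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/traceql"
)

func main() {
	// A trivially-false filter never matches; the frontend and search paths
	// now short-circuit it via RootExpr.IsNoop (added in pkg/traceql/ast.go).
	expr, err := traceql.Parse(`{ false }`)
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.IsNoop()) // true

	// An unscoped attribute may resolve at span or resource scope, so the
	// storage-layer optimization must keep the engine's second-pass callback.
	expr, err = traceql.Parse(`{ .foo = "bar" }`)
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.IsNoop()) // false
}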
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -76,6 +76,7 @@
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
* [BUGFIX] Fix traceql metrics returning incorrect data for falsey queries and unscoped attributes [#4409](https://github.com/grafana/tempo/pull/4409) (@mdisibio)
* [BUGFIX] Fix traceql metrics time range handling at the cutoff between recent and backend data [#4257](https://github.com/grafana/tempo/issues/4257) (@mdisibio)
* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) [#4404](https://github.com/grafana/tempo/pull/4404) (@mdisibio)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
7 changes: 7 additions & 0 deletions modules/frontend/metrics_query_range_sharder.go
@@ -79,6 +79,13 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(err), nil
}

if expr.IsNoop() {
// Noop query (e.g. {false}): return an empty response without issuing any subrequests
ch := make(chan pipeline.Request, 2)
close(ch)
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, ch, nil, s.next), nil
}

tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return pipeline.NewBadRequest(err), nil
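The short-circuit above hands the pipeline an already-closed channel, so the async sharder produces zero subrequests and the response stays empty. A standalone sketch of that pattern (plain Go, not Tempo code):

package main

import "fmt"

func main() {
	// A buffered channel closed before anything is sent yields nothing when
	// ranged over; this is how the noop path above returns an empty result
	// without fanning out any subrequests.
	ch := make(chan string, 2)
	close(ch)

	n := 0
	for range ch {
		n++ // never executed
	}
	fmt.Println("subrequests issued:", n) // 0
}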
19 changes: 19 additions & 0 deletions pkg/tempopb/utils.go
@@ -0,0 +1,19 @@
package tempopb

import v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"

func MakeKeyValueString(key, value string) v1.KeyValue {
return v1.KeyValue{
Key: key,
Value: &v1.AnyValue{
Value: &v1.AnyValue_StringValue{
StringValue: value,
},
},
}
}

func MakeKeyValueStringPtr(key, value string) *v1.KeyValue {
kv := MakeKeyValueString(key, value)
return &kv
}
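A usage sketch for these helpers, mirroring how the metrics test added later in this commit builds expected series labels and resource attributes (the GetStringValue getter on the generated AnyValue type is assumed):

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/tempopb"
)

func main() {
	// Value form: used for expected time-series labels such as {__name__="rate"}.
	kv := tempopb.MakeKeyValueString("__name__", "rate")
	fmt.Println(kv.Key, kv.Value.GetStringValue())

	// Pointer form: used when populating resource attributes on a test trace.
	attr := tempopb.MakeKeyValueStringPtr("service.name", "even")
	fmt.Println(attr.Key, attr.Value.GetStringValue())
}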
43 changes: 43 additions & 0 deletions pkg/traceql/ast.go
@@ -73,6 +73,49 @@ func (r *RootExpr) withHints(h *Hints) *RootExpr {
return r
}

// IsNoop detects trivial noop queries like {false} which never return
// results, so callers can exit early.
func (r *RootExpr) IsNoop() bool {
isNoopFilter := func(x any) bool {
f, ok := x.(*SpansetFilter)
if !ok {
return false
}

if f.Expression.referencesSpan() {
return false
}

// Else check for static evaluation to false
v, _ := f.Expression.execute(nil)
return v.Equals(&StaticFalse)
}

// Any spanset filter that references the span, or evaluates to something
// other than static false, means the expression isn't a noop.
// This checks one layer deep which covers most expressions.
for _, e := range r.Pipeline.Elements {
switch x := e.(type) {
case SpansetOperation:
if !isNoopFilter(x.LHS) {
return false
}
if !isNoopFilter(x.RHS) {
return false
}
case *SpansetFilter:
if !isNoopFilter(x) {
return false
}
default:
// Lots of other expressions here which aren't checked
// for noops yet.
return false
}
}
return true
}

// **********************
// Pipeline
// **********************
32 changes: 32 additions & 0 deletions pkg/traceql/ast_test.go
@@ -10,6 +10,38 @@ import (
"github.com/stretchr/testify/require"
)

func TestRootExprIsNoop(t *testing.T) {
noops := []string{
"{false}",
"{1=0}",
"({false})",
"{false} && {false}",
"{false} >> {false}",
}

for _, q := range noops {
expr, err := Parse(q)
require.NoError(t, err)
require.True(t, expr.IsNoop(), "Query should be noop: %v", q)
}

ops := []string{
"{true}",
"{1=1}",
"{.foo = 1}",
"{false} || {true}",

// This is a noop but not yet detected
"{false} && {true}",
}

for _, q := range ops {
expr, err := Parse(q)
require.NoError(t, err)
require.False(t, expr.IsNoop(), "Query should not be noop: %v", q)
}
}

func TestNewStaticNil(t *testing.T) {
s := NewStaticNil()
assert.Equal(t, TypeNil, s.Type)
7 changes: 7 additions & 0 deletions pkg/traceql/engine.go
@@ -55,6 +55,13 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
return nil, err
}

if rootExpr.IsNoop() {
return &tempopb.SearchResponse{
Traces: nil,
Metrics: &tempopb.SearchMetrics{},
}, nil
}

fetchSpansRequest.StartTimeUnixNanos = unixSecToNano(searchReq.Start)
fetchSpansRequest.EndTimeUnixNanos = unixSecToNano(searchReq.End)

8 changes: 8 additions & 0 deletions pkg/traceql/engine_metrics.go
@@ -908,6 +908,14 @@ func optimize(req *FetchSpansRequest) {
return
}

// Unscoped attributes like .foo require the second pass to evaluate
for _, c := range req.Conditions {
if c.Attribute.Scope == AttributeScopeNone && c.Attribute.Intrinsic == IntrinsicNone {
// Unscoped (non-intrinsic) attribute
return
}
}

// There is an issue where multiple conditions &&'ed on the same
// attribute can look like AllConditions==true, but are implemented
// in the storage layer like ||'ed and require the second pass callback (engine).
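To make the new early return concrete, here is a rough, assumed mapping (example queries, not from this diff) of which metrics queries keep the engine's second-pass callback after this change; the other checks in optimize() still apply to the scoped cases.

package main

import "fmt"

func main() {
	examples := []struct {
		query           string
		needsSecondPass bool
	}{
		{`{ span.foo = "bar" } | rate()`, false},     // scoped span attribute
		{`{ resource.foo = "bar" } | rate()`, false}, // scoped resource attribute
		{`{ .foo = "bar" } | rate()`, true},          // unscoped: may match span or resource
	}
	for _, e := range examples {
		fmt.Printf("%-35s second pass: %v\n", e.query, e.needsSecondPass)
	}
}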
196 changes: 196 additions & 0 deletions tempodb/tempodb_metrics_test.go
@@ -0,0 +1,196 @@
package tempodb

import (
"context"
"path"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
resource_v1 "github.com/grafana/tempo/pkg/tempopb/resource/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

var queryRangeTestCases = []struct {
req *tempopb.QueryRangeRequest
expected []*tempopb.TimeSeries
}{
{
req: &tempopb.QueryRangeRequest{
Start: 1,
End: 50 * uint64(time.Second),
Step: 15 * uint64(time.Second),
Query: "{ } | rate()",
},
expected: []*tempopb.TimeSeries{
{
PromLabels: `{__name__="rate"}`,
Labels: []common_v1.KeyValue{tempopb.MakeKeyValueString("__name__", "rate")},
Samples: []tempopb.Sample{
{TimestampMs: 0, Value: 14.0 / 15.0}, // First interval starts at 1, so it only has 14 spans
{TimestampMs: 15_000, Value: 1.0}, // Spans every 1 second
{TimestampMs: 30_000, Value: 1.0}, // Spans every 1 second
{TimestampMs: 45_000, Value: 5.0 / 15.0}, // Interval [45,50) has 5 spans
{TimestampMs: 60_000, Value: 0}, // I think this is a bug that we extend out an extra interval
},
},
},
},
{
req: &tempopb.QueryRangeRequest{
Start: 1,
End: 50 * uint64(time.Second),
Step: 15 * uint64(time.Second),
Query: `{ .service.name="even" } | rate()`,
},
expected: []*tempopb.TimeSeries{
{
PromLabels: `{__name__="rate"}`,
Labels: []common_v1.KeyValue{tempopb.MakeKeyValueString("__name__", "rate")},
Samples: []tempopb.Sample{
{TimestampMs: 0, Value: 7.0 / 15.0}, // Interval [ 1, 14], 7 spans at 2, 4, 6, 8, 10, 12, 14
{TimestampMs: 15_000, Value: 7.0 / 15.0}, // Interval [15, 29], 7 spans at 16, 18, 20, 22, 24, 26, 28
{TimestampMs: 30_000, Value: 8.0 / 15.0}, // Interval [30, 44], 8 spans at 30, 32, 34, 36, 38, 40, 42, 44
{TimestampMs: 45_000, Value: 2.0 / 15.0}, // Interval [45, 50), 2 spans at 46, 48
{TimestampMs: 60_000, Value: 0}, // I think this is a bug that we extend out an extra interval
},
},
},
},
}
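// Added commentary (not part of the original commit): a worked check of the
// expected values above. The test below writes one span starting at every
// whole second from 1s to 100s, so rate() per 15s step is spanCount/15:
//   interval [ 1,14]: 14 spans -> 14/15 ≈ 0.933
//   interval [15,29]: 15 spans -> 15/15 = 1.0
//   interval [45,50):  5 spans ->  5/15 ≈ 0.333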

func TestTempoDBQueryRange(t *testing.T) {
var (
tempDir = t.TempDir()
blockVersion = vparquet4.VersionString
)

dc := backend.DedicatedColumns{
{Scope: "resource", Name: "res-dedicated.01", Type: "string"},
{Scope: "resource", Name: "res-dedicated.02", Type: "string"},
{Scope: "span", Name: "span-dedicated.01", Type: "string"},
{Scope: "span", Name: "span-dedicated.02", Type: "string"},
}
r, w, c, err := New(&Config{
Backend: backend.Local,
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
Block: &common.BlockConfig{
IndexDownsampleBytes: 17,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Version: blockVersion,
IndexPageSizeBytes: 1000,
RowGroupSizeBytes: 10000,
DedicatedColumns: dc,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
IngestionSlack: time.Since(time.Time{}),
},
Search: &SearchConfig{
ChunkSizeBytes: 1_000_000,
ReadBufferCount: 8, ReadBufferSizeBytes: 4 * 1024 * 1024,
},
BlocklistPoll: 0,
}, nil, log.NewNopLogger())
require.NoError(t, err)

err = c.EnableCompaction(context.Background(), &CompactorConfig{
ChunkSizeBytes: 10,
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{}, &mockOverrides{})
require.NoError(t, err)

ctx := context.Background()
r.EnablePolling(ctx, &mockJobSharder{})

// Write to wal
wal := w.WAL()

meta := &backend.BlockMeta{BlockID: backend.NewUUID(), TenantID: testTenantID, DedicatedColumns: dc}
head, err := wal.NewBlock(meta, model.CurrentEncoding)
require.NoError(t, err)
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

totalSpans := 100
for i := 1; i <= totalSpans; i++ {
tid := test.ValidTraceID(nil)

sp := test.MakeSpan(tid)

// Start time is i seconds
sp.StartTimeUnixNano = uint64(i * int(time.Second))

// Duration is i seconds
sp.EndTimeUnixNano = sp.StartTimeUnixNano + uint64(i*int(time.Second))

// Service name
var svcName string
if i%2 == 0 {
svcName = "even"
} else {
svcName = "odd"
}

tr := &tempopb.Trace{
ResourceSpans: []*v1.ResourceSpans{
{
Resource: &resource_v1.Resource{
Attributes: []*common_v1.KeyValue{tempopb.MakeKeyValueStringPtr("service.name", svcName)},
},
ScopeSpans: []*v1.ScopeSpans{
{
Spans: []*v1.Span{
sp,
},
},
},
},
},
}

b1, err := dec.PrepareForWrite(tr, 0, 0)
require.NoError(t, err)

b2, err := dec.ToObject([][]byte{b1})
require.NoError(t, err)
err = head.Append(tid, b2, 0, 0)
require.NoError(t, err)
}

// Complete block
block, err := w.CompleteBlock(context.Background(), head)
require.NoError(t, err)

f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return block.Fetch(ctx, req, common.DefaultSearchOptions())
})

for _, tc := range queryRangeTestCases {

eval, err := traceql.NewEngine().CompileMetricsQueryRange(tc.req, 0, 0, false)
require.NoError(t, err)

err = eval.Do(ctx, f, 0, 0)
require.NoError(t, err)

actual := eval.Results().ToProto(tc.req)

require.Equal(t, tc.expected, actual, "Query: %v", tc.req.Query)
}
}
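The new test can be run on its own with a standard Go invocation from the repo root, e.g. go test ./tempodb -run TestTempoDBQueryRange (path and package name taken from the file header above).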
