diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 27cb3e849fa2c..ca0a5d6aa615d 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -279,11 +279,11 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre type errorQuerier struct{} func (errorQuerier) SelectLogs(_ context.Context, _ SelectLogParams) (iter.EntryIterator, error) { - return nil, errors.New("unimplemented") + return nil, errors.New("SelectLogs unimplemented: the query-frontend cannot evaluate an expression that selects logs. this is likely a bug in the query engine. please contact your system operator") } func (errorQuerier) SelectSamples(_ context.Context, _ SelectSampleParams) (iter.SampleIterator, error) { - return nil, errors.New("unimplemented") + return nil, errors.New("SelectSamples unimplemented: the query-frontend cannot evaluate an expression that selects samples. this is likely a bug in the query engine. please contact your system operator") } func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator { diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 218957f862bb1..426722a554594 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -55,6 +55,14 @@ func TestMappingEquivalence(t *testing.T) { {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true}, + { + ` + (quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1) + and + avg by (a) (rate({a=~".+"}[1s])) + `, + false, + }, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index baa26a2a69c74..4a06b5f804e84 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -62,9 +62,7 @@ func (m ShardMapper) Parse(parsed syntax.Expr) (noop bool, bytesPerShard uint64, return false, 0, nil, err } - originalStr := parsed.String() - mappedStr := mapped.String() - noop = originalStr == mappedStr + noop = isNoOp(parsed, mapped) if noop { m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc() } else { @@ -97,32 +95,62 @@ func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr, case *syntax.RangeAggregationExpr: return m.mapRangeAggregationExpr(e, r) case *syntax.BinOpExpr: - lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r) - if err != nil { - return nil, 0, err - } - rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r) - if err != nil { - return nil, 0, err - } - lhsSampleExpr, ok := lhsMapped.(syntax.SampleExpr) - if !ok { - return nil, 0, badASTMapping(lhsMapped) - } - rhsSampleExpr, ok := rhsMapped.(syntax.SampleExpr) - if !ok { - return nil, 0, badASTMapping(rhsMapped) + return m.mapBinOpExpr(e, r) + default: + return nil, 0, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) + } +} + +func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*syntax.BinOpExpr, uint64, error) { + // In a BinOp expression both sides need to be either executed locally or wrapped + // into a downstream expression to be executed on the querier, since the default + // evaluator on the query frontend cannot select logs or samples. + // However, it can evaluate literals and vectors. + + // check if LHS is shardable by mapping the tree + // only wrap in downstream expression if the mapping is a no-op and the + // expression is a vector or literal + lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r) + if err != nil { + return nil, 0, err + } + if isNoOp(e.SampleExpr, lhsMapped) && !isLiteralOrVector(lhsMapped) { + lhsMapped = DownstreamSampleExpr{ + shard: nil, + SampleExpr: e.SampleExpr, } - e.SampleExpr = lhsSampleExpr - e.RHS = rhsSampleExpr + } - // We take the maximum bytes per shard of both sides of the operation - bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard))) + // check if RHS is shardable by mapping the tree + // only wrap in downstream expression if the mapping is a no-op and the + // expression is a vector or literal + rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r) + if err != nil { + return nil, 0, err + } + if isNoOp(e.SampleExpr, rhsMapped) && !isLiteralOrVector(rhsMapped) { + // TODO: check if literal or vector + rhsMapped = DownstreamSampleExpr{ + shard: nil, + SampleExpr: e.RHS, + } + } - return e, bytesPerShard, nil - default: - return nil, 0, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) + lhsSampleExpr, ok := lhsMapped.(syntax.SampleExpr) + if !ok { + return nil, 0, badASTMapping(lhsMapped) + } + rhsSampleExpr, ok := rhsMapped.(syntax.SampleExpr) + if !ok { + return nil, 0, badASTMapping(rhsMapped) } + e.SampleExpr = lhsSampleExpr + e.RHS = rhsSampleExpr + + // We take the maximum bytes per shard of both sides of the operation + bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard))) + + return e, bytesPerShard, nil } func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstreamRecorder) (syntax.LogSelectorExpr, uint64, error) { @@ -338,7 +366,7 @@ var rangeMergeMap = map[string]string{ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) { if !expr.Shardable() { - return m.noOp(expr) + return noOp(expr, m.shards) } switch expr.Operation { @@ -433,7 +461,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return nil, 0, err } if shards == 0 || !m.quantileOverTimeSharding { - return m.noOp(expr) + return noOp(expr, m.shards) } // quantile_over_time() by (foo) -> @@ -461,18 +489,32 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, default: // don't shard if there's not an appropriate optimization - return m.noOp(expr) + return noOp(expr, m.shards) } } -func (m ShardMapper) noOp(expr *syntax.RangeAggregationExpr) (syntax.SampleExpr, uint64, error) { - exprStats, err := m.shards.GetStats(expr) +func noOp[E syntax.Expr](expr E, shards ShardResolver) (E, uint64, error) { + exprStats, err := shards.GetStats(expr) if err != nil { - return nil, 0, err + var empty E + return empty, 0, err } return expr, exprStats.Bytes, nil } +func isNoOp(left syntax.Expr, right syntax.Expr) bool { + return left.String() == right.String() +} + +func isLiteralOrVector(e syntax.Expr) bool { + switch e.(type) { + case *syntax.VectorExpr, *syntax.LiteralExpr: + return true + default: + return false + } +} + func badASTMapping(got syntax.Expr) error { return fmt.Errorf("bad AST mapping: expected SampleExpr, but got (%T)", got) } diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 7e9ca7481e286..7a02640c81491 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -1359,6 +1359,93 @@ func TestMapping(t *testing.T) { }, }, }, + { + in: ` + quantile_over_time(0.99, {a="foo"} | unwrap bytes [1s]) by (b) + and + sum by (b) (rate({a="bar"}[1s])) + `, + expr: &syntax.BinOpExpr{ + SampleExpr: DownstreamSampleExpr{ + SampleExpr: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeQuantile, + Params: float64p(0.99), + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "foo")}, + }, + Unwrap: &syntax.UnwrapExpr{ + Identifier: "bytes", + }, + Interval: 1 * time.Second, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"b"}, + }, + }, + }, + RHS: &syntax.VectorAggregationExpr{ + Left: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "bar")}, + }, + Interval: 1 * time.Second, + }, + Operation: syntax.OpRangeTypeRate, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"b"}, + }, + Params: 0, + Operation: syntax.OpTypeSum, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "bar")}, + }, + Interval: 1 * time.Second, + }, + Operation: syntax.OpRangeTypeRate, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"b"}, + }, + Params: 0, + Operation: syntax.OpTypeSum, + }, + }, + next: nil, + }, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"b"}, + }, + Operation: syntax.OpTypeSum, + }, + Op: syntax.OpTypeAnd, + Opts: &syntax.BinOpOptions{ + ReturnBool: false, + VectorMatching: &syntax.VectorMatching{}, + }, + }, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := syntax.ParseExpr(tc.in) @@ -1367,8 +1454,8 @@ func TestMapping(t *testing.T) { mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder()) require.Equal(t, tc.err, err) - require.Equal(t, tc.expr.String(), mapped.String()) - require.Equal(t, tc.expr, mapped) + require.Equal(t, mapped.String(), tc.expr.String()) + require.Equal(t, mapped, tc.expr) }) } } diff --git a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml index 83149885fe85b..ea0cf186e2691 100644 --- a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml +++ b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml @@ -67,7 +67,6 @@ ingester_client: remote_timeout: 1s limits_config: cardinality_limit: 100000 - enforce_metric_name: false ingestion_burst_size_mb: 5 ingestion_rate_mb: 2 ingestion_rate_strategy: global