Skip to content

Commit

Permalink
Do not attempt to shard quantile_over_time if feature flag is disable…
Browse files Browse the repository at this point in the history
…d. (#11628)

**What this PR does / why we need it**:
If `quantile_over_time` sharding is disabled the code would still shard
it naively. That was a bug.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)
  • Loading branch information
jeschkies authored Jan 10, 2024
1 parent 3cb2a6b commit 24fa648
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
22 changes: 11 additions & 11 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,7 @@ var rangeMergeMap = map[string]string{

func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
if !expr.Shardable() {
exprStats, err := m.shards.GetStats(expr)
if err != nil {
return nil, 0, err
}
return expr, exprStats.Bytes, nil
return m.noOp(expr)
}

switch expr.Operation {
Expand Down Expand Up @@ -437,7 +433,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return nil, 0, err
}
if shards == 0 || !m.quantileOverTimeSharding {
return m.mapSampleExpr(expr, r)
return m.noOp(expr)
}

// quantile_over_time() by (foo) ->
Expand Down Expand Up @@ -465,12 +461,16 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

default:
// don't shard if there's not an appropriate optimization
exprStats, err := m.shards.GetStats(expr)
if err != nil {
return nil, 0, err
}
return expr, exprStats.Bytes, nil
return m.noOp(expr)
}
}

func (m ShardMapper) noOp(expr *syntax.RangeAggregationExpr) (syntax.SampleExpr, uint64, error) {
exprStats, err := m.shards.GetStats(expr)
if err != nil {
return nil, 0, err
}
return expr, exprStats.Bytes, nil
}

func badASTMapping(got syntax.Expr) error {
Expand Down
25 changes: 24 additions & 1 deletion pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestMappingStrings(t *testing.T) {
}

func TestMapping(t *testing.T) {
m := NewShardMapper(ConstantShards(2), nilShardMetrics, []string{ShardQuantileOverTime})
m := NewShardMapper(ConstantShards(2), nilShardMetrics, []string{})

for _, tc := range []struct {
in string
Expand Down Expand Up @@ -1340,6 +1340,25 @@ func TestMapping(t *testing.T) {
},
},
},
{
in: `quantile_over_time(0.8, {foo="bar"} | unwrap bytes [5m]) by (cluster)`,
expr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeQuantile,
Params: float64p(0.8),
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Unwrap: &syntax.UnwrapExpr{
Identifier: "bytes",
},
Interval: 5 * time.Minute,
},
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
},
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := syntax.ParseExpr(tc.in)
Expand Down Expand Up @@ -1420,3 +1439,7 @@ func TestStringTrimming(t *testing.T) {
})
}
}

func float64p(v float64) *float64 {
return &v
}

0 comments on commit 24fa648

Please sign in to comment.