Skip to content

Commit

Permalink
Tweak sub queries without offset before caching
Browse files Browse the repository at this point in the history
Signed-off-by: Kaviraj <[email protected]>
  • Loading branch information
kavirajk committed Jan 30, 2024
1 parent e4fbe8f commit 27dcfe6
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/metadata"
"github.com/grafana/loki/pkg/logqlmodel/stats"
Expand Down Expand Up @@ -103,22 +104,39 @@ type instance struct {
handler queryrangebase.Handler
}

// withoutOffset returns the given DownstreamQuery with offsets removed and timestamp adjusted accordingly. If no offset is present in original query, it will be returned as is.
func withoutOffset(query logql.DownstreamQuery) logql.DownstreamQuery {
// withoutOffset returns the given query string with offsets removed and timestamp adjusted accordingly. If no offset is present in original query, it will be returned as is.
func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) {
expr := query.Params.GetExpression()
expr.Walk()
var (
newStart, newEnd time.Time
)
expr.Walk(func(e syntax.Expr) {
switch rng := e.(type) {
case *syntax.RangeAggregationExpr:
off := rng.Left.Offset
rng.Left.Offset = 0 // remove offset
newEnd = query.Params.End().Add(-off)
newStart = query.Params.Start()

if query.Params.Start() == query.Params.End() { // instant query
newStart = newStart.Add(-off)
}
}
})

return expr.String(), newStart, newEnd
}

func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery) ([]logqlmodel.Result, error) {
return in.For(ctx, queries, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
qry = withoutOffset(qry)
qs, newStart, newEnd := withoutOffset(qry)

req := ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String())
req := ParamsToLokiRequest(qry.Params).WithQuery(qs).WithStartEnd(newStart, newEnd)
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
defer logger.Finish()
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler))
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down

0 comments on commit 27dcfe6

Please sign in to comment.