Skip to content

Commit

Permalink
Send query plan to querier. (#11246)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Following #11123 and in order to
enable #10417 the query frontend
should send the serialized LogQL AST instead of the query string to the
queriers. This enables the frontend to change the AST and inject
expressions that are not expressible in LogQL.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Signed-off-by: Callum Styan <[email protected]>
Co-authored-by: Callum Styan <[email protected]>
  • Loading branch information
jeschkies and cstyan authored Nov 22, 2023
1 parent 162bbb1 commit 5b97fcf
Show file tree
Hide file tree
Showing 43 changed files with 932 additions and 530 deletions.
10 changes: 8 additions & 2 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.

ctx = user.InjectOrgID(ctx, f.orgID)

params := logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q,
t, t,
0,
Expand All @@ -78,6 +78,9 @@ func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.
uint32(limit),
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
}

query := f.engine.Query(params)

Expand Down Expand Up @@ -106,7 +109,7 @@ func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time

ctx = user.InjectOrgID(ctx, f.orgID)

params := logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
queryStr,
start,
end,
Expand All @@ -116,6 +119,9 @@ func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time
uint32(limit),
nil,
)
if err != nil {
return nil, err
}

query := f.engine.Query(params)

Expand Down
22 changes: 18 additions & 4 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
var query logql.Query

if q.isInstant() {
query = eng.Query(logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q.QueryString,
q.Start,
q.Start,
Expand All @@ -460,9 +460,14 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
))
)
if err != nil {
return err
}

query = eng.Query(params)
} else {
query = eng.Query(logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q.QueryString,
q.Start,
q.End,
Expand All @@ -471,7 +476,16 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
))
)
if err != nil {
return err
}

query = eng.Query(params)
}

if err != nil {
return err
}

// execute the query
Expand Down
5 changes: 4 additions & 1 deletion pkg/logcli/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,10 @@ func (t *testQueryClient) Query(_ string, _ int, _ time.Time, _ logproto.Directi
func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, _ bool) (*loghttp.QueryResponse, error) {
ctx := user.InjectOrgID(context.Background(), "fake")

params := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
params, err := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
if err != nil {
return nil, err
}

v, err := t.engine.Query(params).Exec(ctx)
if err != nil {
Expand Down
50 changes: 24 additions & 26 deletions pkg/logproto/indexgateway.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/logproto/indexgateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package indexgatewaypb;

import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";

option go_package = "github.com/grafana/loki/pkg/logproto";
Expand Down
78 changes: 38 additions & 40 deletions pkg/logproto/sketch.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/logproto/sketch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package logproto;

import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";

option go_package = "github.com/grafana/loki/pkg/logproto";
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
return false
}

query := qb.q.params.Query()
typ, err := QueryType(query)
query := qb.q.params.QueryString()
typ, err := QueryType(qb.q.params.GetExpression())
if err != nil {
typ = "unknown"
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/logql/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,10 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
limits.blockedQueries = test.blocked

q := eng.Query(LiteralParams{
qs: test.q,
start: time.Unix(0, 0),
end: time.Unix(100000, 0),
step: 60 * time.Second,
direction: logproto.FORWARD,
limit: 1000,
})
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
params, err := NewLiteralParams(test.q, time.Unix(0, 0), time.Unix(100000, 0), 60*time.Second, 0, logproto.FORWARD, 1000, nil)
require.NoError(t, err)
q := eng.Query(params)
_, err = q.Exec(user.InjectOrgID(context.Background(), "fake"))

if test.expectedErr == nil {
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 5b97fcf

Please sign in to comment.