Skip to content

Commit

Permalink
feat: return real data in detected fields endpoint (#12421)
Browse files Browse the repository at this point in the history
parse log lines in the querier to return data about detected fields. this is not sustainable long term, but a short term solution to validate the data we want for the frontend.
  • Loading branch information
trevorwhitney authored Apr 2, 2024
1 parent 4c88be0 commit a53a0cc
Show file tree
Hide file tree
Showing 11 changed files with 658 additions and 248 deletions.
14 changes: 0 additions & 14 deletions pkg/loghttp/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,3 @@ func ParseDetectedLabelsQuery(r *http.Request) (*logproto.DetectedLabelsRequest,
Query: query(r),
}, nil
}

func ParseDetectedFieldsQuery(r *http.Request) (*logproto.DetectedFieldsRequest, error) {
req := &logproto.DetectedFieldsRequest{}

start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = &start
req.End = &end

req.Query = query(r)
return req, nil
}
23 changes: 23 additions & 0 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

const (
defaultQueryLimit = 100
defaultFieldLimit = 1000
defaultSince = 1 * time.Hour
defaultDirection = logproto.BACKWARD
)
Expand All @@ -34,6 +35,28 @@ func limit(r *http.Request) (uint32, error) {
return uint32(l), nil
}

func lineLimit(r *http.Request) (uint32, error) {
l, err := parseInt(r.Form.Get("line_limit"), defaultQueryLimit)
if err != nil {
return 0, err
}
if l <= 0 {
return 0, errors.New("limit must be a positive value")
}
return uint32(l), nil
}

func fieldLimit(r *http.Request) (uint32, error) {
l, err := parseInt(r.Form.Get("field_limit"), defaultFieldLimit)
if err != nil {
return 0, err
}
if l <= 0 {
return 0, errors.New("limit must be a positive value")
}
return uint32(l), nil
}

func query(r *http.Request) string {
return r.Form.Get("query")
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,48 @@ func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
}, nil
}

func ParseDetectedFieldsQuery(r *http.Request) (*logproto.DetectedFieldsRequest, error) {
var err error
result := &logproto.DetectedFieldsRequest{}

result.Query = query(r)
result.Start, result.End, err = bounds(r)
if err != nil {
return nil, err
}

if result.End.Before(result.Start) {
return nil, errEndBeforeStart
}

result.LineLimit, err = lineLimit(r)
if err != nil {
return nil, err
}

result.FieldLimit, err = fieldLimit(r)
if err != nil {
return nil, err
}

step, err := step(r, result.Start, result.End)
result.Step = step.Milliseconds()
if err != nil {
return nil, err
}

if result.Step <= 0 {
return nil, errZeroOrNegativeStep
}

// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / step) > 11000 {
return nil, errStepTooSmall
}
return result, nil
}

func targetLabels(r *http.Request) []string {
lbls := strings.Split(r.Form.Get("targetLabels"), ",")
if (len(lbls) == 1 && lbls[0] == "") || len(lbls) == 0 {
Expand Down
516 changes: 308 additions & 208 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,16 @@ message Volume {
message DetectedFieldsRequest {
google.protobuf.Timestamp start = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
(gogoproto.nullable) = false
];
string query = 3; // Naming this query instead of match because this should be with queryrangebase.Request interface
uint32 lineLimit = 4;
uint32 fieldLimit = 5;
int64 step = 6;
}

message DetectedFieldsResponse {
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.De
return nil, err
}
if resp == nil { // Some stores don't implement this
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "queried store for detected fields that does not support it, no response from querier.DetectedFields",
)
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
}, nil
Expand Down
28 changes: 26 additions & 2 deletions pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package querier
import (
"context"
"fmt"
"strings"

"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"

Expand All @@ -29,12 +31,14 @@ const (
// MultiTenantQuerier is able to query across different tenants.
type MultiTenantQuerier struct {
Querier
logger log.Logger
}

// NewMultiTenantQuerier returns a new querier able to query across different tenants.
func NewMultiTenantQuerier(querier Querier, _ log.Logger) *MultiTenantQuerier {
func NewMultiTenantQuerier(querier Querier, logger log.Logger) *MultiTenantQuerier {
return &MultiTenantQuerier{
Querier: querier,
logger: logger,
}
}

Expand Down Expand Up @@ -258,6 +262,26 @@ func (q *MultiTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeReq
return merged, nil
}

func (q *MultiTenantQuerier) DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

if len(tenantIDs) == 1 {
return q.Querier.DetectedFields(ctx, req)
}

level.Debug(q.logger).Log(
"msg", "detected fields requested for multiple tenants, but not yet supported",
"tenantIDs", strings.Join(tenantIDs, ","),
)

return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
}, nil
}

func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
// TODO(shantanu)
tenantIDs, err := tenant.TenantID(ctx)
Expand Down Expand Up @@ -308,7 +332,7 @@ func replaceMatchers(expr syntax.Expr, matchers []*labels.Matcher) syntax.Expr {
}

// See https://github.com/grafana/mimir/blob/114ab88b50638a2047e2ca2a60640f6ca6fe8c17/pkg/querier/tenantfederation/tenant_federation.go#L29-L69
// filterValuesByMatchers applies matchers to inputed `idLabelName` and
// filterValuesByMatchers applies matchers to inputted `idLabelName` and
// `ids`. A set of matched IDs is returned and also all label matchers not
// targeting the `idLabelName` label.
//
Expand Down
Loading

0 comments on commit a53a0cc

Please sign in to comment.