Skip to content

Commit

Permalink
chore: refactor detected fields handler
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Sep 26, 2024
1 parent f39cdbd commit a430bbf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 45 deletions.
90 changes: 47 additions & 43 deletions pkg/querier/queryrange/detected_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,55 +27,59 @@ func NewDetectedFieldsHandler(
limitedHandler base.Handler,
logHandler base.Handler,
limits Limits,
) base.Middleware {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}
) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}

resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}
resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}

re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
}

detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}

detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}
fieldCount++
}

fieldCount++
}
dfResp := DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
},
Headers: re.Headers,
}

// Otherwise all they get is the field limit, which is a bit confusing
if len(fields) > 0 {
dfResp.Response.FieldLimit = r.GetFieldLimit()
}

return &DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: r.GetFieldLimit(),
},
Headers: re.Headers,
}, nil
})
})
return &dfResp, nil
})
}

func makeDownstreamRequest(
Expand Down
3 changes: 1 addition & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,6 @@ func NewDetectedFieldsTripperware(
limitedHandler := limitedTripperware.Wrap(next)
logHandler := logTripperware.Wrap(next)

detectedFieldsHandler := NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
return NewLimitedRoundTripper(next, limits, schema.Configs, detectedFieldsHandler)
return NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
}), nil
}

0 comments on commit a430bbf

Please sign in to comment.