Skip to content

Commit

Permalink
chore: refactor detected fields handler (grafana#14288)
Browse files Browse the repository at this point in the history
Co-authored-by: Paul Rogers <[email protected]>
  • Loading branch information
2 people authored and jeschkies committed Oct 1, 2024
1 parent 90b4558 commit 1d7bfee
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 65 deletions.
90 changes: 47 additions & 43 deletions pkg/querier/queryrange/detected_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,55 +27,59 @@ func NewDetectedFieldsHandler(
limitedHandler base.Handler,
logHandler base.Handler,
limits Limits,
) base.Middleware {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}
) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}

resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}
resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}

re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
}

detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}

detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}
fieldCount++
}

fieldCount++
}
dfResp := DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
},
Headers: re.Headers,
}

// Otherwise all they get is the field limit, which is a bit confusing
if len(fields) > 0 {
dfResp.Response.FieldLimit = r.GetFieldLimit()
}

return &DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: r.GetFieldLimit(),
},
Headers: re.Headers,
}, nil
})
})
return &dfResp, nil
})
}

func makeDownstreamRequest(
Expand Down
25 changes: 5 additions & 20 deletions pkg/querier/queryrange/detected_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,10 +1028,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)),
logHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)

detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
Expand All @@ -1058,10 +1055,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)

detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
Expand Down Expand Up @@ -1090,10 +1084,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)),
logHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)

detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
Expand Down Expand Up @@ -1136,10 +1127,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)

detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
Expand Down Expand Up @@ -1188,10 +1176,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test", name="bob"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)

detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
Expand Down
3 changes: 1 addition & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,6 @@ func NewDetectedFieldsTripperware(
limitedHandler := limitedTripperware.Wrap(next)
logHandler := logTripperware.Wrap(next)

detectedFieldsHandler := NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
return NewLimitedRoundTripper(next, limits, schema.Configs, detectedFieldsHandler)
return NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
}), nil
}

0 comments on commit 1d7bfee

Please sign in to comment.