Skip to content

Commit

Permalink
feat(detected_labels): Initial skeleton for the API (#12390)
Browse files Browse the repository at this point in the history
Co-authored-by: Cyril Tovena <[email protected]>
  • Loading branch information
shantanualsi and cyriltovena authored Apr 1, 2024
1 parent 0b7ff48 commit 5190dda
Show file tree
Hide file tree
Showing 19 changed files with 1,908 additions and 292 deletions.
13 changes: 13 additions & 0 deletions pkg/loghttp/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) {
return req, nil
}

func ParseDetectedLabelsQuery(r *http.Request) (*logproto.DetectedLabelsRequest, error) {
start, end, err := bounds(r)
if err != nil {
return nil, err
}

return &logproto.DetectedLabelsRequest{
Start: &start,
End: &end,
Query: query(r),
}, nil
}

func ParseDetectedFieldsQuery(r *http.Request) (*logproto.DetectedFieldsRequest, error) {
req := &logproto.DetectedFieldsRequest{}

Expand Down
1,161 changes: 984 additions & 177 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,23 @@ message DetectedField {
string type = 2 [(gogoproto.casttype) = "DetectedFieldType"];
uint64 cardinality = 3;
}

message DetectedLabelsRequest {
google.protobuf.Timestamp start = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
];
google.protobuf.Timestamp end = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
];
string query = 3;
}

message DetectedLabelsResponse {
repeated DetectedLabel detectedLabels = 1;
}

message DetectedLabel {
string label = 1;
}
4 changes: 4 additions & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,7 @@ func extractShard(shards []string) *astmapper.ShardAnnotation {

return &shard
}

func RecordDetectedLabelsQueryMetrics(_ context.Context, _ log.Logger, _ time.Time, _ time.Time, _ string, _ string, _ logql_stats.Result) {
// TODO(shantanu) log metrics here
}
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_labels").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/shards").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/volume").Methods("GET", "POST").Handler(frontendHandler)
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ func (h *Handler) Do(ctx context.Context, req queryrangebase.Request) (queryrang
return &queryrange.DetectedFieldsResponse{
Response: result,
}, nil
case *queryrange.DetectedLabelsRequest:
result, err := h.api.DetectedLabelsHandler(ctx, &concrete.DetectedLabelsRequest)
if err != nil {
return nil, err
}

return &queryrange.DetectedLabelsResponse{Response: result}, nil
default:
return nil, fmt.Errorf("unsupported query type %T", req)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.E
return nil
}

// DetectedLabelsHandler returns a response for detected labels
func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
resp, err := q.querier.DetectedLabels(ctx, req)

if err != nil {
return nil, err
}
return resp, nil
}

// WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call.
//
// The timeout is based on the per-tenant query timeout configuration.
Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,28 @@ func (q *MultiTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeReq
return merged, nil
}

func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
// TODO(shantanu)
tenantIDs, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

if len(tenantIDs) == 1 {
return q.Querier.DetectedLabels(ctx, req)
}

//resp := make([]*logproto.DetectedLabels, len(tenantIDs))

return &logproto.DetectedLabelsResponse{
DetectedLabels: []*logproto.DetectedLabel{
{Label: "cluster"},
{Label: "namespace"},
{Label: "instance"},
},
}, nil
}

// removeTenantSelector filters the given tenant IDs based on any tenant ID filter the in passed selector.
func removeTenantSelector(params logql.SelectSampleParams, tenantIDs []string) (map[string]struct{}, syntax.Expr, error) {
expr, err := params.Expr()
Expand Down
11 changes: 11 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Querier interface {
IndexShards(ctx context.Context, req *loghttp.RangeQuery, targetBytesPerShard uint64) (*logproto.ShardsResponse, error)
Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error)
}

type Limits querier_limits.Limits
Expand Down Expand Up @@ -910,3 +911,13 @@ func (q *SingleTenantQuerier) DetectedFields(_ context.Context, _ *logproto.Dete
},
}, nil
}

func (q *SingleTenantQuerier) DetectedLabels(_ context.Context, _ *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
return &logproto.DetectedLabelsResponse{
DetectedLabels: []*logproto.DetectedLabel{
{Label: "namespace"},
{Label: "cluster"},
{Label: "instance"},
},
}, nil
}
12 changes: 12 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,18 @@ func (q *querierMock) DetectedFields(ctx context.Context, req *logproto.Detected
return resp.(*logproto.DetectedFieldsResponse), err
}

func (q *querierMock) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
args := q.MethodCalled("DetectedFields", ctx, req)

resp := args.Get(0)
err := args.Error(1)
if resp == nil {
return nil, err
}

return resp.(*logproto.DetectedLabelsResponse), err
}

type engineMock struct {
util.ExtendedMock
}
Expand Down
123 changes: 121 additions & 2 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"errors"
"fmt"
io "io"
"io"
"net/http"
"net/url"
"regexp"
"sort"
strings "strings"
"strings"
"time"

"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
Expand Down Expand Up @@ -261,6 +261,80 @@ func (r *LabelRequest) Path() string {

func (*LabelRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return }

type DetectedLabelsRequest struct {
path string
logproto.DetectedLabelsRequest
}

// NewDetectedLabelsRequest creates a new request for detected labels
func NewDetectedLabelsRequest(start, end time.Time, query, path string) *DetectedLabelsRequest {
return &DetectedLabelsRequest{
DetectedLabelsRequest: logproto.DetectedLabelsRequest{
Start: &start,
End: &end,
Query: query,
},
path: path,
}
}

func (r *DetectedLabelsRequest) AsProto() *logproto.DetectedLabelsRequest {
return &r.DetectedLabelsRequest
}

func (r *DetectedLabelsRequest) GetEnd() time.Time {
return *r.End
}

func (r *DetectedLabelsRequest) GetEndTs() time.Time {
return *r.End
}

func (r *DetectedLabelsRequest) GetStart() time.Time {
return *r.Start
}

func (r *DetectedLabelsRequest) GetStartTs() time.Time {
return *r.Start
}

func (r *DetectedLabelsRequest) GetStep() int64 {
return 0
}

func (r *DetectedLabelsRequest) WithStartEnd(s, e time.Time) queryrangebase.Request {
clone := *r
clone.Start = &s
clone.End = &e
return &clone
}

// WithStartEndForCache implements resultscache.Request.
func (r *DetectedLabelsRequest) WithStartEndForCache(s time.Time, e time.Time) resultscache.Request {
return r.WithStartEnd(s, e).(resultscache.Request)
}

func (r *DetectedLabelsRequest) WithQuery(query string) queryrangebase.Request {
clone := *r
clone.Query = query
return &clone
}

func (r *DetectedLabelsRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(
otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()),
otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()),
)
}

func (r *DetectedLabelsRequest) Path() string {
return r.path
}

func (*DetectedLabelsRequest) GetCachingOptions() (res queryrangebase.CachingOptions) {
return
}

func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (queryrangebase.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
Expand Down Expand Up @@ -399,6 +473,16 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer
DetectedFieldsRequest: *req,
path: r.URL.Path,
}, nil
case DetectedLabelsOp:
req, err := loghttp.ParseDetectedLabelsQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return &DetectedLabelsRequest{
DetectedLabelsRequest: *req,
path: r.URL.Path,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusNotFound, fmt.Sprintf("unknown request path: %s", r.URL.Path))
}
Expand Down Expand Up @@ -602,6 +686,15 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest)
DetectedFieldsRequest: *req,
path: httpReq.URL.Path,
}, ctx, nil
case DetectedLabelsOp:
req, err := loghttp.ParseDetectedLabelsQuery(httpReq)
if err != nil {
return nil, ctx, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return &DetectedLabelsRequest{
DetectedLabelsRequest: *req,
path: httpReq.URL.Path,
}, ctx, err
default:
return nil, ctx, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path in HTTP gRPC decode: %s", r.Url))
}
Expand Down Expand Up @@ -878,6 +971,26 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
Header: header,
}

return req.WithContext(ctx), nil
case *DetectedLabelsRequest:
params := url.Values{
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"query": []string{request.GetQuery()},
}

u := &url.URL{
Path: "/loki/api/v1/detected_labels",
RawQuery: params.Encode(),
}
req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: header,
}

return req.WithContext(ctx), nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, fmt.Sprintf("invalid request format, got (%T)", r))
Expand Down Expand Up @@ -906,6 +1019,8 @@ func (c Codec) Path(r queryrangebase.Request) string {
return "/loki/api/v1/index/volume_range"
case *DetectedFieldsRequest:
return "/loki/api/v1/detected_fields"
case *DetectedLabelsRequest:
return "/loki/api/v1/detected_labels"
}

return "other"
Expand Down Expand Up @@ -1234,6 +1349,10 @@ func encodeResponseJSONTo(version loghttp.Version, res queryrangebase.Response,
if err := marshal.WriteDetectedFieldsResponseJSON(response.Response, w); err != nil {
return err
}
case *DetectedLabelsResponse:
if err := marshal.WriteDetectedLabelsResponseJSON(response.Response, w); err != nil {
return err
}
default:
return httpgrpc.Errorf(http.StatusInternalServerError, fmt.Sprintf("invalid response format, got (%T)", res))
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/queryrange/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,20 @@ func (m *DetectedFieldsResponse) WithHeaders(h []queryrangebase.PrometheusRespon
m.Headers = h
return m
}

// GetHeaders returns the HTTP headers in the response.
func (m *DetectedLabelsResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
if m != nil {
return convertPrometheusResponseHeadersToPointers(m.Headers)
}
return nil
}

func (m *DetectedLabelsResponse) SetHeader(name, value string) {
m.Headers = setHeader(m.Headers, name, value)
}

func (m *DetectedLabelsResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}
10 changes: 10 additions & 0 deletions pkg/querier/queryrange/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ func QueryResponseUnwrap(res *QueryResponse) (queryrangebase.Response, error) {
return concrete.TopkSketches, nil
case *QueryResponse_QuantileSketches:
return concrete.QuantileSketches, nil
case *QueryResponse_DetectedLabels:
return concrete.DetectedLabels, nil
default:
return nil, fmt.Errorf("unsupported QueryResponse response type, got (%T)", res.Response)
}
Expand Down Expand Up @@ -247,6 +249,8 @@ func QueryResponseWrap(res queryrangebase.Response) (*QueryResponse, error) {
p.Response = &QueryResponse_QuantileSketches{response}
case *ShardsResponse:
p.Response = &QueryResponse_ShardsResponse{response}
case *DetectedLabelsResponse:
p.Response = &QueryResponse_DetectedLabels{response}
default:
return nil, fmt.Errorf("invalid response format, got (%T)", res)
}
Expand Down Expand Up @@ -335,6 +339,10 @@ func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryra
return &LabelRequest{
LabelRequest: *concrete.Labels,
}, ctx, nil
case *QueryRequest_DetectedLabels:
return &DetectedLabelsRequest{
DetectedLabelsRequest: *concrete.DetectedLabels,
}, ctx, nil
default:
return nil, ctx, fmt.Errorf("unsupported request type while unwrapping, got (%T)", req.Request)
}
Expand All @@ -361,6 +369,8 @@ func (Codec) QueryRequestWrap(ctx context.Context, r queryrangebase.Request) (*Q
result.Request = &QueryRequest_Streams{Streams: req}
case *logproto.ShardsRequest:
result.Request = &QueryRequest_ShardsRequest{ShardsRequest: req}
case *DetectedLabelsRequest:
result.Request = &QueryRequest_DetectedLabels{DetectedLabels: &req.DetectedLabelsRequest}
default:
return nil, fmt.Errorf("unsupported request type while wrapping, got (%T)", r)
}
Expand Down
Loading

0 comments on commit 5190dda

Please sign in to comment.