diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a548d05a9251b..c0ae38cd4c0d8 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -516,6 +516,7 @@ func (t *Loki) initQuerier() (services.Service, error) { internalMiddlewares := []queryrangebase.Middleware{ serverutil.RecoveryMiddleware, queryrange.Instrument{Metrics: t.Metrics}, + queryrange.Tracer{}, } if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled { internalMiddlewares = append( diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 8571eef4e4071..08c3ca2868b78 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -572,6 +572,22 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht } } + // Add org id + orgID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + header.Set(user.OrgIDHeaderName, orgID) + + // Propagate trace context in request. + tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) + if tracer != nil && span != nil { + carrier := opentracing.HTTPHeadersCarrier(header) + if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { + return nil, err + } + } + switch request := r.(type) { case *LokiRequest: params := url.Values{ @@ -725,6 +741,7 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht } } +// nolint:goconst func (c Codec) Path(r queryrangebase.Request) string { switch request := r.(type) { case *LokiRequest: diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 880143976439e..2869bb4b38652 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/gorilla/mux" + "github.com/grafana/dskit/user" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" @@ -41,6 +42,9 @@ var ( ) func Test_codec_EncodeDecodeRequest(t *testing.T) { + + ctx := user.InjectOrgID(context.Background(), "1") + tests := []struct { name string reqBuilder func() (*http.Request, error) @@ -108,7 +112,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) { }, NewLabelRequest(start, end, `{foo="bar"}`, "test", "/label/test/values"), false}, {"index_stats", func() (*http.Request, error) { - return DefaultCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{ + return DefaultCodec.EncodeRequest(ctx, &logproto.IndexStatsRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, @@ -119,7 +123,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) { Matchers: `{job="foo"}`, }, false}, {"volume", func() (*http.Request, error) { - return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, @@ -138,7 +142,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) { AggregateBy: "labels", }, false}, {"volume_default_limit", func() (*http.Request, error) { - return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, @@ -152,7 +156,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) { AggregateBy: "series", }, false}, {"volume_range", func() (*http.Request, error) { - return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, @@ -170,7 +174,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) { AggregateBy: "series", }, false}, {"volume_range_default_limit", func() (*http.Request, error) { - return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, @@ -585,7 +589,8 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) { require.NoError(t, err) // queryrange.Response -> JSON - httpResp, err := codec.EncodeResponse(context.TODO(), httpReq, resp) + ctx := user.InjectOrgID(context.Background(), "1") + httpResp, err := codec.EncodeResponse(ctx, httpReq, resp) require.NoError(t, err) body, _ := io.ReadAll(httpResp.Body) @@ -596,11 +601,11 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) { func Test_codec_EncodeRequest(t *testing.T) { // we only accept LokiRequest. - got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{}) + ctx := user.InjectOrgID(context.Background(), "1") + got, err := DefaultCodec.EncodeRequest(ctx, &queryrangebase.PrometheusRequest{}) require.Error(t, err) require.Nil(t, got) - ctx := context.Background() toEncode := &LokiRequest{ Query: `{foo="bar"}`, Limit: 200, @@ -637,11 +642,11 @@ func Test_codec_EncodeRequest(t *testing.T) { } func Test_codec_series_EncodeRequest(t *testing.T) { - got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{}) + ctx := user.InjectOrgID(context.Background(), "1") + got, err := DefaultCodec.EncodeRequest(ctx, &queryrangebase.PrometheusRequest{}) require.Error(t, err) require.Nil(t, got) - ctx := context.Background() toEncode := &LokiSeriesRequest{ Match: []string{`{foo="bar"}`}, Path: "/series", @@ -666,7 +671,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) { } func Test_codec_labels_EncodeRequest(t *testing.T) { - ctx := context.Background() + ctx := user.InjectOrgID(context.Background(), "1") toEncode := NewLabelRequest(start, end, "", "", "/loki/api/v1/labels") got, err := DefaultCodec.EncodeRequest(ctx, toEncode) require.NoError(t, err) @@ -703,7 +708,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) { } func Test_codec_labels_DecodeRequest(t *testing.T) { - ctx := context.Background() + ctx := user.InjectOrgID(context.Background(), "1") u, err := url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`) require.NoError(t, err) @@ -732,7 +737,8 @@ func Test_codec_index_stats_EncodeRequest(t *testing.T) { Through: through, Matchers: `{job="foo"}`, } - got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode) + ctx := user.InjectOrgID(context.Background(), "1") + got, err := DefaultCodec.EncodeRequest(ctx, toEncode) require.Nil(t, err) require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start")) require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end")) @@ -749,7 +755,8 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) { Step: 30 * 1e6, TargetLabels: []string{"foo", "bar"}, } - got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode) + ctx := user.InjectOrgID(context.Background(), "1") + got, err := DefaultCodec.EncodeRequest(ctx, toEncode) require.Nil(t, err) require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start")) require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end")) @@ -760,6 +767,7 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) { } func Test_codec_seriesVolume_DecodeRequest(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "1") t.Run("instant queries set a step of 0", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/index/volume"+ @@ -767,7 +775,7 @@ func Test_codec_seriesVolume_DecodeRequest(t *testing.T) { "&end=1"+ "&step=42"+ "&query=%7Bfoo%3D%22bar%22%7D", nil) - got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil) + got, err := DefaultCodec.DecodeRequest(ctx, req, nil) require.NoError(t, err) require.Equal(t, int64(0), got.(*logproto.VolumeRequest).Step) @@ -779,7 +787,7 @@ func Test_codec_seriesVolume_DecodeRequest(t *testing.T) { "&end=1"+ "&step=42"+ "&query=%7Bfoo%3D%22bar%22%7D", nil) - got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil) + got, err := DefaultCodec.DecodeRequest(ctx, req, nil) require.NoError(t, err) require.Equal(t, (42 * time.Second).Milliseconds(), got.(*logproto.VolumeRequest).Step) @@ -790,7 +798,7 @@ func Test_codec_seriesVolume_DecodeRequest(t *testing.T) { "?start=0"+ "&end=1"+ "&query=%7Bfoo%3D%22bar%22%7D", nil) - got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil) + got, err := DefaultCodec.DecodeRequest(ctx, req, nil) require.NoError(t, err) require.Equal(t, time.Second.Milliseconds(), got.(*logproto.VolumeRequest).Step) @@ -925,7 +933,8 @@ func Test_codec_EncodeResponse(t *testing.T) { URL: u, Header: h, } - got, err := DefaultCodec.EncodeResponse(context.TODO(), req, tt.res) + ctx := user.InjectOrgID(context.Background(), "1") + got, err := DefaultCodec.EncodeResponse(ctx, req, tt.res) if (err != nil) != tt.wantErr { t.Errorf("codec.EncodeResponse() error = %v, wantErr %v", err, tt.wantErr) return @@ -1424,6 +1433,7 @@ func (badResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader { ret func (b badResponse) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response { return b } +func (badResponse) SetHeader(string, string) {} type badReader struct{} diff --git a/pkg/querier/queryrange/instrument.go b/pkg/querier/queryrange/instrument.go index 8c32fad4ca304..497cfb2dd8a1a 100644 --- a/pkg/querier/queryrange/instrument.go +++ b/pkg/querier/queryrange/instrument.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/instrument" "github.com/grafana/dskit/middleware" + "github.com/opentracing/opentracing-go" "github.com/grafana/dskit/server" @@ -52,3 +53,18 @@ func (i Instrument) observe(ctx context.Context, route string, err error, durati } instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(method, route, respStatus, "false"), duration.Seconds()) } + +type Tracer struct{} + +var _ queryrangebase.Middleware = Tracer{} + +// Wrap implements the queryrangebase.Middleware +func (t Tracer) Wrap(next queryrangebase.Handler) queryrangebase.Handler { + return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + route := DefaultCodec.Path(r) + route = middleware.MakeLabelValue(route) + span, ctx := opentracing.StartSpanFromContext(ctx, route) + defer span.Finish() + return next.Do(ctx, r) + }) +}