Skip to content

Commit

Permalink
Turn frontend Tripperware into a Middleware. (#10688)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Currently, a request to Loki's frontend API goes through these
conversions:
```
        http.Request 
            ↓ limitedRoundTripper
        queryrangebase.Request
            ↓ queryrangebase.Middlware
        … 
            ↓
        queryrangebase.Request
            ↓ limitedRoundTripper  
        http.Request
            ↓ grpcRoundTripperAdapter
        httpgrpc
            ↓ grpcRoundTripperAdapter
        http.Response
            ↓ limitedRoundTripper
        queryrangebase.Response
            ↓ limitedRoundtripper
        http.Response
```

Since `httgrpc` and `queryrangebase.Request` are Protobufs there's no
good reason to encode and decode them to HTTP responses/requests.
Furthermore, the encoding to HTTP makes it harder for us to encode query
plans.

Thus the conversions are changed to the following:
```
        http.Request 
            ↓
        queryrangebase.Request
            ↓ queryrangebase.Middlware
        … 
            ↓
        queryrangebase.Request
            ↓
        httpgrpc
            ↓
        queryrangebase.Response
            ↓
        http.Response
```

In order to achieve this the `http.RoundTripper` is pushed to the
outside. Only the serialization layer from `http.Request` to
`queryrangebase.Request` and `http.Response` to
`queryrangebase.Response` will be an `http.RoundTripper`. Everything
else is either a `queryrangebase.Handler` or
`queryrangebase.Middleware`.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
jeschkies authored Oct 23, 2023
1 parent 6069df8 commit 6948c4a
Show file tree
Hide file tree
Showing 28 changed files with 844 additions and 1,229 deletions.
19 changes: 5 additions & 14 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ type Loki struct {
runtimeConfig *runtimeconfig.Manager
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware queryrangebase.Tripperware
QueryFrontEndMiddleware queryrangebase.Middleware
queryScheduler *scheduler.Scheduler
querySchedulerRingManager *lokiring.RingManager
usageReport *analytics.Reporter
Expand Down Expand Up @@ -590,7 +590,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(IngesterQuerier, t.initIngesterQuerier)
mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendTripperware, modules.UserInvisibleModule)
mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendMiddleware, modules.UserInvisibleModule)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
mm.RegisterModule(Ruler, t.initRuler)
Expand Down Expand Up @@ -653,26 +653,17 @@ func (t *Loki) setupModuleManager() error {
level.Debug(util_log.Logger).Log("msg", "per-query request limits support enabled")
mm.RegisterModule(QueryLimiter, t.initQueryLimiter, modules.UserInvisibleModule)
mm.RegisterModule(QueryLimitsInterceptors, t.initQueryLimitsInterceptors, modules.UserInvisibleModule)
mm.RegisterModule(QueryLimitsTripperware, t.initQueryLimitsTripperware, modules.UserInvisibleModule)

// This module is defunct but the target remains for backwards compatibility.
mm.RegisterModule(QueryLimitsTripperware, func() (services.Service, error) { return nil, nil }, modules.UserInvisibleModule)

// Ensure query limiter embeds overrides after they've been
// created.
deps[QueryLimiter] = []string{Overrides}
deps[QueryLimitsInterceptors] = []string{}

// Ensure query limits tripperware embeds the query frontend
// tripperware after it's been created. Any additional
// middleware/tripperware you want to add to the querier or
// frontend must happen inject a dependence on the query limits
// tripperware.
deps[QueryLimitsTripperware] = []string{QueryFrontendTripperware}

deps[Querier] = append(deps[Querier], QueryLimiter)

// The frontend receives a tripperware. Make sure it uses the
// wrapped one.
deps[QueryFrontend] = append(deps[QueryFrontend], QueryLimitsTripperware)

// query frontend tripperware uses t.Overrides. Make sure it
// uses the one wrapped by query limiter.
deps[QueryFrontendTripperware] = append(deps[QueryFrontendTripperware], QueryLimiter)
Expand Down
23 changes: 8 additions & 15 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,10 +781,10 @@ type disabledShuffleShardingLimits struct{}

func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) int { return 0 }

func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

tripperware, stopper, err := queryrange.NewTripperware(
middleware, stopper, err := queryrange.NewMiddleware(
t.Cfg.QueryRange,
t.Cfg.Querier.Engine,
util_log.Logger,
Expand All @@ -797,7 +797,7 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
return
}
t.stopper = stopper
t.QueryFrontEndTripperware = tripperware
t.QueryFrontEndMiddleware = middleware

return services.NewIdleService(nil, nil), nil
}
Expand Down Expand Up @@ -864,13 +864,15 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
FrontendV2: t.Cfg.Frontend.FrontendV2,
DownstreamURL: t.Cfg.Frontend.DownstreamURL,
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
frontendTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
scheduler.SafeReadRing(t.Cfg.QueryScheduler, t.querySchedulerRingManager),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
prometheus.DefaultRegisterer)
prometheus.DefaultRegisterer,
queryrange.DefaultCodec,
)
if err != nil {
return nil, err
}
Expand All @@ -887,7 +889,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "no query frontend configured")
}

roundTripper = t.QueryFrontEndTripperware(roundTripper)
roundTripper := queryrange.NewSerializeRoundTripper(t.QueryFrontEndMiddleware.Wrap(frontendTripper), queryrange.DefaultCodec)

frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
if t.Cfg.Frontend.CompressResponses {
Expand Down Expand Up @@ -1477,15 +1479,6 @@ func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) {
return nil, nil
}

func (t *Loki) initQueryLimitsTripperware() (services.Service, error) {
_ = level.Debug(util_log.Logger).Log("msg", "initializing query limits tripperware")
t.QueryFrontEndTripperware = querylimits.WrapTripperware(
t.QueryFrontEndTripperware,
)

return nil, nil
}

func (t *Loki) initAnalytics() (services.Service, error) {
if !t.Cfg.Analytics.Enabled {
return nil, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/lokifrontend/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1"
v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/util"
)

Expand All @@ -38,7 +39,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer, codec transport.Codec) (queryrangebase.Handler, *v1.Frontend, *v2.Frontend, error) {
switch {
case cfg.DownstreamURL != "":
// If the user has specified a downstream Prometheus, then we should use that.
Expand All @@ -59,15 +60,15 @@ func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limi
cfg.FrontendV2.Port = grpcListenPort
}

fr, err := v2.NewFrontend(cfg.FrontendV2, ring, log, reg)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err
fr, err := v2.NewFrontend(cfg.FrontendV2, ring, log, reg, codec)
return fr, nil, fr, err

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
if err != nil {
return nil, nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
return transport.AdaptGrpcRoundTripperToHandler(fr, codec), fr, nil, nil
}
}
36 changes: 32 additions & 4 deletions pkg/lokifrontend/frontend/downstream_roundtripper.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package frontend

import (
"context"
"fmt"
"net/http"
"net/url"
"path"

"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)

// RoundTripper that forwards requests to downstream URL.
type downstreamRoundTripper struct {
downstreamURL *url.URL
transport http.RoundTripper
codec queryrangebase.Codec
}

func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (queryrangebase.Handler, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
Expand All @@ -23,8 +29,19 @@ func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper
return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
}

func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(r.Context())
func (d downstreamRoundTripper) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)

var r *http.Request

r, err := d.codec.EncodeRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("connot convert request ot HTTP request: %w", err)
}
if err := user.InjectOrgIDIntoHTTPRequest(ctx, r); err != nil {
return nil, err
}

if tracer != nil && span != nil {
carrier := opentracing.HTTPHeadersCarrier(r.Header)
err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
Expand All @@ -37,5 +54,16 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
r.Host = ""
return d.transport.RoundTrip(r)

httpResp, err := d.transport.RoundTrip(r)
if err != nil {
return nil, err
}

resp, err := d.codec.DecodeResponse(ctx, httpResp, req)
if err != nil {
return nil, fmt.Errorf("cannot convert HTTP response to response: %w", err)
}

return resp, nil
}
34 changes: 34 additions & 0 deletions pkg/lokifrontend/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
querier_stats "github.com/grafana/loki/pkg/querier/stats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
Expand Down Expand Up @@ -252,3 +254,35 @@ func statsValue(name string, d time.Duration) string {
durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64)
return name + ";dur=" + durationInMs
}

func AdaptGrpcRoundTripperToHandler(r GrpcRoundTripper, codec Codec) queryrangebase.Handler {
return &grpcRoundTripperToHandlerAdapter{roundTripper: r, codec: codec}
}

// This adapter wraps GrpcRoundTripper and converts it into a queryrangebase.Handler
type grpcRoundTripperToHandlerAdapter struct {
roundTripper GrpcRoundTripper
codec Codec
}

func (a *grpcRoundTripperToHandlerAdapter) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
httpReq, err := a.codec.EncodeRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("cannot convert request to HTTP request: %w", err)
}
if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
return nil, err
}

grpcReq, err := server.HTTPRequest(httpReq)
if err != nil {
return nil, fmt.Errorf("cannot convert HTTP request to gRPC request: %w", err)
}

grpcResp, err := a.roundTripper.RoundTripGRPC(ctx, grpcReq)
if err != nil {
return nil, err
}

return a.codec.DecodeHTTPGrpcResponse(grpcResp, req)
}
48 changes: 5 additions & 43 deletions pkg/lokifrontend/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,19 @@
package transport

import (
"bytes"
"context"
"io"
"net/http"

"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)

// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
return &grpcRoundTripperAdapter{roundTripper: r}
}

// This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper
type grpcRoundTripperAdapter struct {
roundTripper GrpcRoundTripper
}

type buffer struct {
buff []byte
io.ReadCloser
}

func (b *buffer) Bytes() []byte {
return b.buff
}

func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
req, err := server.HTTPRequest(r)
if err != nil {
return nil, err
}

resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
if err != nil {
return nil, err
}

httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))},
Header: http.Header{},
ContentLength: int64(len(resp.Body)),
}
for _, h := range resp.Headers {
httpResp.Header[h.Key] = h.Values
}
return httpResp, nil
type Codec interface {
queryrangebase.Codec
DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase.Request) (queryrangebase.Response, error)
}
11 changes: 6 additions & 5 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
"github.com/grafana/loki/pkg/querier/queryrange"
Expand All @@ -44,7 +45,7 @@ const (

func TestFrontend(t *testing.T) {
handler := queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}}, nil
return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}, Version: uint32(loghttp.VersionV1)}, nil
})
test := func(addr string, _ *Frontend) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", addr, labelQuery), nil)
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestFrontendPropagateTrace(t *testing.T) {
traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID())
observedTraceID <- traceID

return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}}, nil
return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}, Version: uint32(loghttp.VersionV1)}, nil
})

test := func(addr string, _ *Frontend) {
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestFrontendCancel(t *testing.T) {

func TestFrontendMetricsCleanup(t *testing.T) {
handler := queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}}, nil
return &queryrange.LokiLabelNamesResponse{Data: []string{"Hello", "world"}, Version: uint32(loghttp.VersionV1)}, nil
})

for _, matchMaxConcurrency := range []bool{false, true} {
Expand Down Expand Up @@ -260,12 +261,12 @@ func testFrontend(t *testing.T, config Config, handler queryrangebase.Handler, t
handlerCfg := transport.HandlerConfig{}
flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
rt := queryrange.NewSerializeHTTPHandler(transport.AdaptGrpcRoundTripperToHandler(v1, queryrange.DefaultCodec), queryrange.DefaultCodec)
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))
).Wrap(rt))

httpServer := http.Server{
Handler: r,
Expand Down
Loading

0 comments on commit 6948c4a

Please sign in to comment.