Skip to content

Commit

Permalink
fix(querytags): propagate query tags to ingesters (#12267)
Browse files Browse the repository at this point in the history
Co-authored-by: Travis Patterson <[email protected]>
  • Loading branch information
ashwanthgoli and MasslessParticle authored Mar 20, 2024
1 parent 74b2d04 commit ef742c4
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 41 deletions.
4 changes: 4 additions & 0 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"time"

"github.com/grafana/loki/pkg/util/server"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
Expand Down Expand Up @@ -88,6 +90,7 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
Expand All @@ -96,6 +99,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
Expand Down
8 changes: 7 additions & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,10 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(IngesterQuerier, t.initIngesterQuerier)
mm.RegisterModule(IngesterQueryTagsInterceptors, t.initIngesterQueryTagsInterceptors, modules.UserInvisibleModule)
mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendMiddleware, modules.UserInvisibleModule)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
Expand Down Expand Up @@ -711,6 +712,11 @@ func (t *Loki) setupModuleManager() error {
deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler)
}

// Initialise query tags interceptors on targets running ingester
if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) {
deps[Server] = append(deps[Server], IngesterQueryTagsInterceptors)
}

// Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled.
if t.Cfg.BloomGateway.Enabled {
deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing)
Expand Down
89 changes: 49 additions & 40 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,46 +86,47 @@ const maxChunkAgeForTableManager = 12 * time.Hour

// The various modules that make up Loki.
const (
Ring string = "ring"
RuntimeConfig string = "runtime-config"
Overrides string = "overrides"
OverridesExporter string = "overrides-exporter"
TenantConfigs string = "tenant-configs"
Server string = "server"
InternalServer string = "internal-server"
Distributor string = "distributor"
Ingester string = "ingester"
Querier string = "querier"
CacheGenerationLoader string = "cache-generation-loader"
IngesterQuerier string = "ingester-querier"
QueryFrontend string = "query-frontend"
QueryFrontendTripperware string = "query-frontend-tripperware"
QueryLimiter string = "query-limiter"
QueryLimitsInterceptors string = "query-limits-interceptors"
QueryLimitsTripperware string = "query-limits-tripper"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
RuleEvaluator string = "rule-evaluator"
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
BloomGateway string = "bloom-gateway"
BloomGatewayRing string = "bloom-gateway-ring"
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
IndexGatewayInterceptors string = "index-gateway-interceptors"
QueryScheduler string = "query-scheduler"
QuerySchedulerRing string = "query-scheduler-ring"
BloomCompactor string = "bloom-compactor"
BloomCompactorRing string = "bloom-compactor-ring"
BloomStore string = "bloom-store"
All string = "all"
Read string = "read"
Write string = "write"
Backend string = "backend"
Analytics string = "analytics"
InitCodec string = "init-codec"
Ring string = "ring"
RuntimeConfig string = "runtime-config"
Overrides string = "overrides"
OverridesExporter string = "overrides-exporter"
TenantConfigs string = "tenant-configs"
Server string = "server"
InternalServer string = "internal-server"
Distributor string = "distributor"
Querier string = "querier"
CacheGenerationLoader string = "cache-generation-loader"
Ingester string = "ingester"
IngesterQuerier string = "ingester-querier"
IngesterQueryTagsInterceptors string = "ingester-query-tags-interceptors"
QueryFrontend string = "query-frontend"
QueryFrontendTripperware string = "query-frontend-tripperware"
QueryLimiter string = "query-limiter"
QueryLimitsInterceptors string = "query-limits-interceptors"
QueryLimitsTripperware string = "query-limits-tripper"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
RuleEvaluator string = "rule-evaluator"
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
BloomGateway string = "bloom-gateway"
BloomGatewayRing string = "bloom-gateway-ring"
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
IndexGatewayInterceptors string = "index-gateway-interceptors"
QueryScheduler string = "query-scheduler"
QuerySchedulerRing string = "query-scheduler-ring"
BloomCompactor string = "bloom-compactor"
BloomCompactorRing string = "bloom-compactor-ring"
BloomStore string = "bloom-store"
All string = "all"
Read string = "read"
Write string = "write"
Backend string = "backend"
Analytics string = "analytics"
InitCodec string = "init-codec"
)

const (
Expand Down Expand Up @@ -1578,6 +1579,14 @@ func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) {
return nil, nil
}

func (t *Loki) initIngesterQueryTagsInterceptors() (services.Service, error) {
_ = level.Debug(util_log.Logger).Log("msg", "initializing ingester query tags interceptors")
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.StreamServerQueryTagsInterceptor)
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.UnaryServerQueryTagsInterceptor)

return nil, nil
}

func (t *Loki) initAnalytics() (services.Service, error) {
if !t.Cfg.Analytics.Enabled {
return nil, nil
Expand Down
81 changes: 81 additions & 0 deletions pkg/util/server/grpc_query_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package server

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/grafana/loki/pkg/util/httpreq"
)

func getQueryTags(ctx context.Context) string {
v, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string)
return v
}

func injectIntoGRPCRequest(ctx context.Context) context.Context {
queryTags := getQueryTags(ctx)
if queryTags == "" {
return ctx
}

// inject into GRPC metadata
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(map[string]string{})
}
md = md.Copy()
md.Set(string(httpreq.QueryTagsHTTPHeader), queryTags)

return metadata.NewOutgoingContext(ctx, md)
}

func extractFromGRPCRequest(ctx context.Context) context.Context {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
// No metadata, just return as is
return ctx
}

headerValues := md.Get(string(httpreq.QueryTagsHTTPHeader))
if len(headerValues) == 0 {
return ctx
}

return context.WithValue(ctx, httpreq.QueryTagsHTTPHeader, headerValues[0])
}

// UnaryClientQueryTagsInterceptor propagates the query tags from the context to gRPC metadata, which eventually ends up as a HTTP2 header.
// For unary gRPC requests.
func UnaryClientQueryTagsInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(injectIntoGRPCRequest(ctx), method, req, reply, cc, opts...)
}

// StreamClientQueryTagsInterceptor propagates the query tags from the context to gRPC metadata, which eventually ends up as a HTTP2 header.
// For streaming gRPC requests.
func StreamClientQueryTagsInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(injectIntoGRPCRequest(ctx), desc, cc, method, opts...)
}

// UnaryServerQueryTagsInterceptor propagates the query tags from the gRPC metadata back to our context for unary gRPC requests.
func UnaryServerQueryTagsInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(extractFromGRPCRequest(ctx), req)
}

// StreamServerQueryTagsInterceptor propagates the query tags from the gRPC metadata back to our context for streaming gRPC requests.
func StreamServerQueryTagsInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, serverStream{
ctx: extractFromGRPCRequest(ss.Context()),
ServerStream: ss,
})
}

type serverStream struct {
ctx context.Context
grpc.ServerStream
}

func (ss serverStream) Context() context.Context {
return ss.ctx
}
81 changes: 81 additions & 0 deletions pkg/util/server/grpc_query_tags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package server

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/grafana/loki/pkg/util/httpreq"
)

func TestInjectQueryTagsIntoGRPCRequest(t *testing.T) {
for _, tt := range []struct {
name, tags string
md, expectMetadata metadata.MD
}{
{
name: "creates new metadata and sets query tags",
tags: "Source=logvolhist",
expectMetadata: metadata.New(map[string]string{"x-query-tags": "Source=logvolhist"}),
},
{
name: "sets query tags on existing metadata",
tags: "Source=logvolhist",
md: metadata.New(map[string]string{"x-foo": "bar"}),
expectMetadata: metadata.New(map[string]string{"x-foo": "bar", "x-query-tags": "Source=logvolhist"}),
},
{
name: "no query tags, leave metadata untouched",
md: metadata.New(map[string]string{"x-foo": "bar"}),
expectMetadata: metadata.New(map[string]string{"x-foo": "bar"}),
},
{
name: "no query tags",
expectMetadata: nil,
},
} {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
if tt.tags != "" {
ctx = httpreq.InjectQueryTags(context.Background(), tt.tags)
}

if tt.md != nil {
ctx = metadata.NewOutgoingContext(ctx, tt.md)
}

ctx = injectIntoGRPCRequest(ctx)
md, _ := metadata.FromOutgoingContext(ctx)
require.EqualValues(t, tt.expectMetadata, md)
})
}
}

func TestExtractQueryTagsFromGRPCRequest(t *testing.T) {
for _, tt := range []struct {
name string
md metadata.MD
expectedResp string
}{
{
name: "extracts query tags from metadata",
md: metadata.New(map[string]string{"x-query-tags": "Source=logvolhist"}),
expectedResp: "Source=logvolhist",
},
{
name: "non-nil metadata without query tags",
md: metadata.New(map[string]string{"x-foo": "bar"}),
},
{
name: "nil metadata",
},
} {
t.Run(tt.name, func(t *testing.T) {
ctx := metadata.NewIncomingContext(context.Background(), tt.md)
require.Equal(t, tt.expectedResp, getQueryTags(extractFromGRPCRequest(ctx)))
})
}

}

0 comments on commit ef742c4

Please sign in to comment.