diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index fa14294911ba4..3d0c48e8d0a3f 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -5,6 +5,8 @@ import ( "io" "time" + "github.com/grafana/loki/pkg/util/server" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/middleware" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" @@ -88,6 +90,7 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) { func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { var unaryInterceptors []grpc.UnaryClientInterceptor unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...) + unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor) unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) if !cfg.Internal { unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor) @@ -96,6 +99,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC var streamInterceptors []grpc.StreamClientInterceptor streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) + streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor) streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())) if !cfg.Internal { streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 96814966b01a8..90bb134a56df3 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -604,9 +604,10 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule) - mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(Querier, t.initQuerier) + mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(IngesterQuerier, t.initIngesterQuerier) + mm.RegisterModule(IngesterQueryTagsInterceptors, t.initIngesterQueryTagsInterceptors, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendMiddleware, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule) @@ -711,6 +712,11 @@ func (t *Loki) setupModuleManager() error { deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler) } + // Initialise query tags interceptors on targets running ingester + if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) { + deps[Server] = append(deps[Server], IngesterQueryTagsInterceptors) + } + // Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled. if t.Cfg.BloomGateway.Enabled { deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 7dec8946c71dd..00dbda9b5fefd 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -86,46 +86,47 @@ const maxChunkAgeForTableManager = 12 * time.Hour // The various modules that make up Loki. const ( - Ring string = "ring" - RuntimeConfig string = "runtime-config" - Overrides string = "overrides" - OverridesExporter string = "overrides-exporter" - TenantConfigs string = "tenant-configs" - Server string = "server" - InternalServer string = "internal-server" - Distributor string = "distributor" - Ingester string = "ingester" - Querier string = "querier" - CacheGenerationLoader string = "cache-generation-loader" - IngesterQuerier string = "ingester-querier" - QueryFrontend string = "query-frontend" - QueryFrontendTripperware string = "query-frontend-tripperware" - QueryLimiter string = "query-limiter" - QueryLimitsInterceptors string = "query-limits-interceptors" - QueryLimitsTripperware string = "query-limits-tripper" - RulerStorage string = "ruler-storage" - Ruler string = "ruler" - RuleEvaluator string = "rule-evaluator" - Store string = "store" - TableManager string = "table-manager" - MemberlistKV string = "memberlist-kv" - Compactor string = "compactor" - BloomGateway string = "bloom-gateway" - BloomGatewayRing string = "bloom-gateway-ring" - IndexGateway string = "index-gateway" - IndexGatewayRing string = "index-gateway-ring" - IndexGatewayInterceptors string = "index-gateway-interceptors" - QueryScheduler string = "query-scheduler" - QuerySchedulerRing string = "query-scheduler-ring" - BloomCompactor string = "bloom-compactor" - BloomCompactorRing string = "bloom-compactor-ring" - BloomStore string = "bloom-store" - All string = "all" - Read string = "read" - Write string = "write" - Backend string = "backend" - Analytics string = "analytics" - InitCodec string = "init-codec" + Ring string = "ring" + RuntimeConfig string = "runtime-config" + Overrides string = "overrides" + OverridesExporter string = "overrides-exporter" + TenantConfigs string = "tenant-configs" + Server string = "server" + InternalServer string = "internal-server" + Distributor string = "distributor" + Querier string = "querier" + CacheGenerationLoader string = "cache-generation-loader" + Ingester string = "ingester" + IngesterQuerier string = "ingester-querier" + IngesterQueryTagsInterceptors string = "ingester-query-tags-interceptors" + QueryFrontend string = "query-frontend" + QueryFrontendTripperware string = "query-frontend-tripperware" + QueryLimiter string = "query-limiter" + QueryLimitsInterceptors string = "query-limits-interceptors" + QueryLimitsTripperware string = "query-limits-tripper" + RulerStorage string = "ruler-storage" + Ruler string = "ruler" + RuleEvaluator string = "rule-evaluator" + Store string = "store" + TableManager string = "table-manager" + MemberlistKV string = "memberlist-kv" + Compactor string = "compactor" + BloomGateway string = "bloom-gateway" + BloomGatewayRing string = "bloom-gateway-ring" + IndexGateway string = "index-gateway" + IndexGatewayRing string = "index-gateway-ring" + IndexGatewayInterceptors string = "index-gateway-interceptors" + QueryScheduler string = "query-scheduler" + QuerySchedulerRing string = "query-scheduler-ring" + BloomCompactor string = "bloom-compactor" + BloomCompactorRing string = "bloom-compactor-ring" + BloomStore string = "bloom-store" + All string = "all" + Read string = "read" + Write string = "write" + Backend string = "backend" + Analytics string = "analytics" + InitCodec string = "init-codec" ) const ( @@ -1578,6 +1579,14 @@ func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) { return nil, nil } +func (t *Loki) initIngesterQueryTagsInterceptors() (services.Service, error) { + _ = level.Debug(util_log.Logger).Log("msg", "initializing ingester query tags interceptors") + t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.StreamServerQueryTagsInterceptor) + t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.UnaryServerQueryTagsInterceptor) + + return nil, nil +} + func (t *Loki) initAnalytics() (services.Service, error) { if !t.Cfg.Analytics.Enabled { return nil, nil diff --git a/pkg/util/server/grpc_query_tags.go b/pkg/util/server/grpc_query_tags.go new file mode 100644 index 0000000000000..d5d6e58d2c938 --- /dev/null +++ b/pkg/util/server/grpc_query_tags.go @@ -0,0 +1,81 @@ +package server + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/grafana/loki/pkg/util/httpreq" +) + +func getQueryTags(ctx context.Context) string { + v, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) + return v +} + +func injectIntoGRPCRequest(ctx context.Context) context.Context { + queryTags := getQueryTags(ctx) + if queryTags == "" { + return ctx + } + + // inject into GRPC metadata + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(map[string]string{}) + } + md = md.Copy() + md.Set(string(httpreq.QueryTagsHTTPHeader), queryTags) + + return metadata.NewOutgoingContext(ctx, md) +} + +func extractFromGRPCRequest(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + // No metadata, just return as is + return ctx + } + + headerValues := md.Get(string(httpreq.QueryTagsHTTPHeader)) + if len(headerValues) == 0 { + return ctx + } + + return context.WithValue(ctx, httpreq.QueryTagsHTTPHeader, headerValues[0]) +} + +// UnaryClientQueryTagsInterceptor propagates the query tags from the context to gRPC metadata, which eventually ends up as a HTTP2 header. +// For unary gRPC requests. +func UnaryClientQueryTagsInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(injectIntoGRPCRequest(ctx), method, req, reply, cc, opts...) +} + +// StreamClientQueryTagsInterceptor propagates the query tags from the context to gRPC metadata, which eventually ends up as a HTTP2 header. +// For streaming gRPC requests. +func StreamClientQueryTagsInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(injectIntoGRPCRequest(ctx), desc, cc, method, opts...) +} + +// UnaryServerQueryTagsInterceptor propagates the query tags from the gRPC metadata back to our context for unary gRPC requests. +func UnaryServerQueryTagsInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return handler(extractFromGRPCRequest(ctx), req) +} + +// StreamServerQueryTagsInterceptor propagates the query tags from the gRPC metadata back to our context for streaming gRPC requests. +func StreamServerQueryTagsInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, serverStream{ + ctx: extractFromGRPCRequest(ss.Context()), + ServerStream: ss, + }) +} + +type serverStream struct { + ctx context.Context + grpc.ServerStream +} + +func (ss serverStream) Context() context.Context { + return ss.ctx +} diff --git a/pkg/util/server/grpc_query_tags_test.go b/pkg/util/server/grpc_query_tags_test.go new file mode 100644 index 0000000000000..ae718178caa00 --- /dev/null +++ b/pkg/util/server/grpc_query_tags_test.go @@ -0,0 +1,81 @@ +package server + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + + "github.com/grafana/loki/pkg/util/httpreq" +) + +func TestInjectQueryTagsIntoGRPCRequest(t *testing.T) { + for _, tt := range []struct { + name, tags string + md, expectMetadata metadata.MD + }{ + { + name: "creates new metadata and sets query tags", + tags: "Source=logvolhist", + expectMetadata: metadata.New(map[string]string{"x-query-tags": "Source=logvolhist"}), + }, + { + name: "sets query tags on existing metadata", + tags: "Source=logvolhist", + md: metadata.New(map[string]string{"x-foo": "bar"}), + expectMetadata: metadata.New(map[string]string{"x-foo": "bar", "x-query-tags": "Source=logvolhist"}), + }, + { + name: "no query tags, leave metadata untouched", + md: metadata.New(map[string]string{"x-foo": "bar"}), + expectMetadata: metadata.New(map[string]string{"x-foo": "bar"}), + }, + { + name: "no query tags", + expectMetadata: nil, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + if tt.tags != "" { + ctx = httpreq.InjectQueryTags(context.Background(), tt.tags) + } + + if tt.md != nil { + ctx = metadata.NewOutgoingContext(ctx, tt.md) + } + + ctx = injectIntoGRPCRequest(ctx) + md, _ := metadata.FromOutgoingContext(ctx) + require.EqualValues(t, tt.expectMetadata, md) + }) + } +} + +func TestExtractQueryTagsFromGRPCRequest(t *testing.T) { + for _, tt := range []struct { + name string + md metadata.MD + expectedResp string + }{ + { + name: "extracts query tags from metadata", + md: metadata.New(map[string]string{"x-query-tags": "Source=logvolhist"}), + expectedResp: "Source=logvolhist", + }, + { + name: "non-nil metadata without query tags", + md: metadata.New(map[string]string{"x-foo": "bar"}), + }, + { + name: "nil metadata", + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := metadata.NewIncomingContext(context.Background(), tt.md) + require.Equal(t, tt.expectedResp, getQueryTags(extractFromGRPCRequest(ctx))) + }) + } + +}