From e625d7f1caa4c9373215a46cb018ecdfc6563c65 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 24 Jan 2024 17:06:01 +0530 Subject: [PATCH] otel: add support for per tenant configuration for mapping otlp data to loki format (#11143) **What this PR does / why we need it**: In OTEL, we pick select Resource Attributes to identify the streams and store all the other attributes as Structured Metadata, as explained here. The problem, however, is that the list of Resource Attributes that are picked as Stream labels is hardcoded, and there is no way to drop unwanted data. This PR adds support for configuring how data is mapped from OTEL to Loki format per tenant. It also adds support for dropping unwanted data. We decided to make the config look similar to Prometheus's relabeling config to make it familiar. **Special notes for your reviewer**: Opening a draft PR to get some initial feedback. I will add documentation in a separate PR. **Checklist** - [x] Tests updated - [x] `CHANGELOG.md` updated --- CHANGELOG.md | 1 + .../promtail/targets/lokipush/pushtarget.go | 2 +- docs/sources/configure/_index.md | 11 + pkg/distributor/http.go | 2 +- pkg/distributor/limits.go | 2 + pkg/loghttp/push/otlp.go | 142 ++++--- pkg/loghttp/push/otlp_config.go | 166 ++++++++ pkg/loghttp/push/otlp_config_test.go | 355 ++++++++++++++++++ pkg/loghttp/push/otlp_test.go | 160 +++++++- pkg/loghttp/push/push.go | 12 +- pkg/loghttp/push/push_test.go | 2 +- pkg/validation/limits.go | 12 +- pkg/validation/limits_test.go | 19 + 13 files changed, 813 insertions(+), 73 deletions(-) create mode 100644 pkg/loghttp/push/otlp_config.go create mode 100644 pkg/loghttp/push/otlp_config_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 04b692efcb00e..8c22d9126f9c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ * [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. 
* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results. * [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results. +* [11143](https://github.com/grafana/loki/pull/11143) **sandeepsukhani** otel: Add support for per tenant configuration for mapping otlp data to loki format * [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs. ##### Fixes diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index 1e8affb389a17..c981de0de3dda 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -111,7 +111,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := tenant.TenantID(r.Context()) - req, err := push.ParseRequest(logger, userID, r, nil, push.ParseLokiRequest) + req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index edb394733a3c5..2a2c51544f013 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3140,6 +3140,17 @@ shard_streams: # Maximum number of structured metadata entries per log line. 
# CLI flag: -limits.max-structured-metadata-entries-count [max_structured_metadata_entries_count: <int> | default = 128] + +# OTLP log ingestion configurations +otlp_config: + resource_attributes: + [ignore_defaults: <boolean>] + + [attributes: <list of AttributesConfigs>] + + [scope_attributes: <list of AttributesConfigs>] + + [log_attributes: <list of AttributesConfigs>] ``` ### frontend_worker diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 67db8e5c5ba7b..ce242355e077b 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe http.Error(w, err.Error(), http.StatusBadRequest) return } - req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, pushRequestParser) + req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser) if err != nil { if d.tenantConfigs.LogPushRequest(tenantID) { level.Debug(logger).Log( diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index add9d17708dbd..6db6995662dd2 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/loki/pkg/compactor/retention" "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/loghttp/push" ) // Limits is an interface for distributor limits/related configs @@ -29,4 +30,5 @@ type Limits interface { AllowStructuredMetadata(userID string) bool MaxStructuredMetadataSize(userID string) int MaxStructuredMetadataCount(userID string) int + OTLPConfig(userID string) push.OTLPConfig } diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 737e9b78ae72a..f4f937b93dc33 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -25,28 +25,9 @@ import ( const ( pbContentType = "application/x-protobuf" gzipContentEncoding = "gzip" + attrServiceName = "service.name" ) -var blessedAttributes = []string{ - "service.name", - "service.namespace", - "service.instance.id", - "deployment.environment", - 
"cloud.region", - "cloud.availability_zone", - "k8s.cluster.name", - "k8s.namespace.name", - "k8s.pod.name", - "k8s.container.name", - "container.name", - "k8s.replicaset.name", - "k8s.deployment.name", - "k8s.statefulset.name", - "k8s.daemonset.name", - "k8s.cronjob.name", - "k8s.job.name", -} - var blessedAttributesNormalized = make([]string, len(blessedAttributes)) func init() { @@ -62,14 +43,14 @@ func newPushStats() *Stats { } } -func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) { stats := newPushStats() otlpLogs, err := extractLogs(r, stats) if err != nil { return nil, nil, err } - req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, stats) + req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats) return req, stats, nil } @@ -120,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -131,29 +112,31 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for i := 0; i < rls.Len(); i++ { sls := rls.At(i).ScopeLogs() res := rls.At(i).Resource() + resAttrs := res.Attributes() - flattenedResourceAttributes := labels.NewBuilder(logproto.FromLabelAdaptersToLabels(attributesToLabels(res.Attributes(), ""))) - // service.name is a required Resource Attribute. If it is not present, we will set it to "unknown_service". 
- if flattenedResourceAttributes.Get("service_name") == "" { - flattenedResourceAttributes = flattenedResourceAttributes.Set("service_name", "unknown_service") + if v, _ := resAttrs.Get(attrServiceName); v.AsString() == "" { + resAttrs.PutStr(attrServiceName, "unknown_service") } + resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) + streamLabels := make(model.LabelSet, len(blessedAttributesNormalized)) - if dac := res.DroppedAttributesCount(); dac != 0 { - flattenedResourceAttributes = flattenedResourceAttributes.Set("resource_dropped_attributes_count", fmt.Sprintf("%d", dac)) - } + resAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForResourceAttribute(k) + if action == Drop { + return true + } - // copy blessed attributes to stream labels - streamLabels := make(model.LabelSet, len(blessedAttributesNormalized)) - for _, ba := range blessedAttributesNormalized { - v := flattenedResourceAttributes.Get(ba) - if v == "" { - continue + attributeAsLabels := attributeToLabels(k, v, "") + if action == IndexLabel { + for _, lbl := range attributeAsLabels { + streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + } else if action == StructuredMetadata { + resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...) 
} - streamLabels[model.LabelName(ba)] = model.LabelValue(v) - // remove the blessed attributes copied to stream labels - flattenedResourceAttributes.Del(ba) - } + return true + }) if err := streamLabels.Validate(); err != nil { stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err)) @@ -161,9 +144,6 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } labelsStr := streamLabels.String() - // convert the remaining resource attributes to structured metadata - resourceAttributesAsStructuredMetadata := logproto.FromLabelsToLabelAdapters(flattenedResourceAttributes.Labels()) - lbs := modelLabelsSetToLabelsList(streamLabels) if _, ok := pushRequestsByStream[labelsStr]; !ok { pushRequestsByStream[labelsStr] = logproto.Stream{ @@ -178,6 +158,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for j := 0; j < sls.Len(); j++ { scope := sls.At(j).Scope() logs := sls.At(j).LogRecords() + scopeAttrs := scope.Attributes() // it would be rare to have multiple scopes so if the entries slice is empty, pre-allocate it for the number of log entries if cap(pushRequestsByStream[labelsStr].Entries) == 0 { @@ -187,7 +168,20 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } // use fields and attributes from scope as structured metadata - scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "") + scopeAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, scopeAttrs.Len()+3) + scopeAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForScopeAttribute(k) + if action == Drop { + return true + } + + attributeAsLabels := attributeToLabels(k, v, "") + if action == StructuredMetadata { + scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, attributeAsLabels...) 
+ } + + return true + }) if scopeName := scope.Name(); scopeName != "" { scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, push.LabelAdapter{ @@ -213,7 +207,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for k := 0; k < logs.Len(); k++ { log := logs.At(k) - entry := otlpLogToPushEntry(log) + entry := otlpLogToPushEntry(log, otlpConfig) // if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata) @@ -251,9 +245,23 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } // otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry. -func otlpLogToPushEntry(log plog.LogRecord) push.Entry { +func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry { // copy log attributes and all the fields from log(except log.Body) to structured metadata - structuredMetadata := attributesToLabels(log.Attributes(), "") + logAttrs := log.Attributes() + structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7) + logAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForLogAttribute(k) + if action == Drop { + return true + } + + attributeAsLabels := attributeToLabels(k, v, "") + if action == StructuredMetadata { + structuredMetadata = append(structuredMetadata, attributeAsLabels...) 
+ } + + return true + }) // if log.Timestamp() is 0, we would have already stored log.ObservedTimestamp as log timestamp so no need to store again in structured metadata if log.Timestamp() != 0 && log.ObservedTimestamp() != 0 { @@ -316,25 +324,39 @@ func attributesToLabels(attrs pcommon.Map, prefix string) push.LabelsAdapter { } attrs.Range(func(k string, v pcommon.Value) bool { - keyWithPrefix := k - if prefix != "" { - keyWithPrefix = prefix + "_" + k - } - keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix) - - typ := v.Type() - if typ == pcommon.ValueTypeMap { - labelsAdapter = append(labelsAdapter, attributesToLabels(v.Map(), keyWithPrefix)...) - } else { - labelsAdapter = append(labelsAdapter, push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()}) - } - + labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, prefix)...) return true }) return labelsAdapter } +func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdapter { + var labelsAdapter push.LabelsAdapter + + keyWithPrefix := k + if prefix != "" { + keyWithPrefix = prefix + "_" + k + } + keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix) + + typ := v.Type() + if typ == pcommon.ValueTypeMap { + mv := v.Map() + labelsAdapter = make(push.LabelsAdapter, 0, mv.Len()) + mv.Range(func(k string, v pcommon.Value) bool { + labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, keyWithPrefix)...) 
+ return true + }) + } else { + labelsAdapter = push.LabelsAdapter{ + push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()}, + } + } + + return labelsAdapter +} + func timestampFromLogRecord(lr plog.LogRecord) time.Time { if lr.Timestamp() != 0 { return time.Unix(0, int64(lr.Timestamp())) diff --git a/pkg/loghttp/push/otlp_config.go b/pkg/loghttp/push/otlp_config.go new file mode 100644 index 0000000000000..64120d4a6252e --- /dev/null +++ b/pkg/loghttp/push/otlp_config.go @@ -0,0 +1,166 @@ +package push + +import ( + "fmt" + + "github.com/prometheus/prometheus/model/relabel" +) + +var blessedAttributes = []string{ + "service.name", + "service.namespace", + "service.instance.id", + "deployment.environment", + "cloud.region", + "cloud.availability_zone", + "k8s.cluster.name", + "k8s.namespace.name", + "k8s.pod.name", + "k8s.container.name", + "container.name", + "k8s.replicaset.name", + "k8s.deployment.name", + "k8s.statefulset.name", + "k8s.daemonset.name", + "k8s.cronjob.name", + "k8s.job.name", +} + +// Action is the action to be performed on OTLP Resource Attribute. +type Action string + +const ( + // IndexLabel stores a Resource Attribute as a label in index to identify streams. + IndexLabel Action = "index_label" + // StructuredMetadata stores an Attribute as Structured Metadata with each log entry. + StructuredMetadata Action = "structured_metadata" + // Drop drops Attributes for which the Attribute name does match the regex. 
+ Drop Action = "drop" +) + +var ( + errUnsupportedAction = fmt.Errorf("unsupported action, it must be one of: %s, %s, %s", Drop, IndexLabel, StructuredMetadata) + errAttributesAndRegexNotSet = fmt.Errorf("attributes or regex must be set") + errAttributesAndRegexBothSet = fmt.Errorf("only one of attributes or regex must be set") +) + +var DefaultOTLPConfig = OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: blessedAttributes, + }, + }, + }, +} + +type OTLPConfig struct { + ResourceAttributes ResourceAttributesConfig `yaml:"resource_attributes,omitempty"` + ScopeAttributes []AttributesConfig `yaml:"scope_attributes,omitempty"` + LogAttributes []AttributesConfig `yaml:"log_attributes,omitempty"` +} + +func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultOTLPConfig + type plain OTLPConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + return nil +} + +func (c *OTLPConfig) actionForAttribute(attribute string, cfgs []AttributesConfig) Action { + for i := 0; i < len(cfgs); i++ { + if cfgs[i].Regex.Regexp != nil && cfgs[i].Regex.MatchString(attribute) { + return cfgs[i].Action + } + for _, cfgAttr := range cfgs[i].Attributes { + if cfgAttr == attribute { + return cfgs[i].Action + } + } + } + + return StructuredMetadata +} + +func (c *OTLPConfig) ActionForResourceAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.ResourceAttributes.AttributesConfig) +} + +func (c *OTLPConfig) ActionForScopeAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.ScopeAttributes) +} + +func (c *OTLPConfig) ActionForLogAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.LogAttributes) +} + +func (c *OTLPConfig) Validate() error { + for _, ac := range c.ScopeAttributes { + if ac.Action == IndexLabel { + return fmt.Errorf("%s action is only supported for 
resource_attributes", IndexLabel) + } + } + + for _, ac := range c.LogAttributes { + if ac.Action == IndexLabel { + return fmt.Errorf("%s action is only supported for resource_attributes", IndexLabel) + } + } + + return nil +} + +type AttributesConfig struct { + Action Action `yaml:"action,omitempty"` + Attributes []string `yaml:"attributes,omitempty"` + Regex relabel.Regexp `yaml:"regex,omitempty"` +} + +func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain AttributesConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + if c.Action == "" { + c.Action = StructuredMetadata + } + + if c.Action != IndexLabel && c.Action != StructuredMetadata && c.Action != Drop { + return errUnsupportedAction + } + + if len(c.Attributes) == 0 && c.Regex.Regexp == nil { + return errAttributesAndRegexNotSet + } + + if len(c.Attributes) != 0 && c.Regex.Regexp != nil { + return errAttributesAndRegexBothSet + } + + return nil +} + +type ResourceAttributesConfig struct { + IgnoreDefaults bool `yaml:"ignore_defaults,omitempty"` + AttributesConfig []AttributesConfig `yaml:"attributes,omitempty"` +} + +func (c *ResourceAttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain ResourceAttributesConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + if !c.IgnoreDefaults { + c.AttributesConfig = append([]AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + }, c.AttributesConfig...) 
+ } + + return nil +} diff --git a/pkg/loghttp/push/otlp_config_test.go b/pkg/loghttp/push/otlp_config_test.go new file mode 100644 index 0000000000000..a1cfc15ff52c8 --- /dev/null +++ b/pkg/loghttp/push/otlp_config_test.go @@ -0,0 +1,355 @@ +package push + +import ( + "testing" + + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestUnmarshalOTLPConfig(t *testing.T) { + for _, tc := range []struct { + name string + yamlConfig []byte + expectedCfg OTLPConfig + expectedErr error + }{ + { + name: "only resource_attributes set", + yamlConfig: []byte(` +resource_attributes: + attributes: + - action: index_label + regex: foo`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + }, + }, + { + name: "resource_attributes with defaults ignored", + yamlConfig: []byte(` +resource_attributes: + ignore_defaults: true + attributes: + - action: index_label + regex: foo`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + IgnoreDefaults: true, + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + }, + }, + { + name: "resource_attributes not set", + yamlConfig: []byte(` +scope_attributes: + - action: drop + attributes: + - fizz`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: blessedAttributes, + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"fizz"}, + }, + }, + }, + }, + { + name: "all 3 set", + yamlConfig: []byte(` +resource_attributes: + attributes: + - action: index_label + regex: foo +scope_attributes: + - action: drop + attributes: + - fizz 
+log_attributes: + - action: structured_metadata + attributes: + - buzz`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"fizz"}, + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"buzz"}, + }, + }, + }, + }, + { + name: "unsupported action should error", + yamlConfig: []byte(` +log_attributes: + - action: keep + attributes: + - fizz`), + expectedErr: errUnsupportedAction, + }, + { + name: "attributes and regex both not set should error", + yamlConfig: []byte(` +log_attributes: + - action: drop`), + expectedErr: errAttributesAndRegexNotSet, + }, + { + name: "attributes and regex both being set should error", + yamlConfig: []byte(` +log_attributes: + - action: drop + regex: foo + attributes: + - fizz`), + expectedErr: errAttributesAndRegexBothSet, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cfg := OTLPConfig{} + err := yaml.UnmarshalStrict(tc.yamlConfig, &cfg) + if tc.expectedErr != nil { + require.ErrorIs(t, err, tc.expectedErr) + return + } + require.Equal(t, tc.expectedCfg, cfg) + }) + } +} + +func TestOTLPConfig(t *testing.T) { + type attrAndExpAction struct { + attr string + expectedAction Action + } + + for _, tc := range []struct { + name string + otlpConfig OTLPConfig + resAttrs []attrAndExpAction + scopeAttrs []attrAndExpAction + logAttrs []attrAndExpAction + }{ + { + name: "default OTLPConfig", + otlpConfig: DefaultOTLPConfig, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: IndexLabel, + }, + { + attr: "not_blessed", + expectedAction: StructuredMetadata, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: StructuredMetadata, + }, + }, + logAttrs: 
[]attrAndExpAction{ + { + attr: "user_id", + expectedAction: StructuredMetadata, + }, + }, + }, + { + name: "drop everything except a few attrs", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: []string{attrServiceName}, + }, + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp("^foo.*"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"method"}, + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"user_id"}, + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: IndexLabel, + }, + { + attr: "foo_bar", + expectedAction: StructuredMetadata, + }, + { + attr: "ping_foo", + expectedAction: Drop, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: StructuredMetadata, + }, + { + attr: "version", + expectedAction: Drop, + }, + }, + logAttrs: []attrAndExpAction{ + { + attr: "user_id", + expectedAction: StructuredMetadata, + }, + { + attr: "order_id", + expectedAction: Drop, + }, + }, + }, + { + name: "keep everything except a few attrs", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{attrServiceName}, + }, + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"method"}, + }, + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"user_id"}, + }, + { + Action: 
StructuredMetadata, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: Drop, + }, + { + attr: "foo_bar", + expectedAction: IndexLabel, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: Drop, + }, + { + attr: "version", + expectedAction: StructuredMetadata, + }, + }, + logAttrs: []attrAndExpAction{ + { + attr: "user_id", + expectedAction: Drop, + }, + { + attr: "order_id", + expectedAction: StructuredMetadata, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + for _, c := range tc.resAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForResourceAttribute(c.attr)) + } + + for _, c := range tc.scopeAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForScopeAttribute(c.attr)) + } + + for _, c := range tc.logAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForLogAttribute(c.attr)) + } + }) + } +} diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index 8d02485833775..d817c933a43d5 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -23,6 +24,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { generateLogs func() plog.Logs expectedPushRequest logproto.PushRequest expectedStats Stats + otlpConfig OTLPConfig }{ { name: "no logs", @@ -31,6 +33,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedPushRequest: logproto.PushRequest{}, expectedStats: *newPushStats(), + otlpConfig: DefaultOTLPConfig, }, { name: "resource with no logs", @@ -41,9 +44,11 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedPushRequest: logproto.PushRequest{}, expectedStats: *newPushStats(), + otlpConfig: 
DefaultOTLPConfig, }, { - name: "resource with a log entry", + name: "resource with a log entry", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.name", "service-1") @@ -78,7 +83,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, { - name: "resource attributes and scope attributes stored as structured metadata", + name: "resource attributes and scope attributes stored as structured metadata", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty() @@ -152,7 +158,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, { - name: "attributes with nested data", + name: "attributes with nested data", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty() @@ -234,10 +241,153 @@ func TestOTLPToLokiPushRequest(t *testing.T) { mostRecentEntryTimestamp: now, }, }, + { + name: "custom otlp config", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: []string{"pod.name"}, + }, { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("service.*"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp("drop.*"), + }, + { + Action: StructuredMetadata, + Attributes: []string{"resource.nested"}, + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"drop.function"}, + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp(".*_id"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + generateLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty() + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", "service-1") + 
ld.ResourceLogs().At(0).Resource().Attributes().PutStr("pod.name", "service-1-abc") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("pod.ip", "10.200.200.200") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("drop.service.addr", "192.168.0.1") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("drop.service.version", "v1") + ld.ResourceLogs().At(0).Resource().Attributes().PutEmptyMap("resource.nested").PutStr("foo", "bar") + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().SetName("fizz") + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("drop.function", "login") + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutEmptyMap("scope.nested").PutStr("foo", "bar") + for i := 0; i < 2; i++ { + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().SetStr(fmt.Sprintf("test body - %d", i)) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).SetTimestamp(pcommon.Timestamp(now.UnixNano())) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutStr("user_id", "u1") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutStr("order_id", "o1") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutEmptyMap("drop.log.nested").PutStr("foo", fmt.Sprintf("bar - %d", i)) + } + return ld + }, + expectedPushRequest: logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{pod_name="service-1-abc", service_name="service-1"}`, + Entries: []logproto.Entry{ + { + Timestamp: now, + Line: "test body - 0", + StructuredMetadata: push.LabelsAdapter{ + { + Name: "user_id", + Value: "u1", + }, + { + Name: "order_id", + Value: "o1", + }, + { + Name: "pod_ip", + Value: "10.200.200.200", + }, + { + Name: "resource_nested_foo", + Value: "bar", + }, + { + Name: "scope_nested_foo", + Value: "bar", + }, + { + Name: 
"scope_name", + Value: "fizz", + }, + }, + }, + { + Timestamp: now, + Line: "test body - 1", + StructuredMetadata: push.LabelsAdapter{ + { + Name: "user_id", + Value: "u1", + }, + { + Name: "order_id", + Value: "o1", + }, + { + Name: "pod_ip", + Value: "10.200.200.200", + }, + { + Name: "resource_nested_foo", + Value: "bar", + }, + { + Name: "scope_nested_foo", + Value: "bar", + }, + { + Name: "scope_name", + Value: "fizz", + }, + }, + }, + }, + }, + }, + }, + expectedStats: Stats{ + numLines: 2, + logLinesBytes: map[time.Duration]int64{ + time.Hour: 26, + }, + structuredMetadataBytes: map[time.Duration]int64{ + time.Hour: 113, + }, + streamLabelsSize: 42, + mostRecentEntryTimestamp: now, + }, + }, } { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() - pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, stats) + pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) }) @@ -324,7 +474,7 @@ func TestOTLPLogToPushEntry(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord())) + require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig)) }) } diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index dffa5ab1a05e3..15b7bba0a78c9 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -58,7 +58,11 @@ type TenantsRetention interface { RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration } -type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) +type Limits interface { + OTLPConfig(userID string) OTLPConfig +} + +type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) type Stats struct { 
errs []error @@ -72,8 +76,8 @@ type Stats struct { bodySize int64 } -func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, pushRequestParser RequestParser) (*logproto.PushRequest, error) { - req, pushStats, err := pushRequestParser(userID, r, tenantsRetention) +func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser) (*logproto.PushRequest, error) { + req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits) if err != nil { return nil, err } @@ -131,7 +135,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 286b0e013a241..fa1e2fb28d115 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -200,7 +200,7 @@ func TestParseRequest(t *testing.T) { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(util_log.Logger, "fake", request, nil, ParseLokiRequest) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, nil, ParseLokiRequest) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index ac25798c33e31..e3052c1781b89 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -21,6 +21,7 @@ 
import ( "github.com/grafana/loki/pkg/compactor/deletionmode" "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logql/syntax" ruler_config "github.com/grafana/loki/pkg/ruler/config" "github.com/grafana/loki/pkg/ruler/util" @@ -199,6 +200,7 @@ type Limits struct { AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` MaxStructuredMetadataEntriesCount int `yaml:"max_structured_metadata_entries_count" json:"max_structured_metadata_entries_count" doc:"description=Maximum number of structured metadata entries per log line."` + OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"` } type StreamRetention struct { @@ -341,7 +343,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxStructuredMetadataSize.Set(defaultMaxStructuredMetadataSize) f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.") f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.") - + l.OTLPConfig = push.DefaultOTLPConfig } // UnmarshalYAML implements the yaml.Unmarshaler interface. 
@@ -398,6 +400,10 @@ func (l *Limits) Validate() error { l.MaxQueryCapacity = 1 } + if err := l.OTLPConfig.Validate(); err != nil { + return err + } + return nil } @@ -892,6 +898,10 @@ func (o *Overrides) MaxStructuredMetadataCount(userID string) int { return o.getOverridesForUser(userID).MaxStructuredMetadataEntriesCount } +func (o *Overrides) OTLPConfig(userID string) push.OTLPConfig { + return o.getOverridesForUser(userID).OTLPConfig +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 908531f9858f6..0fb6dcbae2ef0 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -12,6 +12,7 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/compactor/deletionmode" + "github.com/grafana/loki/pkg/loghttp/push" ) func TestLimitsTagsYamlMatchJson(t *testing.T) { @@ -173,6 +174,18 @@ func TestLimitsDoesNotMutate(t *testing.T) { defaultLimits = initialDefault }() + defaultOTLPConfig := push.OTLPConfig{ + ResourceAttributes: push.ResourceAttributesConfig{ + IgnoreDefaults: true, + AttributesConfig: []push.AttributesConfig{ + { + Action: push.IndexLabel, + Attributes: []string{"pod"}, + }, + }, + }, + } + // Set new defaults with non-nil values for non-scalar types newDefaults := Limits{ RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, @@ -182,6 +195,7 @@ func TestLimitsDoesNotMutate(t *testing.T) { Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, } SetDefaultLimitsForYAMLUnmarshalling(newDefaults) @@ -206,6 +220,7 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -222,6 +237,7 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -241,6 +257,7 @@ retention_stream: // Rest from new defaults RulerRemoteWriteHeaders: 
OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -259,6 +276,7 @@ reject_old_samples: true Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -277,6 +295,7 @@ query_timeout: 5m Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, } {