diff --git a/CHANGELOG.md b/CHANGELOG.md index 04b692efcb00e..8c22d9126f9c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ * [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. * [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results. * [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results. +* [11143](https://github.com/grafana/loki/pull/11143) **sandeepsukhani** otel: Add support for per tenant configuration for mapping otlp data to loki format * [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs. ##### Fixes diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index 1e8affb389a17..c981de0de3dda 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -111,7 +111,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := tenant.TenantID(r.Context()) - req, err := push.ParseRequest(logger, userID, r, nil, push.ParseLokiRequest) + req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index edb394733a3c5..2a2c51544f013 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3140,6 +3140,17 @@ shard_streams: # Maximum number of structured metadata entries per log line. # CLI flag: -limits.max-structured-metadata-entries-count [max_structured_metadata_entries_count: | default = 128] + +# OTLP log ingestion configurations +otlp_config: + resource_attributes: + [ignore_defaults: ] + + [attributes: ] + + [scope_attributes: ] + + [log_attributes: ] ``` ### frontend_worker diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 67db8e5c5ba7b..ce242355e077b 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe http.Error(w, err.Error(), http.StatusBadRequest) return } - req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, pushRequestParser) + req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser) if err != nil { if d.tenantConfigs.LogPushRequest(tenantID) { level.Debug(logger).Log( diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index add9d17708dbd..6db6995662dd2 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/loki/pkg/compactor/retention" "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/loghttp/push" ) // Limits is an interface for distributor limits/related configs @@ -29,4 +30,5 @@ type Limits interface { AllowStructuredMetadata(userID string) bool MaxStructuredMetadataSize(userID string) int MaxStructuredMetadataCount(userID string) int + OTLPConfig(userID string) push.OTLPConfig } diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 737e9b78ae72a..f4f937b93dc33 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -25,28 +25,9 @@ import ( const ( pbContentType = "application/x-protobuf" gzipContentEncoding = "gzip" + attrServiceName = "service.name" ) -var blessedAttributes = []string{ - "service.name", - "service.namespace", - "service.instance.id", - "deployment.environment", - "cloud.region", - "cloud.availability_zone", - "k8s.cluster.name", - "k8s.namespace.name", - "k8s.pod.name", - "k8s.container.name", - "container.name", - "k8s.replicaset.name", - "k8s.deployment.name", - "k8s.statefulset.name", - "k8s.daemonset.name", - "k8s.cronjob.name", - "k8s.job.name", -} - var blessedAttributesNormalized = make([]string, len(blessedAttributes)) func init() { @@ -62,14 +43,14 @@ func newPushStats() *Stats { } } -func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) { stats := newPushStats() otlpLogs, err := extractLogs(r, stats) if err != nil { return nil, nil, err } - req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, stats) + req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats) return req, stats, nil } @@ -120,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -131,29 +112,31 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for i := 0; i < rls.Len(); i++ { sls := rls.At(i).ScopeLogs() res := rls.At(i).Resource() + resAttrs := res.Attributes() - flattenedResourceAttributes := labels.NewBuilder(logproto.FromLabelAdaptersToLabels(attributesToLabels(res.Attributes(), ""))) - // service.name is a required Resource Attribute. If it is not present, we will set it to "unknown_service". - if flattenedResourceAttributes.Get("service_name") == "" { - flattenedResourceAttributes = flattenedResourceAttributes.Set("service_name", "unknown_service") + if v, _ := resAttrs.Get(attrServiceName); v.AsString() == "" { + resAttrs.PutStr(attrServiceName, "unknown_service") } + resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) + streamLabels := make(model.LabelSet, len(blessedAttributesNormalized)) - if dac := res.DroppedAttributesCount(); dac != 0 { - flattenedResourceAttributes = flattenedResourceAttributes.Set("resource_dropped_attributes_count", fmt.Sprintf("%d", dac)) - } + resAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForResourceAttribute(k) + if action == Drop { + return true + } - // copy blessed attributes to stream labels - streamLabels := make(model.LabelSet, len(blessedAttributesNormalized)) - for _, ba := range blessedAttributesNormalized { - v := flattenedResourceAttributes.Get(ba) - if v == "" { - continue + attributeAsLabels := attributeToLabels(k, v, "") + if action == IndexLabel { + for _, lbl := range attributeAsLabels { + streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + } else if action == StructuredMetadata { + resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...) } - streamLabels[model.LabelName(ba)] = model.LabelValue(v) - // remove the blessed attributes copied to stream labels - flattenedResourceAttributes.Del(ba) - } + return true + }) if err := streamLabels.Validate(); err != nil { stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err)) @@ -161,9 +144,6 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } labelsStr := streamLabels.String() - // convert the remaining resource attributes to structured metadata - resourceAttributesAsStructuredMetadata := logproto.FromLabelsToLabelAdapters(flattenedResourceAttributes.Labels()) - lbs := modelLabelsSetToLabelsList(streamLabels) if _, ok := pushRequestsByStream[labelsStr]; !ok { pushRequestsByStream[labelsStr] = logproto.Stream{ @@ -178,6 +158,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for j := 0; j < sls.Len(); j++ { scope := sls.At(j).Scope() logs := sls.At(j).LogRecords() + scopeAttrs := scope.Attributes() // it would be rare to have multiple scopes so if the entries slice is empty, pre-allocate it for the number of log entries if cap(pushRequestsByStream[labelsStr].Entries) == 0 { @@ -187,7 +168,20 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } // use fields and attributes from scope as structured metadata - scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "") + scopeAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, scopeAttrs.Len()+3) + scopeAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForScopeAttribute(k) + if action == Drop { + return true + } + + attributeAsLabels := attributeToLabels(k, v, "") + if action == StructuredMetadata { + scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, attributeAsLabels...) + } + + return true + }) if scopeName := scope.Name(); scopeName != "" { scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, push.LabelAdapter{ @@ -213,7 +207,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants for k := 0; k < logs.Len(); k++ { log := logs.At(k) - entry := otlpLogToPushEntry(log) + entry := otlpLogToPushEntry(log, otlpConfig) // if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata) @@ -251,9 +245,23 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants } // otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry. -func otlpLogToPushEntry(log plog.LogRecord) push.Entry { +func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry { // copy log attributes and all the fields from log(except log.Body) to structured metadata - structuredMetadata := attributesToLabels(log.Attributes(), "") + logAttrs := log.Attributes() + structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7) + logAttrs.Range(func(k string, v pcommon.Value) bool { + action := otlpConfig.ActionForLogAttribute(k) + if action == Drop { + return true + } + + attributeAsLabels := attributeToLabels(k, v, "") + if action == StructuredMetadata { + structuredMetadata = append(structuredMetadata, attributeAsLabels...) + } + + return true + }) // if log.Timestamp() is 0, we would have already stored log.ObservedTimestamp as log timestamp so no need to store again in structured metadata if log.Timestamp() != 0 && log.ObservedTimestamp() != 0 { @@ -316,25 +324,39 @@ func attributesToLabels(attrs pcommon.Map, prefix string) push.LabelsAdapter { } attrs.Range(func(k string, v pcommon.Value) bool { - keyWithPrefix := k - if prefix != "" { - keyWithPrefix = prefix + "_" + k - } - keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix) - - typ := v.Type() - if typ == pcommon.ValueTypeMap { - labelsAdapter = append(labelsAdapter, attributesToLabels(v.Map(), keyWithPrefix)...) - } else { - labelsAdapter = append(labelsAdapter, push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()}) - } - + labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, prefix)...) return true }) return labelsAdapter } +func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdapter { + var labelsAdapter push.LabelsAdapter + + keyWithPrefix := k + if prefix != "" { + keyWithPrefix = prefix + "_" + k + } + keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix) + + typ := v.Type() + if typ == pcommon.ValueTypeMap { + mv := v.Map() + labelsAdapter = make(push.LabelsAdapter, 0, mv.Len()) + mv.Range(func(k string, v pcommon.Value) bool { + labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, keyWithPrefix)...) + return true + }) + } else { + labelsAdapter = push.LabelsAdapter{ + push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()}, + } + } + + return labelsAdapter +} + func timestampFromLogRecord(lr plog.LogRecord) time.Time { if lr.Timestamp() != 0 { return time.Unix(0, int64(lr.Timestamp())) diff --git a/pkg/loghttp/push/otlp_config.go b/pkg/loghttp/push/otlp_config.go new file mode 100644 index 0000000000000..64120d4a6252e --- /dev/null +++ b/pkg/loghttp/push/otlp_config.go @@ -0,0 +1,166 @@ +package push + +import ( + "fmt" + + "github.com/prometheus/prometheus/model/relabel" +) + +var blessedAttributes = []string{ + "service.name", + "service.namespace", + "service.instance.id", + "deployment.environment", + "cloud.region", + "cloud.availability_zone", + "k8s.cluster.name", + "k8s.namespace.name", + "k8s.pod.name", + "k8s.container.name", + "container.name", + "k8s.replicaset.name", + "k8s.deployment.name", + "k8s.statefulset.name", + "k8s.daemonset.name", + "k8s.cronjob.name", + "k8s.job.name", +} + +// Action is the action to be performed on OTLP Resource Attribute. +type Action string + +const ( + // IndexLabel stores a Resource Attribute as a label in index to identify streams. + IndexLabel Action = "index_label" + // StructuredMetadata stores an Attribute as Structured Metadata with each log entry. + StructuredMetadata Action = "structured_metadata" + // Drop drops Attributes for which the Attribute name does match the regex. + Drop Action = "drop" +) + +var ( + errUnsupportedAction = fmt.Errorf("unsupported action, it must be one of: %s, %s, %s", Drop, IndexLabel, StructuredMetadata) + errAttributesAndRegexNotSet = fmt.Errorf("attributes or regex must be set") + errAttributesAndRegexBothSet = fmt.Errorf("only one of attributes or regex must be set") +) + +var DefaultOTLPConfig = OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: blessedAttributes, + }, + }, + }, +} + +type OTLPConfig struct { + ResourceAttributes ResourceAttributesConfig `yaml:"resource_attributes,omitempty"` + ScopeAttributes []AttributesConfig `yaml:"scope_attributes,omitempty"` + LogAttributes []AttributesConfig `yaml:"log_attributes,omitempty"` +} + +func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultOTLPConfig + type plain OTLPConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + return nil +} + +func (c *OTLPConfig) actionForAttribute(attribute string, cfgs []AttributesConfig) Action { + for i := 0; i < len(cfgs); i++ { + if cfgs[i].Regex.Regexp != nil && cfgs[i].Regex.MatchString(attribute) { + return cfgs[i].Action + } + for _, cfgAttr := range cfgs[i].Attributes { + if cfgAttr == attribute { + return cfgs[i].Action + } + } + } + + return StructuredMetadata +} + +func (c *OTLPConfig) ActionForResourceAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.ResourceAttributes.AttributesConfig) +} + +func (c *OTLPConfig) ActionForScopeAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.ScopeAttributes) +} + +func (c *OTLPConfig) ActionForLogAttribute(attribute string) Action { + return c.actionForAttribute(attribute, c.LogAttributes) +} + +func (c *OTLPConfig) Validate() error { + for _, ac := range c.ScopeAttributes { + if ac.Action == IndexLabel { + return fmt.Errorf("%s action is only supported for resource_attributes", IndexLabel) + } + } + + for _, ac := range c.LogAttributes { + if ac.Action == IndexLabel { + return fmt.Errorf("%s action is only supported for resource_attributes", IndexLabel) + } + } + + return nil +} + +type AttributesConfig struct { + Action Action `yaml:"action,omitempty"` + Attributes []string `yaml:"attributes,omitempty"` + Regex relabel.Regexp `yaml:"regex,omitempty"` +} + +func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain AttributesConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + if c.Action == "" { + c.Action = StructuredMetadata + } + + if c.Action != IndexLabel && c.Action != StructuredMetadata && c.Action != Drop { + return errUnsupportedAction + } + + if len(c.Attributes) == 0 && c.Regex.Regexp == nil { + return errAttributesAndRegexNotSet + } + + if len(c.Attributes) != 0 && c.Regex.Regexp != nil { + return errAttributesAndRegexBothSet + } + + return nil +} + +type ResourceAttributesConfig struct { + IgnoreDefaults bool `yaml:"ignore_defaults,omitempty"` + AttributesConfig []AttributesConfig `yaml:"attributes,omitempty"` +} + +func (c *ResourceAttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain ResourceAttributesConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + if !c.IgnoreDefaults { + c.AttributesConfig = append([]AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + }, c.AttributesConfig...) + } + + return nil +} diff --git a/pkg/loghttp/push/otlp_config_test.go b/pkg/loghttp/push/otlp_config_test.go new file mode 100644 index 0000000000000..a1cfc15ff52c8 --- /dev/null +++ b/pkg/loghttp/push/otlp_config_test.go @@ -0,0 +1,355 @@ +package push + +import ( + "testing" + + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestUnmarshalOTLPConfig(t *testing.T) { + for _, tc := range []struct { + name string + yamlConfig []byte + expectedCfg OTLPConfig + expectedErr error + }{ + { + name: "only resource_attributes set", + yamlConfig: []byte(` +resource_attributes: + attributes: + - action: index_label + regex: foo`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + }, + }, + { + name: "resource_attributes with defaults ignored", + yamlConfig: []byte(` +resource_attributes: + ignore_defaults: true + attributes: + - action: index_label + regex: foo`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + IgnoreDefaults: true, + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + }, + }, + { + name: "resource_attributes not set", + yamlConfig: []byte(` +scope_attributes: + - action: drop + attributes: + - fizz`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: blessedAttributes, + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"fizz"}, + }, + }, + }, + }, + { + name: "all 3 set", + yamlConfig: []byte(` +resource_attributes: + attributes: + - action: index_label + regex: foo +scope_attributes: + - action: drop + attributes: + - fizz +log_attributes: + - action: structured_metadata + attributes: + - buzz`), + expectedCfg: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0], + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"fizz"}, + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"buzz"}, + }, + }, + }, + }, + { + name: "unsupported action should error", + yamlConfig: []byte(` +log_attributes: + - action: keep + attributes: + - fizz`), + expectedErr: errUnsupportedAction, + }, + { + name: "attributes and regex both not set should error", + yamlConfig: []byte(` +log_attributes: + - action: drop`), + expectedErr: errAttributesAndRegexNotSet, + }, + { + name: "attributes and regex both being set should error", + yamlConfig: []byte(` +log_attributes: + - action: drop + regex: foo + attributes: + - fizz`), + expectedErr: errAttributesAndRegexBothSet, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cfg := OTLPConfig{} + err := yaml.UnmarshalStrict(tc.yamlConfig, &cfg) + if tc.expectedErr != nil { + require.ErrorIs(t, err, tc.expectedErr) + return + } + require.Equal(t, tc.expectedCfg, cfg) + }) + } +} + +func TestOTLPConfig(t *testing.T) { + type attrAndExpAction struct { + attr string + expectedAction Action + } + + for _, tc := range []struct { + name string + otlpConfig OTLPConfig + resAttrs []attrAndExpAction + scopeAttrs []attrAndExpAction + logAttrs []attrAndExpAction + }{ + { + name: "default OTLPConfig", + otlpConfig: DefaultOTLPConfig, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: IndexLabel, + }, + { + attr: "not_blessed", + expectedAction: StructuredMetadata, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: StructuredMetadata, + }, + }, + logAttrs: []attrAndExpAction{ + { + attr: "user_id", + expectedAction: StructuredMetadata, + }, + }, + }, + { + name: "drop everything except a few attrs", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: []string{attrServiceName}, + }, + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp("^foo.*"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"method"}, + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Attributes: []string{"user_id"}, + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: IndexLabel, + }, + { + attr: "foo_bar", + expectedAction: StructuredMetadata, + }, + { + attr: "ping_foo", + expectedAction: Drop, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: StructuredMetadata, + }, + { + attr: "version", + expectedAction: Drop, + }, + }, + logAttrs: []attrAndExpAction{ + { + attr: "user_id", + expectedAction: StructuredMetadata, + }, + { + attr: "order_id", + expectedAction: Drop, + }, + }, + }, + { + name: "keep everything except a few attrs", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{attrServiceName}, + }, + { + Action: IndexLabel, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"method"}, + }, + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"user_id"}, + }, + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + resAttrs: []attrAndExpAction{ + { + attr: attrServiceName, + expectedAction: Drop, + }, + { + attr: "foo_bar", + expectedAction: IndexLabel, + }, + }, + scopeAttrs: []attrAndExpAction{ + { + attr: "method", + expectedAction: Drop, + }, + { + attr: "version", + expectedAction: StructuredMetadata, + }, + }, + logAttrs: []attrAndExpAction{ + { + attr: "user_id", + expectedAction: Drop, + }, + { + attr: "order_id", + expectedAction: StructuredMetadata, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + for _, c := range tc.resAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForResourceAttribute(c.attr)) + } + + for _, c := range tc.scopeAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForScopeAttribute(c.attr)) + } + + for _, c := range tc.logAttrs { + require.Equal(t, c.expectedAction, tc.otlpConfig.ActionForLogAttribute(c.attr)) + } + }) + } +} diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index 8d02485833775..d817c933a43d5 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -23,6 +24,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { generateLogs func() plog.Logs expectedPushRequest logproto.PushRequest expectedStats Stats + otlpConfig OTLPConfig }{ { name: "no logs", @@ -31,6 +33,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedPushRequest: logproto.PushRequest{}, expectedStats: *newPushStats(), + otlpConfig: DefaultOTLPConfig, }, { name: "resource with no logs", @@ -41,9 +44,11 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedPushRequest: logproto.PushRequest{}, expectedStats: *newPushStats(), + otlpConfig: DefaultOTLPConfig, }, { - name: "resource with a log entry", + name: "resource with a log entry", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.name", "service-1") @@ -78,7 +83,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, { - name: "resource attributes and scope attributes stored as structured metadata", + name: "resource attributes and scope attributes stored as structured metadata", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty() @@ -152,7 +158,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, { - name: "attributes with nested data", + name: "attributes with nested data", + otlpConfig: DefaultOTLPConfig, generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty() @@ -234,10 +241,153 @@ func TestOTLPToLokiPushRequest(t *testing.T) { mostRecentEntryTimestamp: now, }, }, + { + name: "custom otlp config", + otlpConfig: OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: []string{"pod.name"}, + }, { + Action: IndexLabel, + Regex: relabel.MustNewRegexp("service.*"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp("drop.*"), + }, + { + Action: StructuredMetadata, + Attributes: []string{"resource.nested"}, + }, + }, + }, + ScopeAttributes: []AttributesConfig{ + { + Action: Drop, + Attributes: []string{"drop.function"}, + }, + }, + LogAttributes: []AttributesConfig{ + { + Action: StructuredMetadata, + Regex: relabel.MustNewRegexp(".*_id"), + }, + { + Action: Drop, + Regex: relabel.MustNewRegexp(".*"), + }, + }, + }, + generateLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty() + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", "service-1") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("pod.name", "service-1-abc") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("pod.ip", "10.200.200.200") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("drop.service.addr", "192.168.0.1") + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("drop.service.version", "v1") + ld.ResourceLogs().At(0).Resource().Attributes().PutEmptyMap("resource.nested").PutStr("foo", "bar") + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().SetName("fizz") + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("drop.function", "login") + ld.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutEmptyMap("scope.nested").PutStr("foo", "bar") + for i := 0; i < 2; i++ { + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().SetStr(fmt.Sprintf("test body - %d", i)) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).SetTimestamp(pcommon.Timestamp(now.UnixNano())) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutStr("user_id", "u1") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutStr("order_id", "o1") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().PutEmptyMap("drop.log.nested").PutStr("foo", fmt.Sprintf("bar - %d", i)) + } + return ld + }, + expectedPushRequest: logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{pod_name="service-1-abc", service_name="service-1"}`, + Entries: []logproto.Entry{ + { + Timestamp: now, + Line: "test body - 0", + StructuredMetadata: push.LabelsAdapter{ + { + Name: "user_id", + Value: "u1", + }, + { + Name: "order_id", + Value: "o1", + }, + { + Name: "pod_ip", + Value: "10.200.200.200", + }, + { + Name: "resource_nested_foo", + Value: "bar", + }, + { + Name: "scope_nested_foo", + Value: "bar", + }, + { + Name: "scope_name", + Value: "fizz", + }, + }, + }, + { + Timestamp: now, + Line: "test body - 1", + StructuredMetadata: push.LabelsAdapter{ + { + Name: "user_id", + Value: "u1", + }, + { + Name: "order_id", + Value: "o1", + }, + { + Name: "pod_ip", + Value: "10.200.200.200", + }, + { + Name: "resource_nested_foo", + Value: "bar", + }, + { + Name: "scope_nested_foo", + Value: "bar", + }, + { + Name: "scope_name", + Value: "fizz", + }, + }, + }, + }, + }, + }, + }, + expectedStats: Stats{ + numLines: 2, + logLinesBytes: map[time.Duration]int64{ + time.Hour: 26, + }, + structuredMetadataBytes: map[time.Duration]int64{ + time.Hour: 113, + }, + streamLabelsSize: 42, + mostRecentEntryTimestamp: now, + }, + }, } { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() - pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, stats) + pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) }) @@ -324,7 +474,7 @@ func TestOTLPLogToPushEntry(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord())) + require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig)) }) } diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index dffa5ab1a05e3..15b7bba0a78c9 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -58,7 +58,11 @@ type TenantsRetention interface { RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration } -type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) +type Limits interface { + OTLPConfig(userID string) OTLPConfig +} + +type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) type Stats struct { errs []error @@ -72,8 +76,8 @@ type Stats struct { bodySize int64 } -func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, pushRequestParser RequestParser) (*logproto.PushRequest, error) { - req, pushStats, err := pushRequestParser(userID, r, tenantsRetention) +func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser) (*logproto.PushRequest, error) { + req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits) if err != nil { return nil, err } @@ -131,7 +135,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 286b0e013a241..fa1e2fb28d115 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -200,7 +200,7 @@ func TestParseRequest(t *testing.T) { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(util_log.Logger, "fake", request, nil, ParseLokiRequest) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, nil, ParseLokiRequest) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index ac25798c33e31..e3052c1781b89 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/compactor/deletionmode" "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logql/syntax" ruler_config "github.com/grafana/loki/pkg/ruler/config" "github.com/grafana/loki/pkg/ruler/util" @@ -199,6 +200,7 @@ type Limits struct { AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` MaxStructuredMetadataEntriesCount int `yaml:"max_structured_metadata_entries_count" json:"max_structured_metadata_entries_count" doc:"description=Maximum number of structured metadata entries per log line."` + OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"` } type StreamRetention struct { @@ -341,7 +343,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxStructuredMetadataSize.Set(defaultMaxStructuredMetadataSize) f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.") f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.") - + l.OTLPConfig = push.DefaultOTLPConfig } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -398,6 +400,10 @@ func (l *Limits) Validate() error { l.MaxQueryCapacity = 1 } + if err := l.OTLPConfig.Validate(); err != nil { + return err + } + return nil } @@ -892,6 +898,10 @@ func (o *Overrides) MaxStructuredMetadataCount(userID string) int { return o.getOverridesForUser(userID).MaxStructuredMetadataEntriesCount } +func (o *Overrides) OTLPConfig(userID string) push.OTLPConfig { + return o.getOverridesForUser(userID).OTLPConfig +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 908531f9858f6..0fb6dcbae2ef0 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -12,6 +12,7 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/compactor/deletionmode" + "github.com/grafana/loki/pkg/loghttp/push" ) func TestLimitsTagsYamlMatchJson(t *testing.T) { @@ -173,6 +174,18 @@ func TestLimitsDoesNotMutate(t *testing.T) { defaultLimits = initialDefault }() + defaultOTLPConfig := push.OTLPConfig{ + ResourceAttributes: push.ResourceAttributesConfig{ + IgnoreDefaults: true, + AttributesConfig: []push.AttributesConfig{ + { + Action: push.IndexLabel, + Attributes: []string{"pod"}, + }, + }, + }, + } + // Set new defaults with non-nil values for non-scalar types newDefaults := Limits{ RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, @@ -182,6 +195,7 @@ func TestLimitsDoesNotMutate(t *testing.T) { Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, } SetDefaultLimitsForYAMLUnmarshalling(newDefaults) @@ -206,6 +220,7 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -222,6 +237,7 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -241,6 +257,7 @@ retention_stream: // Rest from new defaults RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -259,6 +276,7 @@ reject_old_samples: true Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, { @@ -277,6 +295,7 @@ query_timeout: 5m Selector: `{a="b"}`, }, }, + OTLPConfig: defaultOTLPConfig, }, }, } {