diff --git a/CHANGELOG.md b/CHANGELOG.md
index ca0216ae345e5..fdc5c9c29ed06 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,6 +62,7 @@
 * [11897](https://github.com/grafana/loki/pull/11897) **ashwanthgoli** Metadata: Introduces a separate split interval of `split_recent_metadata_queries_by_interval` for `recent_metadata_query_window` to help with caching recent metadata query results.
 * [11970](https://github.com/grafana/loki/pull/11897) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.
 * [12318](https://github.com/grafana/loki/pull/12318) **DylanGuedes** Memcached: Add mTLS support.
+* [12392](https://github.com/grafana/loki/pull/12392) **sandeepsukhani** Detect name of service emitting logs and add it as a label.

 ##### Fixes

 * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 6a4fd280c0a59..dc0716a34fa23 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2824,6 +2824,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
 # CLI flag: -validation.increment-duplicate-timestamps
 [increment_duplicate_timestamp: <boolean> | default = false]

+# If no service_name label exists, Loki maps a single label from the configured
+# list to service_name. If none of the configured labels exist in the stream,
+# label is set to unknown_service. Empty list disables setting the label.
+# CLI flag: -validation.discover-service-name
+[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]
+
 # Maximum number of active streams per user, per ingester. 0 to disable.
 # CLI flag: -ingester.max-streams-per-user
 [max_streams_per_user: <int> | default = 0]
diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go
index c7a0ba2d17dd7..5e29413a68c62 100644
--- a/integration/cluster/cluster.go
+++ b/integration/cluster/cluster.go
@@ -62,6 +62,7 @@ limits_config:
   ingestion_burst_size_mb: 50
   reject_old_samples: false
   allow_structured_metadata: true
+  discover_service_name:
   otlp_config:
     resource_attributes:
       attributes_config:
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 818c0fe735ae8..9b34913d42a19 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -54,6 +54,9 @@ const (
     ringKey = "distributor"

     ringAutoForgetUnhealthyPeriods = 2
+
+    labelServiceName = "service_name"
+    serviceUnknown   = "unknown_service"
 )

 var (
@@ -348,7 +351,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
         d.truncateLines(validationContext, &stream)

         var lbs labels.Labels
-        lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
+        lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
         if err != nil {
             d.writeFailuresManager.Log(tenantID, err)
             validationErrors.Add(err)
@@ -425,7 +428,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
     if d.usageTracker != nil {
         for _, stream := range req.Streams {
-            lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream)
+            lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
             if err != nil {
                 continue
             }
@@ -717,7 +720,7 @@ type labelData struct {
     hash uint64
 }

-func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) {
+func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, error) {
     if val, ok := d.labelCache.Get(key); ok {
         labelVal := val.(labelData)
         return labelVal.ls, labelVal.ls.String(), labelVal.hash, nil
     }
@@ -728,10 +731,24 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
         return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
     }

-    if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
+    if err := d.validator.ValidateLabels(vContext, ls, stream); err != nil {
         return nil, "", 0, err
     }

+    // We do not want the service_name label we add to count against the stream's label limits, so we add it after validating the original labels.
+    if !ls.Has(labelServiceName) && len(vContext.discoverServiceName) > 0 {
+        serviceName := serviceUnknown
+        for _, labelName := range vContext.discoverServiceName {
+            if labelVal := ls.Get(labelName); labelVal != "" {
+                serviceName = labelVal
+                break
+            }
+        }
+
+        ls = labels.NewBuilder(ls).Set(labelServiceName, serviceName).Labels()
+        stream.Labels = ls.String()
+    }
+
     lsHash := ls.Hash()

     d.labelCache.Add(key, labelData{ls, lsHash})
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 04747ffb72334..81a7fb09b94a5 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -23,6 +23,7 @@ import (
     "github.com/grafana/dskit/user"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
+    "github.com/prometheus/prometheus/model/labels"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "google.golang.org/grpc"
@@ -98,6 +99,7 @@ func TestDistributor(t *testing.T) {
         t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
             limits := &validation.Limits{}
             flagext.DefaultValues(limits)
+            limits.DiscoverServiceName = nil
             limits.IngestionRateMB = ingestionRateLimit
             limits.IngestionBurstSizeMB = ingestionRateLimit
             limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)
@@ -134,13 +136,19 @@ func TestDistributor(t *testing.T) {
 func Test_IncrementTimestamp(t *testing.T) {
     incrementingDisabled := &validation.Limits{}
     flagext.DefaultValues(incrementingDisabled)
+    incrementingDisabled.DiscoverServiceName = nil
     incrementingDisabled.RejectOldSamples = false

     incrementingEnabled := &validation.Limits{}
     flagext.DefaultValues(incrementingEnabled)
+    incrementingEnabled.DiscoverServiceName = nil
     incrementingEnabled.RejectOldSamples = false
     incrementingEnabled.IncrementDuplicateTimestamp = true

+    defaultLimits := &validation.Limits{}
+    flagext.DefaultValues(defaultLimits)
+    now := time.Now()
+
     tests := map[string]struct {
         limits *validation.Limits
         push   *logproto.PushRequest
@@ -386,6 +394,34 @@ func Test_IncrementTimestamp(t *testing.T) {
                 },
             },
         },
+        "default limit adding service_name label": {
+            limits: defaultLimits,
+            push: &logproto.PushRequest{
+                Streams: []logproto.Stream{
+                    {
+                        Labels: "{job=\"foo\"}",
+                        Entries: []logproto.Entry{
+                            {Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
+                            {Timestamp: now.Add(-time.Second), Line: "hey2"},
+                            {Timestamp: now, Line: "hey3"},
+                        },
+                    },
+                },
+            },
+            expectedPush: &logproto.PushRequest{
+                Streams: []logproto.Stream{
+                    {
+                        Labels: "{job=\"foo\", service_name=\"foo\"}",
+                        Hash:   0x86ca305b6d86e8b0,
+                        Entries: []logproto.Entry{
+                            {Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
+                            {Timestamp: now.Add(-time.Second), Line: "hey2"},
+                            {Timestamp: now, Line: "hey3"},
+                        },
+                    },
+                },
+            },
+        },
     }

     for testName, testData := range tests {
@@ -405,6 +441,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 func TestDistributorPushConcurrently(t *testing.T) {
     limits := &validation.Limits{}
     flagext.DefaultValues(limits)
+    limits.DiscoverServiceName = nil

     distributors, ingesters := prepare(t, 1, 5, limits, nil)
@@ -497,6 +534,7 @@ func TestDistributorPushErrors(t *testing.T) {
 func Test_SortLabelsOnPush(t *testing.T) {
     limits := &validation.Limits{}
     flagext.DefaultValues(limits)
+    limits.DiscoverServiceName = nil
     ingester := &mockIngester{}
     distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
@@ -788,13 +826,136 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
     for n := 0; n < b.N; n++ {
         stream := request.Streams[0]
         stream.Labels = `{buzz="f", a="b"}`
-        _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
+        _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream)
         if err != nil {
             panic("parseStreamLabels fail,err:" + err.Error())
         }
     }
 }

+func TestParseStreamLabels(t *testing.T) {
+    defaultLimit := &validation.Limits{}
+    flagext.DefaultValues(defaultLimit)
+
+    for _, tc := range []struct {
+        name           string
+        origLabels     string
+        expectedLabels labels.Labels
+        expectedErr    error
+        generateLimits func() *validation.Limits
+    }{
+        {
+            name: "service name label mapping disabled",
+            generateLimits: func() *validation.Limits {
+                limits := &validation.Limits{}
+                flagext.DefaultValues(limits)
+                limits.DiscoverServiceName = nil
+                return limits
+            },
+            origLabels: `{foo="bar"}`,
+            expectedLabels: labels.Labels{
+                {
+                    Name:  "foo",
+                    Value: "bar",
+                },
+            },
+        },
+        {
+            name: "no labels defined - service name label mapping disabled",
+            generateLimits: func() *validation.Limits {
+                limits := &validation.Limits{}
+                flagext.DefaultValues(limits)
+                limits.DiscoverServiceName = nil
+                return limits
+            },
+            origLabels:  `{}`,
+            expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
+        },
+        {
+            name:       "service name label enabled",
+            origLabels: `{foo="bar"}`,
+            generateLimits: func() *validation.Limits {
+                return defaultLimit
+            },
+            expectedLabels: labels.Labels{
+                {
+                    Name:  "foo",
+                    Value: "bar",
+                },
+                {
+                    Name:  labelServiceName,
+                    Value: serviceUnknown,
+                },
+            },
+        },
+        {
+            name:       "service name label should not get counted against max labels count",
+            origLabels: `{foo="bar"}`,
+            generateLimits: func() *validation.Limits {
+                limits := &validation.Limits{}
+                flagext.DefaultValues(limits)
+                limits.MaxLabelNamesPerSeries = 1
+                return limits
+            },
+            expectedLabels: labels.Labels{
+                {
+                    Name:  "foo",
+                    Value: "bar",
+                },
+                {
+                    Name:  labelServiceName,
+                    Value: serviceUnknown,
+                },
+            },
+        },
+        {
+            name:       "use label service as service name",
+            origLabels: `{container="nginx", foo="bar", service="auth"}`,
+            generateLimits: func() *validation.Limits {
+                return defaultLimit
+            },
+            expectedLabels: labels.Labels{
+                {
+                    Name:  "container",
+                    Value: "nginx",
+                },
+                {
+                    Name:  "foo",
+                    Value: "bar",
+                },
+                {
+                    Name:  "service",
+                    Value: "auth",
+                },
+                {
+                    Name:  labelServiceName,
+                    Value: "auth",
+                },
+            },
+        },
+    } {
+        limits := tc.generateLimits()
+        distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
+        d := distributors[0]
+
+        vCtx := d.validator.getValidationContextForTime(testTime, "123")
+
+        t.Run(tc.name, func(t *testing.T) {
+            lbs, lbsString, hash, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{
+                Labels: tc.origLabels,
+            })
+            if tc.expectedErr != nil {
+                require.Equal(t, tc.expectedErr, err)
+                return
+            }
+            require.NoError(t, err)
+            require.Equal(t, tc.expectedLabels.String(), lbsString)
+            require.Equal(t, tc.expectedLabels, lbs)
+            require.Equal(t, tc.expectedLabels.Hash(), hash)
+        })
+    }
+}
+
 func Benchmark_Push(b *testing.B) {
     limits := &validation.Limits{}
     flagext.DefaultValues(limits)
diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go
index 6db6995662dd2..d2f655f1c8329 100644
--- a/pkg/distributor/limits.go
+++ b/pkg/distributor/limits.go
@@ -22,6 +22,7 @@ type Limits interface {
     RejectOldSamplesMaxAge(userID string) time.Duration

     IncrementDuplicateTimestamps(userID string) bool
+    DiscoverServiceName(userID string) []string
     ShardStreams(userID string) *shardstreams.Config

     IngestionRateStrategy() string
diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go
index 7e7006c836201..ca2186c1d2626 100644
--- a/pkg/distributor/validator.go
+++ b/pkg/distributor/validator.go
@@ -43,6 +43,7 @@ type validationContext struct {
     maxLabelValueLength int

     incrementDuplicateTimestamps bool
+    discoverServiceName          []string

     allowStructuredMetadata   bool
     maxStructuredMetadataSize int
@@ -63,6 +64,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
         maxLabelNameLength:           v.MaxLabelNameLength(userID),
         maxLabelValueLength:          v.MaxLabelValueLength(userID),
         incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
+        discoverServiceName:          v.DiscoverServiceName(userID),
         allowStructuredMetadata:      v.AllowStructuredMetadata(userID),
         maxStructuredMetadataSize:    v.MaxStructuredMetadataSize(userID),
         maxStructuredMetadataCount:   v.MaxStructuredMetadataCount(userID),
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 757f88d25c608..8c2197113a41f 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -80,6 +80,7 @@ type Limits struct {
     MaxLineSize                 flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
     MaxLineSizeTruncate         bool             `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
     IncrementDuplicateTimestamp bool             `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
+    DiscoverServiceName         []string         `yaml:"discover_service_name" json:"discover_service_name"`

     // Ingester enforced limits.
     MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
@@ -240,6 +241,19 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 15, "Maximum number of label names per series.")
     f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Whether or not old samples will be rejected.")
     f.BoolVar(&l.IncrementDuplicateTimestamp, "validation.increment-duplicate-timestamps", false, "Alter the log line timestamp during ingestion when the timestamp is the same as the previous entry for the same stream. When enabled, if a log line in a push request has the same timestamp as the previous line for the same stream, one nanosecond is added to the log line. This will preserve the received order of log lines with the exact same timestamp when they are queried, by slightly altering their stored timestamp. NOTE: This is imperfect, because Loki accepts out of order writes, and another push request for the same stream could contain duplicate timestamps to existing entries and they will not be incremented.")
+    l.DiscoverServiceName = []string{
+        "service",
+        "app",
+        "application",
+        "name",
+        "app_kubernetes_io_name",
+        "container",
+        "container_name",
+        "component",
+        "workload",
+        "job",
+    }
+    f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")

     _ = l.RejectOldSamplesMaxAge.Set("7d")
     f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
@@ -897,6 +911,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
     return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
 }

+func (o *Overrides) DiscoverServiceName(userID string) []string {
+    return o.getOverridesForUser(userID).DiscoverServiceName
+}
+
 // VolumeEnabled returns whether volume endpoints are enabled for a user.
 func (o *Overrides) VolumeEnabled(userID string) bool {
     return o.getOverridesForUser(userID).VolumeEnabled
diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go
index 9096d9b179444..d8527bcdaa59c 100644
--- a/pkg/validation/limits_test.go
+++ b/pkg/validation/limits_test.go
@@ -215,6 +215,7 @@ ruler_remote_write_headers:
 `,
         exp: Limits{
             RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}},
+            DiscoverServiceName:     []string{},

             // Rest from new defaults
             StreamRetention: []StreamRetention{
@@ -232,6 +233,7 @@ ruler_remote_write_headers:
 `,
         exp: Limits{
+            DiscoverServiceName: []string{},

             // Rest from new defaults
             StreamRetention: []StreamRetention{
@@ -251,6 +253,7 @@ retention_stream:
   selector: '{foo="bar"}'
 `,
         exp: Limits{
+            DiscoverServiceName: []string{},
             StreamRetention: []StreamRetention{
                 {
                     Period: model.Duration(24 * time.Hour),
@@ -269,7 +272,8 @@ retention_stream:
 reject_old_samples: true
 `,
         exp: Limits{
-            RejectOldSamples: true,
+            RejectOldSamples:    true,
+            DiscoverServiceName: []string{},

             // Rest from new defaults
             RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
@@ -288,7 +292,8 @@ reject_old_samples: true
 query_timeout: 5m
 `,
         exp: Limits{
-            QueryTimeout: model.Duration(5 * time.Minute),
+            DiscoverServiceName: []string{},
+            QueryTimeout:        model.Duration(5 * time.Minute),

             // Rest from new defaults.
             RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
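
For readers who want to see the new precedence rule outside the distributor plumbing, here is a minimal standalone sketch of the logic this PR adds to `parseStreamLabels` in `pkg/distributor/distributor.go`: the first label from the configured list that is present on the stream supplies `service_name`, otherwise `unknown_service` is used, and an existing `service_name` label or an empty list leaves the stream untouched. The `discoverServiceName` helper name and the shortened candidate list below are illustrative only; they are not part of the change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// discoverServiceName mirrors the precedence logic added in this PR:
// when the stream has no service_name label and discovery is enabled,
// the first configured label found on the stream supplies the value,
// otherwise the fallback "unknown_service" is used.
func discoverServiceName(ls labels.Labels, candidates []string) labels.Labels {
	if ls.Has("service_name") || len(candidates) == 0 {
		return ls
	}
	serviceName := "unknown_service"
	for _, name := range candidates {
		if v := ls.Get(name); v != "" {
			serviceName = v
			break
		}
	}
	return labels.NewBuilder(ls).Set("service_name", serviceName).Labels()
}

func main() {
	ls := labels.FromStrings("container", "nginx", "service", "auth")
	// "service" precedes "container" in the candidate list, so it wins.
	fmt.Println(discoverServiceName(ls, []string{"service", "app", "container"}))
	// Output: {container="nginx", service="auth", service_name="auth"}
}
```

This is the behaviour the new `discover_service_name` limit (CLI flag `-validation.discover-service-name`) controls per tenant; per the docs hunk above, configuring an empty list disables setting the label entirely.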