Skip to content

Commit

Permalink
feat: discovery of name of services emitting the logs (#12392)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Mar 29, 2024
1 parent 6e1547f commit 288c006
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* [11897](https://github.com/grafana/loki/pull/11897) **ashwanthgoli** Metadata: Introduces a separate split interval of `split_recent_metadata_queries_by_interval` for `recent_metadata_query_window` to help with caching recent metadata query results.
* [11970](https://github.com/grafana/loki/pull/11970) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.
* [12318](https://github.com/grafana/loki/pull/12318) **DylanGuedes** Memcached: Add mTLS support.
* [12392](https://github.com/grafana/loki/pull/12392) **sandeepsukhani** Detect the name of the service emitting logs and add it as the `service_name` label.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
Expand Down
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2824,6 +2824,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false]

# If no service_name label exists, Loki maps a single label from the configured
# list to service_name. If none of the configured labels exist in the stream,
# label is set to unknown_service. Empty list disables setting the label.
# CLI flag: -validation.discover-service-name
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]

# Maximum number of active streams per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-streams-per-user
[max_streams_per_user: <int> | default = 0]
Expand Down
1 change: 1 addition & 0 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ limits_config:
ingestion_burst_size_mb: 50
reject_old_samples: false
allow_structured_metadata: true
discover_service_name:
otlp_config:
resource_attributes:
attributes_config:
Expand Down
25 changes: 21 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

labelServiceName = "service_name"
serviceUnknown = "unknown_service"
)

var (
Expand Down Expand Up @@ -348,7 +351,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.truncateLines(validationContext, &stream)

var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
Expand Down Expand Up @@ -425,7 +428,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream)
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
continue
}
Expand Down Expand Up @@ -717,7 +720,7 @@ type labelData struct {
hash uint64
}

func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
labelVal := val.(labelData)
return labelVal.ls, labelVal.ls.String(), labelVal.hash, nil
Expand All @@ -728,10 +731,24 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
if err := d.validator.ValidateLabels(vContext, ls, stream); err != nil {
return nil, "", 0, err
}

// The service_name label is added only after the original labels have been validated, so that the label we add ourselves is not counted against the label-count limit.
if !ls.Has(labelServiceName) && len(vContext.discoverServiceName) > 0 {
serviceName := serviceUnknown
for _, labelName := range vContext.discoverServiceName {
if labelVal := ls.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

ls = labels.NewBuilder(ls).Set(labelServiceName, serviceName).Labels()
stream.Labels = ls.String()
}

lsHash := ls.Hash()

d.labelCache.Add(key, labelData{ls, lsHash})
Expand Down
163 changes: 162 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestDistributor(t *testing.T) {
t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)
Expand Down Expand Up @@ -134,13 +136,19 @@ func TestDistributor(t *testing.T) {
func Test_IncrementTimestamp(t *testing.T) {
incrementingDisabled := &validation.Limits{}
flagext.DefaultValues(incrementingDisabled)
incrementingDisabled.DiscoverServiceName = nil
incrementingDisabled.RejectOldSamples = false

incrementingEnabled := &validation.Limits{}
flagext.DefaultValues(incrementingEnabled)
incrementingEnabled.DiscoverServiceName = nil
incrementingEnabled.RejectOldSamples = false
incrementingEnabled.IncrementDuplicateTimestamp = true

defaultLimits := &validation.Limits{}
flagext.DefaultValues(defaultLimits)
now := time.Now()

tests := map[string]struct {
limits *validation.Limits
push *logproto.PushRequest
Expand Down Expand Up @@ -386,6 +394,34 @@ func Test_IncrementTimestamp(t *testing.T) {
},
},
},
"default limit adding service_name label": {
limits: defaultLimits,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\"}",
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
expectedPush: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\", service_name=\"foo\"}",
Hash: 0x86ca305b6d86e8b0,
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
},
}

for testName, testData := range tests {
Expand All @@ -405,6 +441,7 @@ func Test_IncrementTimestamp(t *testing.T) {
func TestDistributorPushConcurrently(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil

distributors, ingesters := prepare(t, 1, 5, limits, nil)

Expand Down Expand Up @@ -497,6 +534,7 @@ func TestDistributorPushErrors(t *testing.T) {
func Test_SortLabelsOnPush(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
ingester := &mockIngester{}
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

Expand Down Expand Up @@ -788,13 +826,136 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
}
}

// TestParseStreamLabels checks the labels, label string and hash returned by
// Distributor.parseStreamLabels, focusing on the service_name label:
//   - with DiscoverServiceName unset, the labels are returned unchanged and an
//     empty label set is rejected;
//   - with the default limits, service_name is set to the value of the first
//     configured label found in the stream, or to serviceUnknown when none of
//     them is present;
//   - the added service_name label is not counted against
//     MaxLabelNamesPerSeries.
func TestParseStreamLabels(t *testing.T) {
	// Shared, read-only default limits for the cases that do not need overrides.
	defaultLimit := &validation.Limits{}
	flagext.DefaultValues(defaultLimit)

	for _, tc := range []struct {
		name           string
		origLabels     string
		expectedLabels labels.Labels
		expectedErr    error
		generateLimits func() *validation.Limits
	}{
		{
			name: "service name label mapping disabled",
			generateLimits: func() *validation.Limits {
				limits := &validation.Limits{}
				flagext.DefaultValues(limits)
				limits.DiscoverServiceName = nil
				return limits
			},
			origLabels: `{foo="bar"}`,
			expectedLabels: labels.Labels{
				{
					Name:  "foo",
					Value: "bar",
				},
			},
		},
		{
			name: "no labels defined - service name label mapping disabled",
			generateLimits: func() *validation.Limits {
				limits := &validation.Limits{}
				flagext.DefaultValues(limits)
				limits.DiscoverServiceName = nil
				return limits
			},
			origLabels:  `{}`,
			expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
		},
		{
			name:       "service name label enabled",
			origLabels: `{foo="bar"}`,
			generateLimits: func() *validation.Limits {
				return defaultLimit
			},
			expectedLabels: labels.Labels{
				{
					Name:  "foo",
					Value: "bar",
				},
				{
					Name:  labelServiceName,
					Value: serviceUnknown,
				},
			},
		},
		{
			name:       "service name label should not get counted against max labels count",
			origLabels: `{foo="bar"}`,
			generateLimits: func() *validation.Limits {
				limits := &validation.Limits{}
				flagext.DefaultValues(limits)
				limits.MaxLabelNamesPerSeries = 1
				return limits
			},
			expectedLabels: labels.Labels{
				{
					Name:  "foo",
					Value: "bar",
				},
				{
					Name:  labelServiceName,
					Value: serviceUnknown,
				},
			},
		},
		{
			name:       "use label service as service name",
			origLabels: `{container="nginx", foo="bar", service="auth"}`,
			generateLimits: func() *validation.Limits {
				return defaultLimit
			},
			expectedLabels: labels.Labels{
				{
					Name:  "container",
					Value: "nginx",
				},
				{
					Name:  "foo",
					Value: "bar",
				},
				{
					Name:  "service",
					Value: "auth",
				},
				{
					Name:  labelServiceName,
					Value: "auth",
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// Build the distributor inside the subtest and hand it the subtest's
			// own t (not a throwaway &testing.T{}), so that anything prepare
			// reports or registers on its testing handle is attributed to, and
			// scoped to, this subtest.
			limits := tc.generateLimits()
			distributors, _ := prepare(t, 1, 5, limits, nil)
			d := distributors[0]

			vCtx := d.validator.getValidationContextForTime(testTime, "123")

			lbs, lbsString, hash, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{
				Labels: tc.origLabels,
			})
			if tc.expectedErr != nil {
				require.Equal(t, tc.expectedErr, err)
				return
			}
			require.NoError(t, err)
			require.Equal(t, tc.expectedLabels.String(), lbsString)
			require.Equal(t, tc.expectedLabels, lbs)
			require.Equal(t, tc.expectedLabels.Hash(), hash)
		})
	}
}

func Benchmark_Push(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Limits interface {
RejectOldSamplesMaxAge(userID string) time.Duration

IncrementDuplicateTimestamps(userID string) bool
DiscoverServiceName(userID string) []string

ShardStreams(userID string) *shardstreams.Config
IngestionRateStrategy() string
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type validationContext struct {
maxLabelValueLength int

incrementDuplicateTimestamps bool
discoverServiceName []string

allowStructuredMetadata bool
maxStructuredMetadataSize int
Expand All @@ -63,6 +64,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
discoverServiceName: v.DiscoverServiceName(userID),
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
Expand Down
18 changes: 18 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Limits struct {
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`

// Ingester enforced limits.
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
Expand Down Expand Up @@ -240,6 +241,19 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 15, "Maximum number of label names per series.")
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Whether or not old samples will be rejected.")
f.BoolVar(&l.IncrementDuplicateTimestamp, "validation.increment-duplicate-timestamps", false, "Alter the log line timestamp during ingestion when the timestamp is the same as the previous entry for the same stream. When enabled, if a log line in a push request has the same timestamp as the previous line for the same stream, one nanosecond is added to the log line. This will preserve the received order of log lines with the exact same timestamp when they are queried, by slightly altering their stored timestamp. NOTE: This is imperfect, because Loki accepts out of order writes, and another push request for the same stream could contain duplicate timestamps to existing entries and they will not be incremented.")
l.DiscoverServiceName = []string{
"service",
"app",
"application",
"name",
"app_kubernetes_io_name",
"container",
"container_name",
"component",
"workload",
"job",
}
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")

_ = l.RejectOldSamplesMaxAge.Set("7d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
Expand Down Expand Up @@ -897,6 +911,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
}

// DiscoverServiceName returns the DiscoverServiceName limit that applies to
// the given user, i.e. the list of label names configured through
// discover_service_name.
func (o *Overrides) DiscoverServiceName(userID string) []string {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.DiscoverServiceName
}

// VolumeEnabled returns whether volume endpoints are enabled for a user.
func (o *Overrides) VolumeEnabled(userID string) bool {
return o.getOverridesForUser(userID).VolumeEnabled
Expand Down
Loading

0 comments on commit 288c006

Please sign in to comment.