diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 4c99c3b4c8d1a..cce92e24eb2f1 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -validation.discover-service-name [discover_service_name: | default = [service app application name app_kubernetes_io_name container container_name component workload job]] +# Discover and add log levels during ingestion, if not present already. Levels +# would be added to Structured Metadata with name 'level' and one of the values +# from 'debug', 'info', 'warn', 'error', 'critical', 'fatal'. +# CLI flag: -validation.discover-log-levels +[discover_log_levels: | default = false] + # Maximum number of active streams per user, per ingester. 0 to disable. # CLI flag: -ingester.max-streams-per-user [max_streams_per_user: | default = 0] diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9b34913d42a19..01f77e320b5a6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc/codes" "github.com/grafana/dskit/httpgrpc" @@ -57,6 +58,13 @@ const ( labelServiceName = "service_name" serviceUnknown = "unknown_service" + labelLevel = "level" + logLevelDebug = "debug" + logLevelInfo = "info" + logLevelWarn = "warn" + logLevelError = "error" + logLevelFatal = "fatal" + logLevelCritical = "critical" ) var ( @@ -367,6 +375,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log n := 0 pushSize := 0 prevTs := stream.Entries[0].Timestamp + addLogLevel := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels && !lbs.Has(labelLevel) for _, entry := range stream.Entries { if err 
:= d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil { d.writeFailuresManager.Log(tenantID, err) @@ -374,6 +383,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log continue } + structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) + if addLogLevel && !structuredMetadata.Has(labelLevel) { + logLevel := detectLogLevelFromLogEntry(entry, structuredMetadata) + entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{ + Name: labelLevel, + Value: logLevel, + }) + } stream.Entries[n] = entry // If configured for this tenant, increment duplicate timestamps. Note, this is imperfect @@ -838,3 +855,56 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l func (d *Distributor) HealthyInstancesCount() int { return int(d.healthyInstancesCount.Load()) } + +func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string { + // OTLP logs carry a severity number, which we use to determine the log level.
+ // Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber + if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" { + otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt) + if err != nil { + return logLevelInfo + } + if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) { + return logLevelDebug + } else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) { + return logLevelInfo + } else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) { + return logLevelWarn + } else if otlpSeverityNumber <= int(plog.SeverityNumberError4) { + return logLevelError + } else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) { + return logLevelFatal + } + return logLevelInfo + } + + return extractLogLevelFromLogLine(entry.Line) +} + +func extractLogLevelFromLogLine(log string) string { + if strings.Contains(log, `:"err"`) || strings.Contains(log, `:"ERR"`) || + strings.Contains(log, "=err") || strings.Contains(log, "=ERR") || + strings.Contains(log, "err:") || strings.Contains(log, "ERR:") || + strings.Contains(log, "error") || strings.Contains(log, "ERROR") { + return logLevelError + } + if strings.Contains(log, `:"warn"`) || strings.Contains(log, `:"WARN"`) || + strings.Contains(log, "=warn") || strings.Contains(log, "=WARN") || + strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") || + strings.Contains(log, "warning") || strings.Contains(log, "WARNING") { + return logLevelWarn + } + if strings.Contains(log, `:"critical"`) || strings.Contains(log, `:"CRITICAL"`) || + strings.Contains(log, "=critical") || strings.Contains(log, "=CRITICAL") || + strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") { + return logLevelCritical + } + if strings.Contains(log, `:"debug"`) || strings.Contains(log, `:"DEBUG"`) || + strings.Contains(log, "=debug") || strings.Contains(log, "=DEBUG") || + 
strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") { + return logLevelDebug + } + + // Default to info if no specific level is found + return logLevelInfo +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 81a7fb09b94a5..e4bf766b42ee6 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -26,13 +26,16 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/ingester/client" + loghttp_push "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/util/constants" fe "github.com/grafana/loki/pkg/util/flagext" @@ -1491,3 +1494,146 @@ func TestDistributorTee(t *testing.T) { require.Equal(t, "test", tee.tenant) } } + +func Test_DetectLogLevels(t *testing.T) { + setup := func(discoverLogLevels bool) (*validation.Limits, *mockIngester) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.DiscoverLogLevels = discoverLogLevels + limits.DiscoverServiceName = nil + limits.AllowStructuredMetadata = true + return limits, &mockIngester{} + } + + t.Run("log level detection disabled", func(t *testing.T) { + limits, ingester := setup(false) + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Len(t, 
topVal.Streams[0].Entries[0].StructuredMetadata, 0) + }) + + t.Run("log level detection enabled", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Equal(t, push.LabelsAdapter{ + { + Name: labelLevel, + Value: logLevelInfo, + }, + }, topVal.Streams[0].Entries[0].StructuredMetadata) + }) + + t.Run("log level detection enabled but log level already present in stream", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels) + require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0) + }) + + t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{ + { + Name: labelLevel, + Value: logLevelWarn, + }, + } + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Equal(t, push.LabelsAdapter{ + { + Name: labelLevel, + Value: logLevelWarn, + }, + }, 
topVal.Streams[0].Entries[0].StructuredMetadata) + }) +} + +func Test_detectLogLevelFromLogEntry(t *testing.T) { + for _, tc := range []struct { + name string + entry logproto.Entry + expectedLogLevel string + }{ + { + name: "use severity number from otlp logs", + entry: logproto.Entry{ + Line: "error", + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: fmt.Sprintf("%d", plog.SeverityNumberDebug3), + }, + }, + }, + expectedLogLevel: logLevelDebug, + }, + { + name: "invalid severity number should not cause any issues", + entry: logproto.Entry{ + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: "foo", + }, + }, + }, + expectedLogLevel: logLevelInfo, + }, + { + name: "non otlp without any of the log level keywords in log line", + entry: logproto.Entry{ + Line: "foo", + }, + expectedLogLevel: logLevelInfo, + }, + { + name: "non otlp with log level keywords in log line", + entry: logproto.Entry{ + Line: "this is a warning log", + }, + expectedLogLevel: logLevelWarn, + }, + { + name: "json log line with an error", + entry: logproto.Entry{ + Line: `{"foo":"bar","level":"error"}`, + }, + expectedLogLevel: logLevelError, + }, + { + name: "logfmt log line with a warn", + entry: logproto.Entry{ + Line: `foo=bar level=warn`, + }, + expectedLogLevel: logLevelWarn, + }, + } { + t.Run(tc.name, func(t *testing.T) { + detectedLogLevel := detectLogLevelFromLogEntry(tc.entry, logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata)) + require.Equal(t, tc.expectedLogLevel, detectedLogLevel) + }) + } +} diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index d2f655f1c8329..927374416e8ba 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -23,6 +23,7 @@ type Limits interface { IncrementDuplicateTimestamps(userID string) bool DiscoverServiceName(userID string) []string + DiscoverLogLevels(userID string) bool ShardStreams(userID string) 
*shardstreams.Config IngestionRateStrategy() string diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index ca2186c1d2626..6f0bce53d983b 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -44,6 +44,7 @@ type validationContext struct { incrementDuplicateTimestamps bool discoverServiceName []string + discoverLogLevels bool allowStructuredMetadata bool maxStructuredMetadataSize int @@ -65,6 +66,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val maxLabelValueLength: v.MaxLabelValueLength(userID), incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID), discoverServiceName: v.DiscoverServiceName(userID), + discoverLogLevels: v.DiscoverLogLevels(userID), allowStructuredMetadata: v.AllowStructuredMetadata(userID), maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index a001b52b210f6..8bd206fce29ac 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -27,6 +27,8 @@ const ( pbContentType = "application/x-protobuf" gzipContentEncoding = "gzip" attrServiceName = "service.name" + + OTLPSeverityNumber = "severity_number" ) func newPushStats() *Stats { @@ -287,7 +289,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry { if severityNum := log.SeverityNumber(); severityNum != plog.SeverityNumberUnspecified { structuredMetadata = append(structuredMetadata, push.LabelAdapter{ - Name: "severity_number", + Name: OTLPSeverityNumber, Value: fmt.Sprintf("%d", severityNum), }) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 8c2197113a41f..a004b8eb94f89 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -81,6 +81,7 @@ type Limits struct { MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` IncrementDuplicateTimestamp 
bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"` DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"` + DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` // Ingester enforced limits. MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` @@ -254,6 +255,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { "job", } f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.") + f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", false, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name 'level' and one of the values from 'debug', 'info', 'warn', 'error', 'critical', 'fatal'.") _ = l.RejectOldSamplesMaxAge.Set("7d") f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.") @@ -915,6 +917,10 @@ func (o *Overrides) DiscoverServiceName(userID string) []string { return o.getOverridesForUser(userID).DiscoverServiceName } +func (o *Overrides) DiscoverLogLevels(userID string) bool { + return o.getOverridesForUser(userID).DiscoverLogLevels +} + // VolumeEnabled returns whether volume endpoints are enabled for a user. func (o *Overrides) VolumeEnabled(userID string) bool { return o.getOverridesForUser(userID).VolumeEnabled