diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index a361bbbf196de..e45c0a60ac309 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe return nil, nil, err } - req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats) + req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats) return req, stats, nil } @@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -111,12 +111,13 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten res := rls.At(i).Resource() resAttrs := res.Attributes() - if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" { - resAttrs.PutStr(attrServiceName, "unknown_service") - } resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size + hasServiceName := false + if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" { + hasServiceName = true + } resAttrs.Range(func(k string, v pcommon.Value) bool { action := otlpConfig.ActionForResourceAttribute(k) if action == Drop { @@ -127,6 +128,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten if action == IndexLabel { for _, lbl := range attributeAsLabels { streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + + if !hasServiceName && len(discoverServiceName) > 0 && !stats.IsAggregatedMetric { + for _, labelName := range discoverServiceName { + if lbl.Name == labelName { + streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value) + hasServiceName = true + break + } + } + } } } else if action == StructuredMetadata { resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...) @@ -135,6 +146,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten return true }) + if !hasServiceName && len(discoverServiceName) > 0 && !stats.IsAggregatedMetric { + streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown) + } + if err := streamLabels.Validate(); err != nil { stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err)) continue diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index bcdeb18d17069..4201b93b114fd 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -493,7 +493,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() tracker := NewMockTracker() - pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats) + pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, []string{}, tracker, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 80e7c5e7eead1..e81414bd2ea0d 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -16,7 +16,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "github.com/grafana/dskit/flagext" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -256,7 +259,7 @@ func TestParseRequest(t *testing.T) { } tracker := NewMockTracker() - data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived @@ -314,12 +317,111 @@ func TestParseRequest(t *testing.T) { } } +func Test_ServiceDetction(t *testing.T) { + tracker := NewMockTracker() + + t.Run("detects servce from loki push requests", func(t *testing.T) { + body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}` + request := httptest.NewRequest( + "POST", + `/loki/api/v1/push`, + strings.NewReader(body), + ) + request.Header.Add("Content-Type", "application/json") + + limits := &fakeLimits{enabled: true, labels: []string{"foo"}} + data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker) + + require.NoError(t, err) + require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) + }) + + t.Run("detects servce from OTLP push requests using default indexing", func(t *testing.T) { + now := time.Unix(0, time.Now().UnixNano()) + + tracker := NewMockTracker() + + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("k8s.job.name", "bar") + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano())) + + jsonMarshaller := plog.JSONMarshaler{} + body, err := jsonMarshaller.MarshalLogs(ld) + + require.NoError(t, err) + request := httptest.NewRequest( + "POST", + `/loki/api/v1/push`, + bytes.NewReader(body), + ) + request.Header.Add("Content-Type", "application/json") + + limits := &fakeLimits{enabled: true} + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + require.NoError(t, err) + require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) + }) + + t.Run("detects servce from OTLP push requests using custom indexing", func(t *testing.T) { + now := time.Unix(0, time.Now().UnixNano()) + + tracker := NewMockTracker() + + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("special", "sauce") + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano())) + + jsonMarshaller := plog.JSONMarshaler{} + body, err := jsonMarshaller.MarshalLogs(ld) + + require.NoError(t, err) + request := httptest.NewRequest( + "POST", + `/loki/api/v1/push`, + bytes.NewReader(body), + ) + request.Header.Add("Content-Type", "application/json") + + limits := &fakeLimits{ + enabled: true, + labels: []string{"special"}, + indexAttributes: []string{"special"}, + } + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + require.NoError(t, err) + require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels) + }) +} + type fakeLimits struct { - enabled bool + enabled bool + labels []string + indexAttributes []string +} + +func (f *fakeLimits) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration { + return time.Hour } func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig { - return OTLPConfig{} + if len(l.indexAttributes) > 0 { + return OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: l.indexAttributes, + }, + }, + }, + } + } + + defaultGlobalOTLPConfig := GlobalOTLPConfig{} + flagext.DefaultValues(&defaultGlobalOTLPConfig) + return DefaultOTLPConfig(defaultGlobalOTLPConfig) } func (l *fakeLimits) DiscoverServiceName(_ string) []string { @@ -327,6 +429,10 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string { return nil } + if len(l.labels) > 0 { + return l.labels + } + return []string{ "service", "app", @@ -335,12 +441,16 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string { "app_kubernetes_io_name", "container", "container_name", + "k8s_container_name", "component", "workload", "job", + "k8s_job_name", } } +// RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration + type MockCustomTracker struct { receivedBytes map[string]float64 discardedBytes map[string]float64 diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 3e44dc2047773..75128607b882e 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { "app_kubernetes_io_name", "container", "container_name", + "k8s_container_name", "component", "workload", "job", + "k8s_job_name", } f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.") f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")