Skip to content

Commit

Permalink
fix: enable service detection for otlp endoint (#14036)
Browse files Browse the repository at this point in the history
Co-authored-by: Sven Grossmann <[email protected]>
Co-authored-by: Ashwanth <[email protected]>
Co-authored-by: JordanRushing <[email protected]>
  • Loading branch information
4 people authored Sep 4, 2024
1 parent aa1ac99 commit 4f962ef
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3680,7 +3680,7 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# list to service_name. If none of the configured labels exist in the stream,
# label is set to unknown_service. Empty list disables setting the label.
# CLI flag: -validation.discover-service-name
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name k8s_container_name component workload job k8s_job_name]]

# Discover and add log levels during ingestion, if not present already. Levels
# would be added to Structured Metadata with name
Expand Down
26 changes: 21 additions & 5 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
return req, stats, nil
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -111,12 +111,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
res := rls.At(i).Resource()
resAttrs := res.Attributes()

if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size

shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
hasServiceName := false
if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" {
hasServiceName = true
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
Expand All @@ -127,6 +129,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)

if !hasServiceName && shouldDiscoverServiceName {
for _, labelName := range discoverServiceName {
if lbl.Name == labelName {
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value)
hasServiceName = true
break
}
}
}
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
Expand All @@ -135,6 +147,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
return true
})

if !hasServiceName && shouldDiscoverServiceName {
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
}

if err := streamLabels.Validate(); err != nil {
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
continue
Expand Down
20 changes: 17 additions & 3 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ import (

func TestOTLPToLokiPushRequest(t *testing.T) {
now := time.Unix(0, time.Now().UnixNano())
defaultServiceDetection := []string{
"service",
"app",
"application",
"name",
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -346,7 +360,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
{
Action: IndexLabel,
Attributes: []string{"pod.name"},
}, {
},
{
Action: IndexLabel,
Regex: relabel.MustNewRegexp("service.*"),
},
Expand Down Expand Up @@ -493,7 +508,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats)
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

Expand Down Expand Up @@ -592,7 +607,6 @@ func TestOTLPLogToPushEntry(t *testing.T) {
require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig(defaultGlobalOTLPConfig)))
})
}

}

func TestAttributesToLabels(t *testing.T) {
Expand Down
125 changes: 119 additions & 6 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"compress/gzip"
"context"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"strings"
"testing"
Expand All @@ -16,6 +18,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/grafana/dskit/flagext"

util_log "github.com/grafana/loki/v3/pkg/util/log"
)
Expand Down Expand Up @@ -256,7 +262,7 @@ func TestParseRequest(t *testing.T) {
}

tracker := NewMockTracker()
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)

structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
Expand Down Expand Up @@ -314,19 +320,124 @@ func TestParseRequest(t *testing.T) {
}
}

func Test_ServiceDetection(t *testing.T) {
tracker := NewMockTracker()

createOtlpLogs := func(labels ...string) []byte {
now := time.Unix(0, time.Now().UnixNano())
ld := plog.NewLogs()
for i := 0; i < len(labels); i += 2 {
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr(labels[i], labels[i+1])
}
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body")
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano()))

jsonMarshaller := plog.JSONMarshaler{}
body, err := jsonMarshaller.MarshalLogs(ld)

require.NoError(t, err)
return body
}

createRequest := func(path string, body io.Reader) *http.Request {
request := httptest.NewRequest(
"POST",
path,
body,
)
request.Header.Add("Content-Type", "application/json")

return request
}

t.Run("detects servce from loki push requests", func(t *testing.T) {
body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`
request := createRequest("/loki/api/v1/push", strings.NewReader(body))

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects servce from OTLP push requests using default indexing", func(t *testing.T) {
body := createOtlpLogs("k8s.job.name", "bar")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects servce from OTLP push requests using custom indexing", func(t *testing.T) {
body := createOtlpLogs("special", "sauce")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{
enabled: true,
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})

t.Run("only detects custom service label from indexed labels", func(t *testing.T) {
body := createOtlpLogs("special", "sauce")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{
enabled: true,
labels: []string{"special"},
indexAttributes: []string{},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
}

type fakeLimits struct {
enabled bool
enabled bool
labels []string
indexAttributes []string
}

func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig {
return OTLPConfig{}
func (f *fakeLimits) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
return time.Hour
}

func (l *fakeLimits) DiscoverServiceName(_ string) []string {
if !l.enabled {
func (f *fakeLimits) OTLPConfig(_ string) OTLPConfig {
if len(f.indexAttributes) > 0 {
return OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: f.indexAttributes,
},
},
},
}
}

defaultGlobalOTLPConfig := GlobalOTLPConfig{}
flagext.DefaultValues(&defaultGlobalOTLPConfig)
return DefaultOTLPConfig(defaultGlobalOTLPConfig)
}

func (f *fakeLimits) DiscoverServiceName(_ string) []string {
if !f.enabled {
return nil
}

if len(f.labels) > 0 {
return f.labels
}

return []string{
"service",
"app",
Expand All @@ -335,9 +446,11 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string {
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")
Expand Down

0 comments on commit 4f962ef

Please sign in to comment.