diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f776e6778b43d..88c71dd62886a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -12,6 +12,9 @@ import ( "strings" "sync" "time" + "unicode/utf8" + + otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -66,6 +69,14 @@ const ( var ( maxLabelCacheSize = 100000 rfStats = analytics.NewInt("distributor_replication_factor") + + // the rune error replacement is rejected by Prometheus hence replacing them with space. + removeInvalidUtf = func(r rune) rune { + if r == utf8.RuneError { + return 32 // rune value for space + } + return r + } ) // Config for a Distributor. @@ -517,6 +528,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) + for i := range entry.StructuredMetadata { + structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name) + if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) { + structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value) + } + } if shouldDiscoverLevels { logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry) if ok { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index fb727a7884b73..e03c675ebb1b5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -11,6 +11,11 @@ import ( "sync" "testing" "time" + "unicode/utf8" + + otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + + "github.com/grafana/loki/pkg/push" "github.com/c2h5oh/datasize" "github.com/go-kit/log" @@ -46,6 +51,13 @@ import ( "github.com/grafana/loki/v3/pkg/validation" ) +const ( + smValidName = "valid_name" + smInvalidName = "invalid-name" + smValidValue = "valid-value私" + smInvalidValue = "valid-value�" +) + var ( success = &logproto.PushResponse{} ctx = user.InjectOrgID(context.Background(), "test") @@ -428,7 +440,7 @@ func TestDistributorPushConcurrently(t *testing.T) { []string{ fmt.Sprintf(`{app="foo-%d"}`, n), fmt.Sprintf(`{instance="bar-%d"}`, n), - }, + }, false, false, false, ) response, err := distributors[n%len(distributors)].Push(ctx, request) assert.NoError(t, err) @@ -1226,17 +1238,58 @@ func Benchmark_Push(b *testing.B) { limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour) limits.CreationGracePeriod = model.Duration(24 * time.Hour) distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) - request := makeWriteRequest(100000, 100) - b.ResetTimer() b.ReportAllocs() - for n := 0; n < b.N; n++ { - _, err := distributors[0].Push(ctx, request) - if err != nil { - require.NoError(b, err) + b.Run("no structured metadata", func(b *testing.B) { + for n := 0; n < b.N; n++ { + request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, false, false, false) + _, err := distributors[0].Push(ctx, request) + if err != nil { + require.NoError(b, err) + } } - } + }) + + b.Run("all valid structured metadata", func(b *testing.B) { + for n := 0; n < b.N; n++ { + request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, false) + _, err := distributors[0].Push(ctx, request) + if err != nil { + require.NoError(b, err) + } + } + }) + + b.Run("structured metadata with invalid names", func(b *testing.B) { + for n := 0; n < b.N; n++ { + request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, false) + _, err := distributors[0].Push(ctx, request) + if err != nil { + require.NoError(b, err) + } + } + }) + + b.Run("structured metadata with invalid values", func(b *testing.B) { + for n := 0; n < b.N; n++ { + request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, true) + _, err := distributors[0].Push(ctx, request) + if err != nil { + require.NoError(b, err) + } + } + }) + + b.Run("structured metadata with invalid names and values", func(b *testing.B) { + for n := 0; n < b.N; n++ { + request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, true) + _, err := distributors[0].Push(ctx, request) + if err != nil { + require.NoError(b, err) + } + } + }) } func TestShardCalculation(t *testing.T) { @@ -1696,7 +1749,7 @@ func makeWriteRequestWithLabelsWithLevel(lines, size int, labels []string, level } } -func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.PushRequest { +func makeWriteRequestWithLabels(lines, size int, labels []string, addStructuredMetadata, invalidName, invalidValue bool) *logproto.PushRequest { streams := make([]logproto.Stream, len(labels)) for i := 0; i < len(labels); i++ { stream := logproto.Stream{Labels: labels[i]} @@ -1705,11 +1758,24 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push // Construct the log line, honoring the input size line := strconv.Itoa(j) + strings.Repeat("0", size) line = line[:size] - - stream.Entries = append(stream.Entries, logproto.Entry{ + entry := logproto.Entry{ Timestamp: time.Now().Add(time.Duration(j) * time.Millisecond), Line: line, - }) + } + if addStructuredMetadata { + name := smValidName + value := smValidValue + if invalidName { + name = smInvalidName + } + if invalidValue { + value = smInvalidValue + } + entry.StructuredMetadata = push.LabelsAdapter{ + {Name: name, Value: value}, + } + } + stream.Entries = append(stream.Entries, entry) } streams[i] = stream @@ -1721,7 +1787,7 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push } func makeWriteRequest(lines, size int) *logproto.PushRequest { - return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}) + return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}, false, false, false) } type mockKafkaWriter struct { @@ -1777,6 +1843,19 @@ func (i *mockIngester) Push(_ context.Context, in *logproto.PushRequest, _ ...gr i.mu.Lock() defer i.mu.Unlock() + for _, s := range in.Streams { + for _, e := range s.Entries { + for _, sm := range e.StructuredMetadata { + if strings.ContainsRune(sm.Value, utf8.RuneError) { + return nil, fmt.Errorf("sm value was not sanitized before being pushed to ignester, invalid utf 8 rune %d", utf8.RuneError) + } + if sm.Name != otlptranslate.NormalizeLabel(sm.Name) { + return nil, fmt.Errorf("sm name was not sanitized before being sent to ingester, contained characters %s", sm.Name) + + } + } + } + } i.pushed = append(i.pushed, in) return nil, nil @@ -1875,3 +1954,39 @@ func TestDistributorTee(t *testing.T) { require.Equal(t, "test", tee.tenant) } } + +func TestDistributor_StructuredMetadataSanitization(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + for _, tc := range []struct { + req *logproto.PushRequest + expectedResponse *logproto.PushResponse + }{ + { + makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false), + success, + }, + { + makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false), + success, + }, + { + makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true), + success, + }, + { + makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true), + success, + }, + } { + distributors, _ := prepare(t, 1, 5, limits, nil) + + var request logproto.PushRequest + request.Streams = append(request.Streams, tc.req.Streams[0]) + + // the error would happen in the ingester mock, it's set to reject SM that has not been sanitized + response, err := distributors[0].Push(ctx, &request) + require.NoError(t, err) + assert.Equal(t, tc.expectedResponse, response) + } +} diff --git a/pkg/distributor/level_detection_test.go b/pkg/distributor/level_detection_test.go index a9fb0d36b6956..ec674e8bf871b 100644 --- a/pkg/distributor/level_detection_test.go +++ b/pkg/distributor/level_detection_test.go @@ -31,7 +31,7 @@ func Test_DetectLogLevels(t *testing.T) { limits, ingester := setup(false) distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false) _, err := distributors[0].Push(ctx, writeReq) require.NoError(t, err) topVal := ingester.Peek() @@ -43,7 +43,7 @@ func Test_DetectLogLevels(t *testing.T) { limits, ingester := setup(true) distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false) _, err := distributors[0].Push(ctx, writeReq) require.NoError(t, err) topVal := ingester.Peek() @@ -80,7 +80,7 @@ func Test_DetectLogLevels(t *testing.T) { limits, ingester := setup(true) distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}) + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}, false, false, false) _, err := distributors[0].Push(ctx, writeReq) require.NoError(t, err) topVal := ingester.Peek() @@ -95,7 +95,7 @@ func Test_DetectLogLevels(t *testing.T) { limits, ingester := setup(true) distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false) writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{ { Name: "severity",