Skip to content

Commit

Permalink
feat: sanitize structured metadata during ingestion in the distributor (
Browse files Browse the repository at this point in the history
#15141)

Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan authored Nov 29, 2024
1 parent 990f71c commit be4f17e
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 17 deletions.
17 changes: 17 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -66,6 +69,14 @@ const (
var (
maxLabelCacheSize = 100000
rfStats = analytics.NewInt("distributor_replication_factor")

// the rune error replacement is rejected by Prometheus hence replacing them with space.
removeInvalidUtf = func(r rune) rune {
if r == utf8.RuneError {
return 32 // rune value for space
}
return r
}
)

// Config for a Distributor.
Expand Down Expand Up @@ -517,6 +528,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
for i := range entry.StructuredMetadata {
structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) {
structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value)
}
}
if shouldDiscoverLevels {
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
Expand Down
141 changes: 128 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"sync"
"testing"
"time"
"unicode/utf8"

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
Expand Down Expand Up @@ -46,6 +51,13 @@ import (
"github.com/grafana/loki/v3/pkg/validation"
)

const (
smValidName = "valid_name"
smInvalidName = "invalid-name"
smValidValue = "valid-value私"
smInvalidValue = "valid-value�"
)

var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
Expand Down Expand Up @@ -428,7 +440,7 @@ func TestDistributorPushConcurrently(t *testing.T) {
[]string{
fmt.Sprintf(`{app="foo-%d"}`, n),
fmt.Sprintf(`{instance="bar-%d"}`, n),
},
}, false, false, false,
)
response, err := distributors[n%len(distributors)].Push(ctx, request)
assert.NoError(t, err)
Expand Down Expand Up @@ -1226,17 +1238,58 @@ func Benchmark_Push(b *testing.B) {
limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
limits.CreationGracePeriod = model.Duration(24 * time.Hour)
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
request := makeWriteRequest(100000, 100)

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
b.Run("no structured metadata", func(b *testing.B) {
for n := 0; n < b.N; n++ {
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, false, false, false)
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
}
})

b.Run("all valid structured metadata", func(b *testing.B) {
for n := 0; n < b.N; n++ {
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, false)
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
})

b.Run("structured metadata with invalid names", func(b *testing.B) {
for n := 0; n < b.N; n++ {
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, false)
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
})

b.Run("structured metadata with invalid values", func(b *testing.B) {
for n := 0; n < b.N; n++ {
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, true)
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
})

b.Run("structured metadata with invalid names and values", func(b *testing.B) {
for n := 0; n < b.N; n++ {
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, true)
_, err := distributors[0].Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
})
}

func TestShardCalculation(t *testing.T) {
Expand Down Expand Up @@ -1696,7 +1749,7 @@ func makeWriteRequestWithLabelsWithLevel(lines, size int, labels []string, level
}
}

func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.PushRequest {
func makeWriteRequestWithLabels(lines, size int, labels []string, addStructuredMetadata, invalidName, invalidValue bool) *logproto.PushRequest {
streams := make([]logproto.Stream, len(labels))
for i := 0; i < len(labels); i++ {
stream := logproto.Stream{Labels: labels[i]}
Expand All @@ -1705,11 +1758,24 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push
// Construct the log line, honoring the input size
line := strconv.Itoa(j) + strings.Repeat("0", size)
line = line[:size]

stream.Entries = append(stream.Entries, logproto.Entry{
entry := logproto.Entry{
Timestamp: time.Now().Add(time.Duration(j) * time.Millisecond),
Line: line,
})
}
if addStructuredMetadata {
name := smValidName
value := smValidValue
if invalidName {
name = smInvalidName
}
if invalidValue {
value = smInvalidValue
}
entry.StructuredMetadata = push.LabelsAdapter{
{Name: name, Value: value},
}
}
stream.Entries = append(stream.Entries, entry)
}

streams[i] = stream
Expand All @@ -1721,7 +1787,7 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push
}

func makeWriteRequest(lines, size int) *logproto.PushRequest {
return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`})
return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}, false, false, false)
}

type mockKafkaWriter struct {
Expand Down Expand Up @@ -1777,6 +1843,19 @@ func (i *mockIngester) Push(_ context.Context, in *logproto.PushRequest, _ ...gr

i.mu.Lock()
defer i.mu.Unlock()
for _, s := range in.Streams {
for _, e := range s.Entries {
for _, sm := range e.StructuredMetadata {
if strings.ContainsRune(sm.Value, utf8.RuneError) {
return nil, fmt.Errorf("sm value was not sanitized before being pushed to ignester, invalid utf 8 rune %d", utf8.RuneError)
}
if sm.Name != otlptranslate.NormalizeLabel(sm.Name) {
return nil, fmt.Errorf("sm name was not sanitized before being sent to ingester, contained characters %s", sm.Name)

}
}
}
}

i.pushed = append(i.pushed, in)
return nil, nil
Expand Down Expand Up @@ -1875,3 +1954,39 @@ func TestDistributorTee(t *testing.T) {
require.Equal(t, "test", tee.tenant)
}
}

func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
for _, tc := range []struct {
req *logproto.PushRequest
expectedResponse *logproto.PushResponse
}{
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false),
success,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false),
success,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true),
success,
},
{
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true),
success,
},
} {
distributors, _ := prepare(t, 1, 5, limits, nil)

var request logproto.PushRequest
request.Streams = append(request.Streams, tc.req.Streams[0])

// the error would happen in the ingester mock, it's set to reject SM that has not been sanitized
response, err := distributors[0].Push(ctx, &request)
require.NoError(t, err)
assert.Equal(t, tc.expectedResponse, response)
}
}
8 changes: 4 additions & 4 deletions pkg/distributor/level_detection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_DetectLogLevels(t *testing.T) {
limits, ingester := setup(false)
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
Expand All @@ -43,7 +43,7 @@ func Test_DetectLogLevels(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
Expand Down Expand Up @@ -80,7 +80,7 @@ func Test_DetectLogLevels(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`})
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}, false, false, false)
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
Expand All @@ -95,7 +95,7 @@ func Test_DetectLogLevels(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{
{
Name: "severity",
Expand Down

0 comments on commit be4f17e

Please sign in to comment.