Skip to content

Commit

Permalink
fix: sanitize structured metadata at query time (grafana#13983)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored and pascal-sochacki committed Aug 29, 2024
1 parent aaad361 commit 3dc47eb
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
22 changes: 17 additions & 5 deletions pkg/logql/log/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"unsafe"

"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/prometheus/prometheus/model/labels"
)

Expand Down Expand Up @@ -67,7 +69,7 @@ func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline {
}
n.mu.RUnlock()

sp := &noopStreamPipeline{n.baseBuilder.ForLabels(labels, h)}
sp := &noopStreamPipeline{n.baseBuilder.ForLabels(labels, h), make([]int, 0, 10)}

n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -92,7 +94,8 @@ func IsNoopPipeline(p Pipeline) bool {
}

type noopStreamPipeline struct {
builder *LabelsBuilder
builder *LabelsBuilder
offsetsBuf []int
}

func (n noopStreamPipeline) ReferencedStructuredMetadata() bool {
Expand All @@ -101,6 +104,9 @@ func (n noopStreamPipeline) ReferencedStructuredMetadata() bool {

// Process passes the log line through unchanged while recording the entry's
// structured metadata on the labels builder.
//
// The timestamp argument is ignored. Every structured metadata label name is
// run through prometheus.NormalizeLabel before being added to the builder
// (the accompanying test expects "zsomething-bad" to become "zsomething_bad").
//
// It returns the original line, the builder's LabelsResult, and always true.
//
// NOTE: names are rewritten in place in the structuredMetadata slice, so a
// caller that spreads an existing slice (md...) will observe the normalized
// names in its own slice after the call.
func (n noopStreamPipeline) Process(_ int64, line []byte, structuredMetadata ...labels.Label) ([]byte, LabelsResult, bool) {
// Start from a clean builder state for this entry.
n.builder.Reset()
// Normalize each label name, writing the result back into the argument slice.
for i, lb := range structuredMetadata {
structuredMetadata[i].Name = prometheus.NormalizeLabel(lb.Name)
}
// Register the (now normalized) labels under the structured metadata category.
n.builder.Add(StructuredMetadataLabel, structuredMetadata...)
return line, n.builder.LabelsResult(), true
}
Expand Down Expand Up @@ -176,12 +182,13 @@ func NewPipeline(stages []Stage) Pipeline {
}

type streamPipeline struct {
stages []Stage
builder *LabelsBuilder
stages []Stage
builder *LabelsBuilder
offsetsBuf []int
}

func NewStreamPipeline(stages []Stage, labelsBuilder *LabelsBuilder) StreamPipeline {
return &streamPipeline{stages, labelsBuilder}
return &streamPipeline{stages, labelsBuilder, make([]int, 0, 10)}
}

func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline {
Expand Down Expand Up @@ -220,6 +227,11 @@ func (p *streamPipeline) ReferencedStructuredMetadata() bool {
func (p *streamPipeline) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]byte, LabelsResult, bool) {
var ok bool
p.builder.Reset()

for i, lb := range structuredMetadata {
structuredMetadata[i].Name = prometheus.NormalizeLabel(lb.Name)
}

p.builder.Add(StructuredMetadataLabel, structuredMetadata...)

for _, s := range p.stages {
Expand Down
36 changes: 36 additions & 0 deletions pkg/logql/log/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ func TestNoopPipeline(t *testing.T) {
require.Equal(t, expectedLabelsResults.String(), lbr.String())
require.Equal(t, true, matches)

// test structured metadata with disallowed label names
structuredMetadata = append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething-bad", Value: "foo"})
expectedStructuredMetadata := append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething_bad", Value: "foo"})
expectedLabelsResults = append(lbs, expectedStructuredMetadata...)

l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), structuredMetadata...)
require.Equal(t, []byte(""), l)
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr)
require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash())
require.Equal(t, expectedLabelsResults.String(), lbr.String())
require.Equal(t, true, matches)

pipeline.Reset()
require.Len(t, pipeline.cache, 0)
}
Expand Down Expand Up @@ -171,6 +183,17 @@ func TestPipelineWithStructuredMetadata(t *testing.T) {
require.Equal(t, nil, lbr)
require.Equal(t, false, matches)

// test structured metadata with disallowed label names
withBadLabel := append(structuredMetadata, labels.Label{Name: "zsomething-bad", Value: "foo"})
expectedStructuredMetadata := append(structuredMetadata, labels.Label{Name: "zsomething_bad", Value: "foo"})
expectedLabelsResults = append(lbs, expectedStructuredMetadata...)

_, lbr, matches = p.ForStream(lbs).Process(0, []byte(""), withBadLabel...)
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr)
require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash())
require.Equal(t, expectedLabelsResults.String(), lbr.String())
require.Equal(t, true, matches)

// Reset caches
p.baseBuilder.del = []string{"foo", "bar"}
p.baseBuilder.add = [numValidCategories]labels.Labels{
Expand Down Expand Up @@ -566,6 +589,19 @@ func Benchmark_Pipeline(b *testing.B) {
}
})

b.Run("pipeline bytes no invalid structured metadata", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resLine, resLbs, resMatches = sp.Process(0, line, labels.Label{Name: "valid_name", Value: "foo"})
}
})
b.Run("pipeline string with invalid structured metadata", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resLine, resLbs, resMatches = sp.Process(0, line, labels.Label{Name: "invalid-name", Value: "foo"}, labels.Label{Name: "other-invalid-name", Value: "foo"})
}
})

extractor, err := NewLineSampleExtractor(CountExtractor, stages, []string{"cluster", "level"}, false, false)
require.NoError(b, err)
ex := extractor.ForStream(lbs)
Expand Down

0 comments on commit 3dc47eb

Please sign in to comment.