Skip to content

Commit

Permalink
Merge branch 'main' into small-detected-fields-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r committed Sep 27, 2024
2 parents a430bbf + 6354ded commit 92c756d
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 31 deletions.
90 changes: 75 additions & 15 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,20 +327,52 @@ func (b *LabelsBuilder) Get(key string) (string, bool) {
// Del deletes the label of the given name.
func (b *LabelsBuilder) Del(ns ...string) *LabelsBuilder {
for _, n := range ns {
for category, lbls := range b.add {
for i, a := range lbls {
if a.Name == n {
b.add[category] = append(lbls[:i], lbls[i+1:]...)
}
}
for category := range b.add {
b.deleteWithCategory(LabelCategory(category), n)
}
b.del = append(b.del, n)
}
return b
}

// deleteWithCategory removes the label from the specified category
func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n string) {
for i, l := range b.add[category] {
if l.Name == n {
b.add[category] = append(b.add[category][:i], b.add[category][i+1:]...)
}
}
}

// Set the name/value pair as a label.
// The value `v` may not be set if a category with higher preference already contains `n`.
// Category preference goes as Parsed > Structured Metadata > Stream.
func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder {
// Parsed takes precedence over Structured Metadata and Stream labels.
// If category is Parsed, we delete `n` from the structured metadata and stream labels.
if category == ParsedLabel {
b.deleteWithCategory(StructuredMetadataLabel, n)
b.deleteWithCategory(StreamLabel, n)
}

// Structured Metadata takes precedence over Stream labels.
// If category is `StructuredMetadataLabel`,we delete `n` from the stream labels.
// If `n` exists in the parsed labels, we won't overwrite it's value and we just return what we have.
if category == StructuredMetadataLabel {
b.deleteWithCategory(StreamLabel, n)
if labelsContain(b.add[ParsedLabel], n) {
return b
}
}

// Finally, if category is `StreamLabel` and `n` already exists in either the structured metadata or
// parsed labels, the `Set` operation is a noop and we return the unmodified labels builder.
if category == StreamLabel {
if labelsContain(b.add[StructuredMetadataLabel], n) || labelsContain(b.add[ParsedLabel], n) {
return b
}
}

for i, a := range b.add[category] {
if a.Name == n {
b.add[category][i].Value = v
Expand Down Expand Up @@ -430,6 +462,7 @@ func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCat
} else {
buf = buf[:0]
}

if categoriesContain(categories, StreamLabel) {
Outer:
for _, l := range b.base {
Expand All @@ -439,20 +472,38 @@ func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCat
continue Outer
}
}
// Skip stream labels which value will be replaced
for _, lbls := range b.add {
for _, la := range lbls {
if l.Name == la.Name {
continue Outer
}
}

// Skip stream labels which value will be replaced by structured metadata
if labelsContain(b.add[StructuredMetadataLabel], l.Name) {
continue
}

// Skip stream labels which value will be replaced by parsed labels
if labelsContain(b.add[ParsedLabel], l.Name) {
continue
}

// Take value from stream label if present
if labelsContain(b.add[StreamLabel], l.Name) {
buf = append(buf, labels.Label{Name: l.Name, Value: b.add[StreamLabel].Get(l.Name)})
} else {
buf = append(buf, l)
}
}
}

if categoriesContain(categories, StructuredMetadataLabel) {
for _, l := range b.add[StructuredMetadataLabel] {
if labelsContain(b.add[ParsedLabel], l.Name) {
continue
}

buf = append(buf, l)
}
}

for _, category := range categories {
buf = append(buf, b.add[category]...)
if categoriesContain(categories, ParsedLabel) {
buf = append(buf, b.add[ParsedLabel]...)
}
if (b.HasErr() || b.HasErrorDetails()) && categoriesContain(categories, ParsedLabel) {
buf = b.appendErrors(buf)
Expand Down Expand Up @@ -566,6 +617,15 @@ func flattenLabels(buf labels.Labels, many ...labels.Labels) labels.Labels {
return buf
}

func labelsContain(labels labels.Labels, name string) bool {
for _, l := range labels {
if l.Name == name {
return true
}
}
return false
}

func (b *BaseLabelsBuilder) toUncategorizedResult(buf labels.Labels) LabelsResult {
hash := b.hasher.Hash(buf)
if cached, ok := b.resultCache[hash]; ok {
Expand Down
180 changes: 180 additions & 0 deletions pkg/logql/log/labels_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package log

import (
"sort"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -198,6 +199,185 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) {
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())

b.Reset()
b.Set(StreamLabel, "namespace", "tempo")
b.Set(StreamLabel, "bazz", "tazz")
b.Set(StructuredMetadataLabel, "bazz", "sazz")
b.Set(ParsedLabel, "ToReplace", "other")

expectedStreamLbls = labels.FromStrings(
"namespace", "tempo",
"cluster", "us-central1",
"job", "us-central1/loki",
)
expectedStucturedMetadataLbls = labels.FromStrings(
"bazz", "sazz",
)
expectedParsedLbls = labels.FromStrings(
"ToReplace", "other",
)

expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)
assertLabelResult(t, expected, b.LabelsResult())
// cached.
assertLabelResult(t, expected, b.LabelsResult())
actual = b.LabelsResult()
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())
}

func TestLabelsBuilder_Set(t *testing.T) {
strs := []string{
"namespace", "loki",
"cluster", "us-central1",
"toreplace", "fuzz",
}
lbs := labels.FromStrings(strs...)
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())

// test duplicating stream label with parsed label
b.Set(StructuredMetadataLabel, "stzz", "stvzz")
b.Set(ParsedLabel, "toreplace", "buzz")
expectedStreamLbls := labels.FromStrings("namespace", "loki", "cluster", "us-central1")
expectedStucturedMetadataLbls := labels.FromStrings("stzz", "stvzz")
expectedParsedLbls := labels.FromStrings("toreplace", "buzz")

expected := make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)

actual := b.LabelsResult()
assertLabelResult(t, expected, actual)
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())

b.Reset()

// test duplicating structured metadata label with parsed label
b.Set(StructuredMetadataLabel, "stzz", "stvzz")
b.Set(StructuredMetadataLabel, "toreplace", "muzz")
b.Set(ParsedLabel, "toreplace", "buzz")
expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1")
expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzz")
expectedParsedLbls = labels.FromStrings("toreplace", "buzz")

expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)

actual = b.LabelsResult()
assertLabelResult(t, expected, actual)
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())

b.Reset()

// test duplicating stream label with structured meta data label
b.Set(StructuredMetadataLabel, "toreplace", "muzz")
b.Set(ParsedLabel, "stzz", "stvzz")
expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1")
expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz")
expectedParsedLbls = labels.FromStrings("stzz", "stvzz")

expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)

actual = b.LabelsResult()
assertLabelResult(t, expected, actual)
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())

b.Reset()

// test duplicating parsed label with structured meta data label
b.Set(ParsedLabel, "toreplace", "puzz")
b.Set(StructuredMetadataLabel, "stzz", "stvzzz")
b.Set(StructuredMetadataLabel, "toreplace", "muzz")
expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1")
expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzzz")
expectedParsedLbls = labels.FromStrings("toreplace", "puzz")

expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)

actual = b.LabelsResult()
assertLabelResult(t, expected, actual)
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())

b.Reset()

// test duplicating structured meta data label with stream label
b.Set(ParsedLabel, "stzz", "stvzzz")
b.Set(StructuredMetadataLabel, "toreplace", "muzz")
expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1")
expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz")
expectedParsedLbls = labels.FromStrings("stzz", "stvzzz")

expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
expected = append(expected, expectedStreamLbls...)
expected = append(expected, expectedStucturedMetadataLbls...)
expected = append(expected, expectedParsedLbls...)
expected = labels.New(expected...)

actual = b.LabelsResult()
assertLabelResult(t, expected, actual)
assert.Equal(t, expectedStreamLbls, actual.Stream())
assert.Equal(t, expectedStucturedMetadataLbls, actual.StructuredMetadata())
assert.Equal(t, expectedParsedLbls, actual.Parsed())
}

func TestLabelsBuilder_UnsortedLabels(t *testing.T) {
strs := []string{
"namespace", "loki",
"cluster", "us-central1",
"toreplace", "fuzz",
}
lbs := labels.FromStrings(strs...)
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.add[StructuredMetadataLabel] = labels.FromStrings("toreplace", "buzz", "fzz", "bzz")
b.add[ParsedLabel] = labels.FromStrings("pzz", "pvzz")
expected := labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz")
actual := b.UnsortedLabels(nil)
require.ElementsMatch(t, expected, actual)

b.Reset()
b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz")
b.add[ParsedLabel] = labels.FromStrings("toreplace", "buzz", "pzz", "pvzz")
expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz")
actual = b.UnsortedLabels(nil)
sort.Sort(expected)
sort.Sort(actual)
assert.Equal(t, expected, actual)

b.Reset()
b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz", "toreplacezz", "test")
b.add[ParsedLabel] = labels.FromStrings("toreplacezz", "buzz", "pzz", "pvzz")
expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "fuzz", "pzz", "pvzz", "toreplacezz", "buzz")
actual = b.UnsortedLabels(nil)
sort.Sort(expected)
sort.Sort(actual)
assert.Equal(t, expected, actual)
}

func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func ExtractLabelFiltersBeforeParser(e Expr) []*LabelFilterExpr {
)

visitor := &DepthFirstTraversal{
VisitLabelFilterFn: func(v RootVisitor, e *LabelFilterExpr) {
VisitLabelFilterFn: func(_ RootVisitor, e *LabelFilterExpr) {
if !foundParseStage {
filters = append(filters, e)
}
Expand All @@ -94,13 +94,13 @@ func ExtractLabelFiltersBeforeParser(e Expr) []*LabelFilterExpr {
// expression is added without updating this list, blooms can silently
// misbehave.

VisitLogfmtParserFn: func(v RootVisitor, e *LogfmtParserExpr) { foundParseStage = true },
VisitLabelParserFn: func(v RootVisitor, e *LabelParserExpr) { foundParseStage = true },
VisitJSONExpressionParserFn: func(v RootVisitor, e *JSONExpressionParser) { foundParseStage = true },
VisitLogfmtExpressionParserFn: func(v RootVisitor, e *LogfmtExpressionParser) { foundParseStage = true },
VisitLabelFmtFn: func(v RootVisitor, e *LabelFmtExpr) { foundParseStage = true },
VisitKeepLabelFn: func(v RootVisitor, e *KeepLabelsExpr) { foundParseStage = true },
VisitDropLabelsFn: func(v RootVisitor, e *DropLabelsExpr) { foundParseStage = true },
VisitLogfmtParserFn: func(_ RootVisitor, _ *LogfmtParserExpr) { foundParseStage = true },
VisitLabelParserFn: func(_ RootVisitor, _ *LabelParserExpr) { foundParseStage = true },
VisitJSONExpressionParserFn: func(_ RootVisitor, _ *JSONExpressionParser) { foundParseStage = true },
VisitLogfmtExpressionParserFn: func(_ RootVisitor, _ *LogfmtExpressionParser) { foundParseStage = true },
VisitLabelFmtFn: func(_ RootVisitor, _ *LabelFmtExpr) { foundParseStage = true },
VisitKeepLabelFn: func(_ RootVisitor, _ *KeepLabelsExpr) { foundParseStage = true },
VisitDropLabelsFn: func(_ RootVisitor, _ *DropLabelsExpr) { foundParseStage = true },
}
e.Accept(visitor)
return filters
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func (t *Loki) setupModuleManager() error {
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing},
Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing},
Expand Down
Loading

0 comments on commit 92c756d

Please sign in to comment.