Skip to content

Commit

Permalink
fix: align semantics of metric and log query label extraction (#11587)
Browse files Browse the repository at this point in the history
both metric and log queries use the first extracted label when multiple values are requested for the same label

Fixes #11647

(cherry picked from commit 9759c13)
  • Loading branch information
trevorwhitney authored and grafana-delivery-bot[bot] committed Jan 11, 2024
1 parent cbf4ffa commit 5283e07
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules.
* [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations.
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.

##### Changes

Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
return "", false
}

if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
_, alwaysExtract := keys[sanitized]
if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
return sanitized, true
})

if !ok {
continue
}
Expand Down Expand Up @@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
}
}
}

if l.strict && l.dec.Err() != nil {
addErrLabel(errLogfmt, l.dec.Err(), lbs)
return line, true
Expand Down
31 changes: 15 additions & 16 deletions pkg/logql/log/parser_hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ type Hints struct {
}

func (p *Hints) ShouldExtract(key string) bool {
if len(p.requiredLabels) == 0 {
return true
}

for _, l := range p.extracted {
if l == key {
return false
Expand All @@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool {
}
}

return false
return len(p.requiredLabels) == 0
}

func (p *Hints) ShouldExtractPrefix(prefix string) bool {
Expand All @@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool {
}

func (p *Hints) RecordExtracted(key string) {
for _, l := range p.requiredLabels {
if l == key {
p.extracted = append(p.extracted, key)
return
}
}
p.extracted = append(p.extracted, key)
}

func (p *Hints) AllRequiredExtracted() bool {
if len(p.requiredLabels) == 0 {
if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) {
return false
}
return len(p.extracted) == len(p.requiredLabels)

found := 0
for _, l := range p.requiredLabels {
for _, e := range p.extracted {
if l == e {
found++
break
}
}
}

return len(p.requiredLabels) == found
}

func (p *Hints) Reset() {
Expand Down Expand Up @@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool,
return ph
}

ph.requiredLabels = hints
ph.shouldPreserveError = containsError(hints)

return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ var (
"response": {
"status": 204,
"latency_seconds": "30.001"
}
},
"message": {
"message": "foo",
}
}`)

packedLine = []byte(`{
Expand Down Expand Up @@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
1.0,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
Expand Down Expand Up @@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
30.001,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
Expand Down Expand Up @@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) {
0,
``,
},
{
`sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`,
jsonLine,
true,
1,
`{app="nginx", message_message="foo"}`,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
Expand Down

0 comments on commit 5283e07

Please sign in to comment.