add the ability to inject log parsers to the distributor
MasslessParticle committed Feb 21, 2024
1 parent 6e4a9f3 commit 04cfa8d
Showing 6 changed files with 124 additions and 79 deletions.
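
In short, the commit adds an optional RequestParserWrapper hook to the Distributor: when the field is set, pushHandler wraps the default push request parser with it before calling push.ParseRequest (see pkg/distributor/http.go below). A minimal sketch of how a caller might use the hook, assuming only the push.RequestParser and push.RequestParserWrapper signatures exercised by the new test in pkg/distributor/http_test.go; the counting wrapper itself is hypothetical:

package example

import (
    "net/http"

    "github.com/grafana/loki/pkg/loghttp/push"
    "github.com/grafana/loki/pkg/logproto"
)

// countingWrapper is a hypothetical push.RequestParserWrapper: it delegates to
// the inner parser and counts successfully parsed requests. The signatures
// mirror the wrapper closure and stubParser in the test added in this commit.
// Not goroutine-safe; illustration only.
func countingWrapper(parsed *int) push.RequestParserWrapper {
    return func(inner push.RequestParser) push.RequestParser {
        return func(userID string, r *http.Request, retention push.TenantsRetention, limits push.Limits) (*logproto.PushRequest, *push.Stats, error) {
            req, stats, err := inner(userID, r, retention, limits)
            if err == nil {
                *parsed++ // custom logic could also inspect or rewrite req here
            }
            return req, stats, err
        }
    }
}

A distributor would then opt in with something like d.RequestParserWrapper = countingWrapper(&parsed) before serving pushes; leaving the field nil keeps the existing behavior.
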
3 changes: 3 additions & 0 deletions pkg/distributor/distributor.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/grafana/loki/pkg/loghttp/push"
 	"math"
 	"net/http"
 	"sort"
@@ -121,6 +122,8 @@ type Distributor struct {
 	// Push failures rate limiter.
 	writeFailuresManager *writefailures.Manager
 
+	RequestParserWrapper push.RequestParserWrapper
+
 	// metrics
 	ingesterAppends *prometheus.CounterVec
 	ingesterAppendTimeouts *prometheus.CounterVec
5 changes: 5 additions & 0 deletions pkg/distributor/http.go
@@ -34,6 +34,11 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
+
+	if d.RequestParserWrapper != nil {
+		pushRequestParser = d.RequestParserWrapper(pushRequestParser)
+	}
+
 	req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
 	if err != nil {
 		if d.tenantConfigs.LogPushRequest(tenantID) {
29 changes: 29 additions & 0 deletions pkg/distributor/http_test.go
@@ -1,6 +1,10 @@
 package distributor
 
 import (
+	"context"
+	"github.com/grafana/dskit/user"
+	"github.com/grafana/loki/pkg/loghttp/push"
+	"github.com/grafana/loki/pkg/logproto"
 	"io"
 	"net/http"
 	"net/http/httptest"
@@ -54,3 +58,28 @@ func TestDistributorRingHandler(t *testing.T) {
 		require.NotContains(t, string(body), "<th>Instance ID</th>")
 	})
 }
+
+func TestRequestParserWrapping(t *testing.T) {
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.RejectOldSamples = false
+	distributors, _ := prepare(t, 1, 3, limits, nil)
+
+	var called bool
+	distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
+		called = true
+		return requestParser
+	}
+
+	ctx := user.InjectOrgID(context.Background(), "test-user")
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
+	require.NoError(t, err)
+
+	distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser)
+
+	require.True(t, called)
+}
+
+func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits) (*logproto.PushRequest, *push.Stats, error) {
+	return &logproto.PushRequest{}, &push.Stats{}, nil
+}
32 changes: 16 additions & 16 deletions pkg/loghttp/push/otlp.go
@@ -38,8 +38,8 @@ func init() {
 
 func newPushStats() *Stats {
 	return &Stats{
-		logLinesBytes: map[time.Duration]int64{},
-		structuredMetadataBytes: map[time.Duration]int64{},
+		LogLinesBytes: map[time.Duration]int64{},
+		StructuredMetadataBytes: map[time.Duration]int64{},
 	}
 }
 
@@ -55,11 +55,11 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
 }
 
 func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
-	pushStats.contentEncoding = r.Header.Get(contentEnc)
+	pushStats.ContentEncoding = r.Header.Get(contentEnc)
 	// bodySize should always reflect the compressed size of the request body
 	bodySize := loki_util.NewSizeReader(r.Body)
 	var body io.Reader = bodySize
-	if pushStats.contentEncoding == gzipContentEncoding {
+	if pushStats.ContentEncoding == gzipContentEncoding {
 		r, err := gzip.NewReader(bodySize)
 		if err != nil {
 			return plog.NewLogs(), err
@@ -74,12 +74,12 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
 		return plog.NewLogs(), err
 	}
 
-	pushStats.bodySize = bodySize.Size()
+	pushStats.BodySize = bodySize.Size()
 
 	req := plogotlp.NewExportRequest()
 
-	pushStats.contentType = r.Header.Get(contentType)
-	switch pushStats.contentType {
+	pushStats.ContentType = r.Header.Get(contentType)
+	switch pushStats.ContentType {
 	case pbContentType:
 		err := req.UnmarshalProto(buf)
 		if err != nil {
@@ -139,7 +139,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 		})
 
 		if err := streamLabels.Validate(); err != nil {
-			stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err))
+			stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
 			continue
 		}
 		labelsStr := streamLabels.String()
@@ -149,11 +149,11 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 			pushRequestsByStream[labelsStr] = logproto.Stream{
 				Labels: labelsStr,
 			}
-			stats.streamLabelsSize += int64(labelsSize(logproto.FromLabelsToLabelAdapters(lbs)))
+			stats.StreamLabelsSize += int64(labelsSize(logproto.FromLabelsToLabelAdapters(lbs)))
 		}
 
 		resourceAttributesAsStructuredMetadataSize := labelsSize(resourceAttributesAsStructuredMetadata)
-		stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(resourceAttributesAsStructuredMetadataSize)
+		stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(resourceAttributesAsStructuredMetadataSize)
 
 		for j := 0; j < sls.Len(); j++ {
 			scope := sls.At(j).Scope()
@@ -203,7 +203,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 			}
 
 			scopeAttributesAsStructuredMetadataSize := labelsSize(scopeAttributesAsStructuredMetadata)
-			stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(scopeAttributesAsStructuredMetadataSize)
+			stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(scopeAttributesAsStructuredMetadataSize)
 			for k := 0; k < logs.Len(); k++ {
 				log := logs.At(k)
 
@@ -223,11 +223,11 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 				stream.Entries = append(stream.Entries, entry)
 				pushRequestsByStream[labelsStr] = stream
 
-				stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
-				stats.logLinesBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(len(entry.Line))
-				stats.numLines++
-				if entry.Timestamp.After(stats.mostRecentEntryTimestamp) {
-					stats.mostRecentEntryTimestamp = entry.Timestamp
+				stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
+				stats.LogLinesBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(len(entry.Line))
+				stats.NumLines++
+				if entry.Timestamp.After(stats.MostRecentEntryTimestamp) {
+					stats.MostRecentEntryTimestamp = entry.Timestamp
 				}
 			}
 		}
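
The renames above (logLinesBytes to LogLinesBytes, stats.errs to stats.Errs, and so on) export the Stats fields so that request parsers injected from outside the push package can populate the same usage statistics. A hedged sketch of such a parser, using only field names visible in this diff; the parser itself is hypothetical and the body decoding is left elided:

package example

import (
    "net/http"
    "time"

    "github.com/grafana/loki/pkg/loghttp/push"
    "github.com/grafana/loki/pkg/logproto"
)

// customParser is a hypothetical push.RequestParser that fills the Stats
// fields exported by this commit, much as newPushStats and
// otlpToLokiPushRequest do above.
func customParser(_ string, r *http.Request, _ push.TenantsRetention, _ push.Limits) (*logproto.PushRequest, *push.Stats, error) {
    stats := &push.Stats{
        LogLinesBytes:           map[time.Duration]int64{},
        StructuredMetadataBytes: map[time.Duration]int64{},
    }
    stats.ContentType = r.Header.Get("Content-Type")
    stats.ContentEncoding = r.Header.Get("Content-Encoding")

    req := &logproto.PushRequest{}
    // Decode r.Body into req here, bumping stats.NumLines,
    // stats.LogLinesBytes[retention], and stats.MostRecentEntryTimestamp
    // per entry, as the OTLP parser above does.
    return req, stats, nil
}
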
60 changes: 30 additions & 30 deletions pkg/loghttp/push/otlp_test.go
@@ -71,15 +71,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize: 21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -107,15 +107,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize: 27,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 27,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -143,15 +143,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize: 47,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 47,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -218,15 +218,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 37,
 				},
-				streamLabelsSize: 21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -302,15 +302,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 97,
 				},
-				streamLabelsSize: 21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -445,15 +445,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 113,
 				},
-				streamLabelsSize: 42,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize: 42,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 	} {