diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index f47148fa42b0d..cdc7fd6cc1488 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/grafana/loki/pkg/loghttp/push"
 	"math"
 	"net/http"
 	"sort"
@@ -121,6 +122,8 @@ type Distributor struct {
 	// Push failures rate limiter.
 	writeFailuresManager *writefailures.Manager
 
+	RequestParserWrapper push.RequestParserWrapper
+
 	// metrics
 	ingesterAppends        *prometheus.CounterVec
 	ingesterAppendTimeouts *prometheus.CounterVec
diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go
index ce242355e077b..d0012ff1d07e2 100644
--- a/pkg/distributor/http.go
+++ b/pkg/distributor/http.go
@@ -34,6 +34,11 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
+
+	if d.RequestParserWrapper != nil {
+		pushRequestParser = d.RequestParserWrapper(pushRequestParser)
+	}
+
 	req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
 	if err != nil {
 		if d.tenantConfigs.LogPushRequest(tenantID) {
diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go
index 9b72ea85c748b..9a1cc057f40ba 100644
--- a/pkg/distributor/http_test.go
+++ b/pkg/distributor/http_test.go
@@ -1,6 +1,10 @@
 package distributor
 
 import (
+	"context"
+	"github.com/grafana/dskit/user"
+	"github.com/grafana/loki/pkg/loghttp/push"
+	"github.com/grafana/loki/pkg/logproto"
 	"io"
 	"net/http"
 	"net/http/httptest"
@@ -54,3 +58,28 @@ func TestDistributorRingHandler(t *testing.T) {
 		require.NotContains(t, string(body), "Instance ID")
 	})
 }
+
+func TestRequestParserWrapping(t *testing.T) {
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.RejectOldSamples = false
+	distributors, _ := prepare(t, 1, 3, limits, nil)
+
+	var called bool
+	distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
+		called = true
+		return requestParser
+	}
+
+	ctx := user.InjectOrgID(context.Background(), "test-user")
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
+	require.NoError(t, err)
+
+	distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser)
+
+	require.True(t, called)
+}
+
+func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits) (*logproto.PushRequest, *push.Stats, error) {
+	return &logproto.PushRequest{}, &push.Stats{}, nil
+}
diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go
index c25477a984e24..2882c6b7c512f 100644
--- a/pkg/loghttp/push/otlp.go
+++ b/pkg/loghttp/push/otlp.go
@@ -38,8 +38,8 @@ func init() {
 
 func newPushStats() *Stats {
 	return &Stats{
-		logLinesBytes:           map[time.Duration]int64{},
-		structuredMetadataBytes: map[time.Duration]int64{},
+		LogLinesBytes:           map[time.Duration]int64{},
+		StructuredMetadataBytes: map[time.Duration]int64{},
 	}
 }
 
@@ -55,11 +55,11 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
 }
 
 func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
-	pushStats.contentEncoding = r.Header.Get(contentEnc)
+	pushStats.ContentEncoding = r.Header.Get(contentEnc)
 	// bodySize should always reflect the compressed size of the request body
 	bodySize := loki_util.NewSizeReader(r.Body)
 	var body io.Reader = bodySize
-	if pushStats.contentEncoding == gzipContentEncoding {
+	if pushStats.ContentEncoding == gzipContentEncoding {
 		r, err := gzip.NewReader(bodySize)
 		if err != nil {
 			return plog.NewLogs(), err
@@ -74,12 +74,12 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
 		return plog.NewLogs(), err
 	}
 
-	pushStats.bodySize = bodySize.Size()
+	pushStats.BodySize = bodySize.Size()
 
 	req := plogotlp.NewExportRequest()
 
-	pushStats.contentType = r.Header.Get(contentType)
-	switch pushStats.contentType {
+	pushStats.ContentType = r.Header.Get(contentType)
+	switch pushStats.ContentType {
 	case pbContentType:
 		err := req.UnmarshalProto(buf)
 		if err != nil {
@@ -139,7 +139,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 		})
 
 		if err := streamLabels.Validate(); err != nil {
-			stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err))
+			stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
 			continue
 		}
 		labelsStr := streamLabels.String()
@@ -149,11 +149,11 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 			pushRequestsByStream[labelsStr] = logproto.Stream{
 				Labels: labelsStr,
 			}
-			stats.streamLabelsSize += int64(labelsSize(logproto.FromLabelsToLabelAdapters(lbs)))
+			stats.StreamLabelsSize += int64(labelsSize(logproto.FromLabelsToLabelAdapters(lbs)))
 		}
 
 		resourceAttributesAsStructuredMetadataSize := labelsSize(resourceAttributesAsStructuredMetadata)
-		stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(resourceAttributesAsStructuredMetadataSize)
+		stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(resourceAttributesAsStructuredMetadataSize)
 
 		for j := 0; j < sls.Len(); j++ {
 			scope := sls.At(j).Scope()
@@ -203,7 +203,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 			}
 
 			scopeAttributesAsStructuredMetadataSize := labelsSize(scopeAttributesAsStructuredMetadata)
-			stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(scopeAttributesAsStructuredMetadataSize)
+			stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(scopeAttributesAsStructuredMetadataSize)
 
 			for k := 0; k < logs.Len(); k++ {
 				log := logs.At(k)
@@ -223,11 +223,11 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
 				stream.Entries = append(stream.Entries, entry)
 				pushRequestsByStream[labelsStr] = stream
 
-				stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
-				stats.logLinesBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(len(entry.Line))
-				stats.numLines++
-				if entry.Timestamp.After(stats.mostRecentEntryTimestamp) {
-					stats.mostRecentEntryTimestamp = entry.Timestamp
+				stats.StructuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
+				stats.LogLinesBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(len(entry.Line))
+				stats.NumLines++
+				if entry.Timestamp.After(stats.MostRecentEntryTimestamp) {
+					stats.MostRecentEntryTimestamp = entry.Timestamp
 				}
 			}
 		}
diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go
index badb6cd000e4b..c1485522666cc 100644
--- a/pkg/loghttp/push/otlp_test.go
+++ b/pkg/loghttp/push/otlp_test.go
@@ -71,15 +71,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize:         21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -107,15 +107,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize:         27,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         27,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -143,15 +143,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 1,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 1,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 9,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 0,
 				},
-				streamLabelsSize:         47,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         47,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -218,15 +218,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 37,
 				},
-				streamLabelsSize:         21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -302,15 +302,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 97,
 				},
-				streamLabelsSize:         21,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         21,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 		{
@@ -445,15 +445,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
 				},
 			},
 			expectedStats: Stats{
-				numLines: 2,
-				logLinesBytes: map[time.Duration]int64{
+				NumLines: 2,
+				LogLinesBytes: map[time.Duration]int64{
 					time.Hour: 26,
 				},
-				structuredMetadataBytes: map[time.Duration]int64{
+				StructuredMetadataBytes: map[time.Duration]int64{
 					time.Hour: 113,
 				},
-				streamLabelsSize:         42,
-				mostRecentEntryTimestamp: now,
+				StreamLabelsSize:         42,
+				MostRecentEntryTimestamp: now,
 			},
 		},
 	} {
diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go
index 15b7bba0a78c9..e66d90cc46be2 100644
--- a/pkg/loghttp/push/push.go
+++ b/pkg/loghttp/push/push.go
@@ -4,6 +4,7 @@ import (
 	"compress/flate"
 	"compress/gzip"
 	"fmt"
+	"github.com/go-kit/log/level"
 	"io"
 	"math"
 	"mime"
@@ -12,7 +13,6 @@ import (
 
 	"github.com/dustin/go-humanize"
 	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
@@ -62,18 +62,23 @@ type Limits interface {
 	OTLPConfig(userID string) OTLPConfig
 }
 
+type RequestParserWrapper func(inner RequestParser) RequestParser
+
 type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error)
 
 type Stats struct {
-	errs                     []error
-	numLines                 int64
-	logLinesBytes            map[time.Duration]int64
-	structuredMetadataBytes  map[time.Duration]int64
-	streamLabelsSize         int64
-	mostRecentEntryTimestamp time.Time
-	contentType              string
-	contentEncoding          string
-	bodySize                 int64
+	Errs                     []error
+	NumLines                 int64
+	LogLinesBytes            map[time.Duration]int64
+	StructuredMetadataBytes  map[time.Duration]int64
+	StreamLabelsSize         int64
+	MostRecentEntryTimestamp time.Time
+	ContentType              string
+	ContentEncoding          string
+	BodySize                 int64
+
+	// Extra is a place for a wrapped parser to record any interesting stats as key-value pairs to be logged
+	Extra []any
 }
 
 func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser) (*logproto.PushRequest, error) {
@@ -86,7 +91,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
 		entriesSize            int64
 		structuredMetadataSize int64
 	)
-	for retentionPeriod, size := range pushStats.logLinesBytes {
+	for retentionPeriod, size := range pushStats.LogLinesBytes {
 		var retentionHours string
 		if retentionPeriod > 0 {
 			retentionHours = fmt.Sprintf("%d", int64(math.Floor(retentionPeriod.Hours())))
@@ -97,7 +102,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
 		entriesSize += size
 	}
 
-	for retentionPeriod, size := range pushStats.structuredMetadataBytes {
+	for retentionPeriod, size := range pushStats.StructuredMetadataBytes {
 		var retentionHours string
 		if retentionPeriod > 0 {
 			retentionHours = fmt.Sprintf("%d", int64(math.Floor(retentionPeriod.Hours())))
@@ -113,25 +118,28 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
 	}
 
 	// incrementing tenant metrics if we have a tenant.
-	if pushStats.numLines != 0 && userID != "" {
-		linesIngested.WithLabelValues(userID).Add(float64(pushStats.numLines))
+	if pushStats.NumLines != 0 && userID != "" {
+		linesIngested.WithLabelValues(userID).Add(float64(pushStats.NumLines))
 	}
 
-	linesReceivedStats.Inc(pushStats.numLines)
+	linesReceivedStats.Inc(pushStats.NumLines)
 
-	level.Debug(logger).Log(
+	logValues := []interface{}{
 		"msg", "push request parsed",
 		"path", r.URL.Path,
-		"contentType", pushStats.contentType,
-		"contentEncoding", pushStats.contentEncoding,
-		"bodySize", humanize.Bytes(uint64(pushStats.bodySize)),
+		"contentType", pushStats.ContentType,
+		"contentEncoding", pushStats.ContentEncoding,
+		"bodySize", humanize.Bytes(uint64(pushStats.BodySize)),
 		"streams", len(req.Streams),
-		"entries", pushStats.numLines,
-		"streamLabelsSize", humanize.Bytes(uint64(pushStats.streamLabelsSize)),
+		"entries", pushStats.NumLines,
+		"streamLabelsSize", humanize.Bytes(uint64(pushStats.StreamLabelsSize)),
 		"entriesSize", humanize.Bytes(uint64(entriesSize)),
 		"structuredMetadataSize", humanize.Bytes(uint64(structuredMetadataSize)),
-		"totalSize", humanize.Bytes(uint64(entriesSize+pushStats.streamLabelsSize)),
-		"mostRecentLagMs", time.Since(pushStats.mostRecentEntryTimestamp).Milliseconds(),
-	)
+		"totalSize", humanize.Bytes(uint64(entriesSize + pushStats.StreamLabelsSize)),
+		"mostRecentLagMs", time.Since(pushStats.MostRecentEntryTimestamp).Milliseconds(),
+	}
+	logValues = append(logValues, pushStats.Extra...)
+	level.Debug(logger).Log(logValues...)
+
 	return req, nil
 }
@@ -200,12 +208,12 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
 		}
 	}
 
-	pushStats.bodySize = bodySize.Size()
-	pushStats.contentType = contentType
-	pushStats.contentEncoding = contentEncoding
+	pushStats.BodySize = bodySize.Size()
+	pushStats.ContentType = contentType
+	pushStats.ContentEncoding = contentEncoding
 
 	for _, s := range req.Streams {
-		pushStats.streamLabelsSize += int64(len(s.Labels))
+		pushStats.StreamLabelsSize += int64(len(s.Labels))
 		var retentionPeriod time.Duration
 		if tenantsRetention != nil {
 			lbs, err := syntax.ParseLabels(s.Labels)
@@ -215,15 +223,15 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
 			retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
 		}
 		for _, e := range s.Entries {
-			pushStats.numLines++
+			pushStats.NumLines++
 			var entryLabelsSize int64
 			for _, l := range e.StructuredMetadata {
 				entryLabelsSize += int64(len(l.Name) + len(l.Value))
 			}
-			pushStats.logLinesBytes[retentionPeriod] += int64(len(e.Line))
-			pushStats.structuredMetadataBytes[retentionPeriod] += entryLabelsSize
-			if e.Timestamp.After(pushStats.mostRecentEntryTimestamp) {
-				pushStats.mostRecentEntryTimestamp = e.Timestamp
+			pushStats.LogLinesBytes[retentionPeriod] += int64(len(e.Line))
+			pushStats.StructuredMetadataBytes[retentionPeriod] += entryLabelsSize
+			if e.Timestamp.After(pushStats.MostRecentEntryTimestamp) {
+				pushStats.MostRecentEntryTimestamp = e.Timestamp
 			}
 		}
 	}
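
Taken together, these changes add a small extension point: a `RequestParserWrapper` assigned on the `Distributor` is applied to the active `RequestParser` just before `push.ParseRequest` runs, and any key-value pairs the wrapped parser appends to `Stats.Extra` are included in the "push request parsed" debug log line. The sketch below shows one way a wrapper might use that hook; it is illustrative only, and the `instrumentedParser` name, the `example` package, and the logged keys are assumptions rather than part of this diff.

```go
package example

import (
	"net/http"
	"time"

	"github.com/grafana/loki/pkg/loghttp/push"
	"github.com/grafana/loki/pkg/logproto"
)

// instrumentedParser is a hypothetical push.RequestParserWrapper: it delegates
// to the inner parser and records extra key-value pairs in Stats.Extra, which
// push.ParseRequest appends to its debug log line.
func instrumentedParser(inner push.RequestParser) push.RequestParser {
	return func(userID string, r *http.Request, retention push.TenantsRetention, limits push.Limits) (*logproto.PushRequest, *push.Stats, error) {
		start := time.Now()
		req, stats, err := inner(userID, r, retention, limits)
		if stats != nil {
			// Keys are arbitrary; they only show up in the parsed-request debug log.
			stats.Extra = append(stats.Extra, "parseDurationMs", time.Since(start).Milliseconds())
		}
		return req, stats, err
	}
}
```

Wiring mirrors the new test: set `distributor.RequestParserWrapper = instrumentedParser` before the push handler is invoked, and the wrapper then runs for every parsed push request.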