Skip to content

Commit

Permalink
feat: Support usage trackers for received and discarded bytes. (grafa…
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies authored Feb 24, 2024
1 parent 2f54f8d commit 6578a00
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom usage trackers for ingested and discarded bytes metric.
* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, err
return nil, false, nil
}

// TOOD(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// TODO(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// when it's supported for non zone-aware rings
// instead of doing all this manually

Expand Down
47 changes: 35 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
Expand Down Expand Up @@ -126,6 +127,8 @@ type Distributor struct {
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter

usageTracker push.UsageTracker
}

// New a distributor creates.
Expand All @@ -138,6 +141,7 @@ func New(
registerer prometheus.Registerer,
metricsNamespace string,
tee Tee,
usageTracker push.UsageTracker,
logger log.Logger,
) (*Distributor, error) {
factory := cfg.factory
Expand All @@ -153,7 +157,7 @@ func New(
return client.New(internalCfg, addr)
}

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, usageTracker)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,6 +189,7 @@ func New(
healthyInstancesCount: atomic.NewUint32(0),
rateLimitStrat: rateLimitStrat,
tee: tee,
usageTracker: usageTracker,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_ingester_appends_total",
Expand Down Expand Up @@ -337,7 +342,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)

stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
Expand All @@ -354,7 +360,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
prevTs := stream.Entries[0].Timestamp
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
if err := d.validator.ValidateEntry(validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
Expand Down Expand Up @@ -412,6 +418,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(tenantID, validation.RateLimited, lbs, float64(discardedStreamBytes))
}
}
}

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
d.writeFailuresManager.Log(tenantID, err)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
Expand Down Expand Up @@ -684,30 +708,29 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
}

type labelData struct {
labels string
hash uint64
ls labels.Labels
hash uint64
}

func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
labelVal := val.(labelData)
return labelVal.labels, labelVal.hash, nil
return labelVal.ls, labelVal.ls.String(), labelVal.hash, nil
}

ls, err := syntax.ParseLabels(key)
if err != nil {
return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", 0, err
return nil, "", 0, err
}

lsVal := ls.String()
lsHash := ls.Hash()

d.labelCache.Add(key, labelData{lsVal, lsHash})
return lsVal, lsHash, nil
d.labelCache.Add(key, labelData{ls, lsHash})
return ls, ls.String(), lsHash, nil
}

// shardCountFor returns the right number of shards to be used by the given stream.
Expand Down
10 changes: 5 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func TestStreamShard(t *testing.T) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(t, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(t, err)

d := Distributor{
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestStreamShardAcrossCalls(t *testing.T) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(t, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(t, err)

t.Run("it generates 4 shards across 2 calls when calculated shards = 2 * entries per call", func(t *testing.T) {
Expand Down Expand Up @@ -721,7 +721,7 @@ func BenchmarkShardStream(b *testing.B) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(b, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(b, err)

distributorBuilder := func(shards int) *Distributor {
Expand Down Expand Up @@ -788,7 +788,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
Expand Down Expand Up @@ -1159,7 +1159,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, log.NewNopLogger())
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
26 changes: 23 additions & 3 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)
Expand All @@ -18,13 +19,14 @@ const (

type Validator struct {
Limits
usageTracker push.UsageTracker
}

func NewValidator(l Limits) (*Validator, error) {
func NewValidator(l Limits, t push.UsageTracker) (*Validator, error) {
if l == nil {
return nil, errors.New("nil Limits")
}
return &Validator{l}, nil
return &Validator{l, t}, nil
}

type validationContext struct {
Expand Down Expand Up @@ -67,7 +69,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
}

// ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
func (v Validator) ValidateEntry(ctx validationContext, labels labels.Labels, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))

Expand All @@ -77,13 +79,19 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.GreaterThanMaxSampleAge, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > ctx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.TooFarInFuture, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}

Expand All @@ -94,13 +102,19 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.LineTooLong, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if len(entry.StructuredMetadata) > 0 {
if !ctx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.DisallowedStructuredMetadata, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

Expand All @@ -113,12 +127,18 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
if maxSize := ctx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooLarge, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, ctx.maxStructuredMetadataSize)
}

if maxCount := ctx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooMany, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, ctx.maxStructuredMetadataCount)
}
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

var (
testStreamLabels = "FIXME"
testTime = time.Now()
testStreamLabels = labels.Labels{{Name: "my", Value: "label"}}
testStreamLabelsString = testStreamLabels.String()
testTime = time.Now()
)

type fakeLimits struct {
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg,
testStreamLabels,
testStreamLabelsString,
testTime.Add(-time.Hour*5).Format(timeFormat),
testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge
),
Expand All @@ -71,7 +72,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabelsString, testTime.Add(time.Hour*5).Format(timeFormat)),
},
{
"line too long",
Expand All @@ -82,7 +83,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabelsString, 11),
},
{
"disallowed structured metadata",
Expand All @@ -93,7 +94,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabels),
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabelsString),
},
{
"structured metadata too big",
Expand All @@ -105,7 +106,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabels, 6, 4),
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabelsString, 6, 4),
},
{
"structured metadata too many",
Expand All @@ -117,7 +118,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}, {Name: "too", Value: "many"}}},
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabels, 2, 1),
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabelsString, 2, 1),
},
}
for _, tt := range tests {
Expand All @@ -126,7 +127,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
v, err := NewValidator(o, nil)
assert.NoError(t, err)

err = v.ValidateEntry(v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry)
Expand Down Expand Up @@ -224,7 +225,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
v, err := NewValidator(o, nil)
assert.NoError(t, err)

err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
Expand Down
Loading

0 comments on commit 6578a00

Please sign in to comment.