Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(loki): include structured_metadata size while asserting rate limit #14571

3 changes: 2 additions & 1 deletion docs/sources/get-started/labels/structured-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ See the [Promtail: Structured metadata stage](https://grafana.com/docs/loki/<LOK
With Loki version 1.2.0, support for structured metadata has been added to the Logstash output plugin. For more information, see [logstash](https://grafana.com/docs/loki/<LOKI_VERSION>/send-data/logstash/).

{{% admonition type="warning" %}}
There are defaults for how much structured metadata can be attached per log line.
Structured metadata size is taken into account while asserting ingestion rate limiting.
Along with that, there are separate limits on how much structured metadata can be attached per log line.
```
# Maximum size accepted for structured metadata per log line.
# CLI flag: -limits.max-structured-metadata-size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ It is recommended that Loki operators set up alerts or dashboards with these met

### Terminology

- **sample**: a log line
- **sample**: a log line with [structured metadata]({{< relref "../get-started/labels/structured-metadata" >}})
- **stream**: samples with a unique combination of labels
- **active stream**: streams that are present in the ingesters - these have recently received log lines within the `chunk_idle_period` period (default: 30m)

Expand Down
3 changes: 2 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3260,7 +3260,8 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -distributor.ingestion-rate-limit-strategy
[ingestion_rate_strategy: <string> | default = "global"]

# Per-user ingestion rate limit in sample size per second. Units in MB.
# Per-user ingestion rate limit in sample size per second. Sample size includes
# size of the logs line and the size of structured metadata labels. Units in MB.
# CLI flag: -distributor.ingestion-rate-limit-mb
[ingestion_rate_mb: <float> | default = 4]

Expand Down
14 changes: 4 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(bytes))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes))
continue
}

Expand Down Expand Up @@ -527,7 +524,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

n++
validatedLineSize += len(entry.Line)
validatedLineSize += util.EntryTotalSize(&entry)
vlad-diachenko marked this conversation as resolved.
Show resolved Hide resolved
validatedLineCount++
pushSize += len(entry.Line)
}
Expand Down Expand Up @@ -706,10 +703,7 @@ func (d *Distributor) trackDiscardedData(
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}
discardedStreamBytes := util.EntriesTotalSize(stream.Entries)

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(ctx, tenantID, reason, lbs, float64(discardedStreamBytes))
Expand Down
58 changes: 33 additions & 25 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
Expand Down Expand Up @@ -54,7 +55,12 @@ var (
)

func TestDistributor(t *testing.T) {
ingestionRateLimit := 0.000096 // 100 Bytes/s limit
lineSize := 10
// detected_level label will be added to all log entries
strucutredMetadataSize := len(constants.LevelLabel) + len(constants.LogLevelUnknown)
entryTotalSize := lineSize + strucutredMetadataSize
ingestionRateLimit := datasize.ByteSize(400)
ingestionRateLimitMB := ingestionRateLimit.MBytes() // 400 Bytes/s limit

for i, tc := range []struct {
lines int
Expand All @@ -72,7 +78,7 @@ func TestDistributor(t *testing.T) {
{
lines: 100,
streams: 1,
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 100, 1000)},
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", ingestionRateLimit, 100, 100*entryTotalSize)},
},
{
lines: 100,
Expand Down Expand Up @@ -104,15 +110,15 @@ func TestDistributor(t *testing.T) {
t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.IngestionRateMB = ingestionRateLimitMB
limits.IngestionBurstSizeMB = ingestionRateLimitMB
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

distributors, _ := prepare(t, 1, 5, limits, nil)

var request logproto.PushRequest
for i := 0; i < tc.streams; i++ {
req := makeWriteRequest(tc.lines, 10)
req := makeWriteRequest(tc.lines, lineSize)
request.Streams = append(request.Streams, req.Streams[0])
}

Expand Down Expand Up @@ -1178,37 +1184,37 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
"local strategy: limit should be set to each distributor": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(100).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(100).MBytes(),
pushes: []testPush{
{bytes: 5, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 10, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 10, 1, 1)},
{bytes: 50, expectedError: nil},
{bytes: 60, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 60)},
{bytes: 50, expectedError: nil},
{bytes: 40, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 40)},
},
},
"global strategy: limit should be evenly shared across distributors": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(200).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(100).MBytes(),
pushes: []testPush{
{bytes: 3, expectedError: nil},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 3)},
{bytes: 2, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 1)},
{bytes: 60, expectedError: nil},
{bytes: 50, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 50)},
{bytes: 40, expectedError: nil},
{bytes: 30, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 30)},
},
},
"global strategy: burst should set to each distributor": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(100).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(200).MBytes(),
pushes: []testPush{
{bytes: 15, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 1)},
{bytes: 150, expectedError: nil},
{bytes: 60, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 50, 1, 60)},
{bytes: 50, expectedError: nil},
{bytes: 30, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 50, 1, 30)},
},
},
}
Expand All @@ -1223,12 +1229,14 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {

distributors, _ := prepare(t, testData.distributors, 5, limits, nil)
for _, push := range testData.pushes {
request := makeWriteRequest(1, push.bytes)
// each log entry will be added structured metadata label `detected_level:"unknown"` that adds additional 21 bytes on distributor side.
structuredMetadataSize := 21
request := makeWriteRequest(1, push.bytes-structuredMetadataSize)
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.NoError(t, err)
assert.Equal(t, success, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
Expand Down
43 changes: 19 additions & 24 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -82,25 +83,28 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))
structuredMetadataCount := len(entry.StructuredMetadata)
structuredMetadataSizeBytes := util.StructuredMetadataSize(entry.StructuredMetadata)
entrySize := float64(len(entry.Line) + structuredMetadataSizeBytes)

if vCtx.rejectOldSample && ts < vCtx.rejectOldSampleMaxAge {
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize)
}
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > vCtx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize)
}
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}
Expand All @@ -111,43 +115,37 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize)
}
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if len(entry.StructuredMetadata) > 0 {
if structuredMetadataCount > 0 {
if !vCtx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize)
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

var structuredMetadataSizeBytes, structuredMetadataCount int
for _, metadata := range entry.StructuredMetadata {
structuredMetadataSizeBytes += len(metadata.Name) + len(metadata.Value)
structuredMetadataCount++
}

if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize)
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize)
}

if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize)
}
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount)
}
Expand Down Expand Up @@ -207,10 +205,7 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (b
}

func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries)))
bytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
}
6 changes: 2 additions & 4 deletions pkg/ingester-rf1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
Expand Down Expand Up @@ -269,10 +270,7 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
bytes := util.EntriesTotalSize(pushReqStream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
Expand Down
11 changes: 8 additions & 3 deletions pkg/ingester-rf1/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/flagext"
"github.com/grafana/loki/v3/pkg/validation"
)
Expand Down Expand Up @@ -246,14 +247,18 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
}

lineBytes := len(entries[i].Line)
metadataBytes := 0
for _, metadata := range entries[i].StructuredMetadata {
vlad-diachenko marked this conversation as resolved.
Show resolved Hide resolved
metadataBytes += len(metadata.Name) + len(metadata.Value)
}
totalBytes += lineBytes

now := time.Now()
if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
vlad-diachenko marked this conversation as resolved.
Show resolved Hide resolved
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
rateLimitedSamples++
rateLimitedBytes += lineBytes
rateLimitedBytes += lineBytes + metadataBytes
continue
}

Expand All @@ -263,7 +268,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
outOfOrderBytes += lineBytes
outOfOrderBytes += lineBytes + metadataBytes
continue
}

Expand All @@ -288,7 +293,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
failedEntriesWithError = make([]entryWithError, 0, len(toStore))
for i := 0; i < len(toStore); i++ {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}})
rateLimitedBytes += len(toStore[i].Line)
rateLimitedBytes += util.EntryTotalSize(toStore[i])
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,7 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
bytes := util.EntriesTotalSize(pushReqStream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
Expand Down
Loading
Loading