diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 01447fb71e8c6..eef02ae112f3b 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2709,9 +2709,17 @@ shard_streams: # CLI flag: -index-gateway.shard-size [index_gateway_shard_size: <int> | default = 0] -# Allow user to send structured metadata (non-indexed labels) in push payload. +# Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: <boolean> | default = false] + +# Maximum size accepted for structured metadata per log line. +# CLI flag: -limits.max-structured-metadata-size +[max_structured_metadata_size: <int> | default = 64KB] + +# Maximum number of structured metadata entries per log line. +# CLI flag: -limits.max-structured-metadata-entries-count +[max_structured_metadata_entries_count: <int> | default = 128] ``` ### frontend_worker diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 78da901ded3e5..c3909ce69f651 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -28,4 +28,6 @@ type Limits interface { IngestionRateBytes(userID string) float64 IngestionBurstSizeBytes(userID string) int AllowStructuredMetadata(userID string) bool + MaxStructuredMetadataSize(userID string) int + MaxStructuredMetadataCount(userID string) int } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index eeef4fb080e26..7fe76fae78231 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -41,7 +41,9 @@ type validationContext struct { incrementDuplicateTimestamps bool - allowStructuredMetadata bool + allowStructuredMetadata bool + maxStructuredMetadataSize int + maxStructuredMetadataCount int userID string } @@ -59,6 +61,8 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val maxLabelValueLength: v.MaxLabelValueLength(userID), incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID), 
allowStructuredMetadata: v.AllowStructuredMetadata(userID), + maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), + maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), } } @@ -93,10 +97,30 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) } - if !ctx.allowStructuredMetadata && len(entry.StructuredMetadata) > 0 { - validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line))) - return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) + if len(entry.StructuredMetadata) > 0 { + if !ctx.allowStructuredMetadata { + validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc() + validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line))) + return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) + } + + var structuredMetadataSizeBytes, structuredMetadataCount int + for _, metadata := range entry.StructuredMetadata { + structuredMetadataSizeBytes += len(metadata.Name) + len(metadata.Value) + structuredMetadataCount++ + } + + if maxSize := ctx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { + validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Add(float64(len(entry.Line))) + return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, ctx.maxStructuredMetadataSize) + } + + if maxCount := ctx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { + 
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Add(float64(len(entry.Line))) + return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, ctx.maxStructuredMetadataCount) + } } return nil diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 85c4b7a60f3fe..038f1dc4c5b78 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -95,6 +95,30 @@ func TestValidator_ValidateEntry(t *testing.T) { logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}}, fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabels), }, + { + "structured metadata too big", + "test", + fakeLimits{ + &validation.Limits{ + AllowStructuredMetadata: true, + MaxStructuredMetadataSize: 4, + }, + }, + logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}}, + fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabels, 6, 4), + }, + { + "structured metadata too many", + "test", + fakeLimits{ + &validation.Limits{ + AllowStructuredMetadata: true, + MaxStructuredMetadataEntriesCount: 1, + }, + }, + logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}, {Name: "too", Value: "many"}}}, + fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabels, 2, 1), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index c38358fc4f831..ee394f3d82633 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -52,6 +52,9 @@ const ( defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit DefaultPerTenantQueryTimeout = 
"1m" + + defaultMaxStructuredMetadataSize = "64kb" + defaultMaxStructuredMetadataCount = 128 ) // Limits describe all the limits for users; can be used to describe global default @@ -180,7 +183,9 @@ type Limits struct { IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` - AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata (non-indexed labels) in push payload."` + AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` + MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` + MaxStructuredMetadataEntriesCount int `yaml:"max_structured_metadata_entries_count" json:"max_structured_metadata_entries_count" doc:"description=Maximum number of structured metadata entries per log line."` } type StreamRetention struct { @@ -292,6 +297,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint") f.BoolVar(&l.AllowStructuredMetadata, "validation.allow-structured-metadata", false, "Allow user to send structured metadata (non-indexed labels) in push payload.") + _ = l.MaxStructuredMetadataSize.Set(defaultMaxStructuredMetadataSize) + f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. 
There is no limit when unset or set to 0.") + f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.") + } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -771,6 +780,14 @@ func (o *Overrides) AllowStructuredMetadata(userID string) bool { return o.getOverridesForUser(userID).AllowStructuredMetadata } +func (o *Overrides) MaxStructuredMetadataSize(userID string) int { + return o.getOverridesForUser(userID).MaxStructuredMetadataSize.Val() +} + +func (o *Overrides) MaxStructuredMetadataCount(userID string) int { + return o.getOverridesForUser(userID).MaxStructuredMetadataEntriesCount +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 51915f9724de9..9cdef90385dff 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -62,6 +62,10 @@ const ( DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'" DisallowedStructuredMetadata = "disallowed_structured_metadata" DisallowedStructuredMetadataErrorMsg = "stream '%s' includes structured metadata, but this feature is disallowed. Please see `limits_config.structured_metadata` or contact your Loki administrator to enable it." + StructuredMetadataTooLarge = "structured_metadata_too_large" + StructuredMetadataTooLargeErrorMsg = "stream '%s' has structured metadata too large: '%d' bytes, limit: '%d' bytes. Please see `limits_config.max_structured_metadata_size` or contact your Loki administrator to increase it." 
+ StructuredMetadataTooMany = "structured_metadata_too_many" + StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." ) type ErrStreamRateLimit struct {