Skip to content

Commit

Permalink
feat!: enable structure metadata by default. (grafana#12453)
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <[email protected]>
  • Loading branch information
slim-bean authored and rhnasc committed Apr 12, 2024
1 parent 741a065 commit bc04a85
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 116 deletions.
4 changes: 2 additions & 2 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,7 @@ shard_streams:

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
[allow_structured_metadata: <boolean> | default = true]

# Maximum size accepted for structured metadata per log line.
# CLI flag: -limits.max-structured-metadata-size
Expand Down Expand Up @@ -4792,7 +4792,7 @@ The `period_config` block configures what index schemas should be used for from
# gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc.
[object_store: <string> | default = ""]
# The schema version to use, current recommended schema is v12.
# The schema version to use, current recommended schema is v13.
[schema: <string> | default = ""]
# Configures how the index is updated and stored.
Expand Down
58 changes: 0 additions & 58 deletions docs/sources/configure/examples/configuration-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,61 +404,3 @@ memberlist:

```


## 16-(Deprecated)-Cassandra-Snippet.yaml

```yaml

# This is a partial config that uses the local filesystem for chunk storage and Cassandra for index storage
# WARNING - DEPRECATED: The Cassandra index store is deprecated and will be removed in a future release.

schema_config:
configs:
- from: 2020-05-15
store: cassandra
object_store: filesystem
schema: v12
index:
prefix: cassandra_table
period: 168h

storage_config:
cassandra:
username: cassandra
password: cassandra
addresses: 127.0.0.1
auth: true
keyspace: lokiindex

filesystem:
directory: /tmp/loki/chunks


```


## 17-(Deprecated)-S3-And-DynamoDB-Snippet.yaml

```yaml

# This partial configuration uses S3 for chunk storage and uses DynamoDB for index storage
# WARNING - DEPRECATED: The DynamoDB index store is deprecated and will be removed in a future release.

schema_config:
configs:
- from: 2020-05-15
store: aws
object_store: s3
schema: v12
index:
prefix: loki_

storage_config:
aws:
s3: s3://access_key:secret_access_key@region/bucket_name
dynamodb:
dynamodb_url: dynamodb://access_key:secret_access_key@region


```

This file was deleted.

This file was deleted.

8 changes: 8 additions & 0 deletions docs/sources/setup/upgrade/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ This new metric will provide a more clear signal that there is an issue with ing
| `legacy-read-mode` | false | true | Deprecated. It will be removed in the next minor release. |
{{% /responsive-table %}}

#### Structured Metadata, Open Telemetry, Schemas and Indexes

A flagship feature of Loki 3.0 is native support for the Open Telemetry Protocol (OTLP). This is made possible by a new feature in Loki called [Structured Metadata]({{< relref "../../get-started/labels/structured-metadata" >}}), a place for metadata which doesn't belong in labels or log lines. OTel resources and attributes are often a great example of data which doesn't belong in the index nor in the log line.

Structured Metadata is enabled by default in Loki 3.0, however, it requires your active schema be using both the `TSDB` index type AND the `v13` storage schema. If you are not using both of these you have two options:
* Upgrade your index version and schema version before updating to 3.0, see [schema config upgrade]({{< relref "../../operations/storage/schema#changing-the-schema" >}}).
* Disable Structured Metadata (and therefor OTLP support) and upgrade to 3.0 and perform the schema migration after. This can be done by setting `allow_structured_metadata: false` in the `limits_config` section or set the command line argument `-validation.allow-structured-metadata=false`.

#### Automatic stream sharding is enabled by default

Automatic stream sharding helps keep the write load of high volume streams balanced across ingesters and helps to avoid hot-spotting. Check out the [operations page](https://grafana.com/docs/loki/latest/operations/automatic-stream-sharding/) for more information
Expand Down
2 changes: 1 addition & 1 deletion integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type pushRequest struct {
}

func TestMicroServicesDeleteRequest(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithBoltDBAndBoltDB, func(c *cluster.Cluster) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
Expand Down
30 changes: 23 additions & 7 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
)

func TestMicroServicesIngestQuery(t *testing.T) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
Expand Down Expand Up @@ -227,10 +229,12 @@ func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-validation.allow-structured-metadata=false",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand All @@ -241,11 +245,13 @@ func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-validation.allow-structured-metadata=false",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand All @@ -258,12 +264,14 @@ func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
"-validation.allow-structured-metadata=false",
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand Down Expand Up @@ -420,10 +428,12 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-validation.allow-structured-metadata=false",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand All @@ -434,11 +444,13 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-validation.allow-structured-metadata=false",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand All @@ -451,12 +463,14 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
"-validation.allow-structured-metadata=false",
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-validation.allow-structured-metadata=false",
)
)
require.NoError(t, clu.Run())
Expand All @@ -472,12 +486,12 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLine("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineA", time.Now().Add(-48*time.Hour), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineB", time.Now().Add(-36*time.Hour), nil, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", now, map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", now, map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "fake"}))

})

Expand Down Expand Up @@ -527,7 +541,9 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
}

func TestSchedulerRing(t *testing.T) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
Expand Down Expand Up @@ -645,7 +661,7 @@ func TestSchedulerRing(t *testing.T) {
}

func TestOTLPLogsIngestQuery(t *testing.T) {
clu := cluster.New(nil, func(c *cluster.Cluster) {
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
Expand Down
4 changes: 3 additions & 1 deletion integration/loki_rule_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func TestRemoteRuleEval(t *testing.T) {
// In this test we stub out a remote-write receiver and check that the expected data is sent to it.
// Both the local and the remote rule evaluation modes should produce the same result.
func testRuleEval(t *testing.T, mode string) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
t.Cleanup(func() {
assert.NoError(t, clu.Cleanup())
})
Expand Down
4 changes: 3 additions & 1 deletion integration/loki_simple_scalable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

func TestSimpleScalable_IngestQuery(t *testing.T) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
Expand Down
4 changes: 3 additions & 1 deletion integration/loki_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

func TestSingleBinaryIngestQuery(t *testing.T) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
Expand Down
4 changes: 3 additions & 1 deletion integration/per_request_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
)

func TestPerRequestLimits(t *testing.T) {
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func TestCrossComponentValidation(t *testing.T) {
},
} {
tc.base.RegisterFlags(flag.NewFlagSet(tc.desc, 0))
// This test predates the newer schema required for structured metadata
tc.base.LimitsConfig.AllowStructuredMetadata = false
err := tc.base.Validate()
if tc.err {
require.NotNil(t, err)
Expand Down
24 changes: 24 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loki
import (
"bytes"
"context"
stdlib_errors "errors"
"flag"
"fmt"
"net/http"
Expand Down Expand Up @@ -273,6 +274,29 @@ func (c *Config) Validate() error {
}
}

var errs []error

// Schema version 13 is required to use structured metadata
p := config.ActivePeriodConfig(c.SchemaConfig.Configs)
version, err := c.SchemaConfig.Configs[p].VersionAsInt()
if err != nil {
return err
}
if c.LimitsConfig.AllowStructuredMetadata && version < 13 {
errs = append(errs, fmt.Errorf("CONFIG ERROR: schema v13 is required to store Structured Metadata and use native OTLP ingestion, your schema version is %s. Set `allow_structured_metadata: false` in the `limits_config` section or set the command line argument `-validation.allow-structured-metadata=false` and restart Loki. Then proceed to update to schema v13 or newer before re-enabling this config, search for 'Storage Schema' in the docs for the schema update procedure", c.SchemaConfig.Configs[p].Schema))
}
// TSDB index is required to use structured metadata
if c.LimitsConfig.AllowStructuredMetadata && c.SchemaConfig.Configs[p].IndexType != config.TSDBType {
errs = append(errs, fmt.Errorf("CONFIG ERROR: `tsdb` index type is required to store Structured Metadata and use native OTLP ingestion, your index type is `%s` (defined in the `store` parameter of the schema_config). Set `allow_structured_metadata: false` in the `limits_config` section or set the command line argument `-validation.allow-structured-metadata=false` and restart Loki. Then proceed to update the schema to use index type `tsdb` before re-enabling this config, search for 'Storage Schema' in the docs for the schema update procedure", c.SchemaConfig.Configs[p].IndexType))
}

if len(errs) > 1 {
errs = append([]error{fmt.Errorf("MULTIPLE CONFIG ERRORS FOUND, PLEASE READ CAREFULLY")}, errs...)
return stdlib_errors.Join(errs...)
} else if len(errs) == 1 {
return errs[0]
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type PeriodConfig struct {
IndexType string `yaml:"store" doc:"description=store and object_store below affect which <storage_config> key is used. Which index to use. Either tsdb or boltdb-shipper. Following stores are deprecated: aws, aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc."`
// type of object client to use.
ObjectType string `yaml:"object_store" doc:"description=Which store to use for the chunks. Either aws (alias s3), azure, gcs, alibabacloud, bos, cos, swift, filesystem, or a named_store (refer to named_stores_config). Following stores are deprecated: aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc."`
Schema string `yaml:"schema" doc:"description=The schema version to use, current recommended schema is v12."`
Schema string `yaml:"schema" doc:"description=The schema version to use, current recommended schema is v13."`
IndexTables IndexPeriodicTableConfig `yaml:"index" doc:"description=Configures how the index is updated and stored."`
ChunkTables PeriodicTableConfig `yaml:"chunks" doc:"description=Configured how the chunks are updated and stored."`
RowShards uint32 `yaml:"row_shards" doc:"default=16|description=How many shards will be created. Only used if schema is v10 or greater."`
Expand Down
2 changes: 1 addition & 1 deletion pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint")

f.BoolVar(&l.AllowStructuredMetadata, "validation.allow-structured-metadata", false, "Allow user to send structured metadata (non-indexed labels) in push payload.")
f.BoolVar(&l.AllowStructuredMetadata, "validation.allow-structured-metadata", true, "Allow user to send structured metadata (non-indexed labels) in push payload.")
_ = l.MaxStructuredMetadataSize.Set(defaultMaxStructuredMetadataSize)
f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
Expand Down

0 comments on commit bc04a85

Please sign in to comment.