Skip to content

Commit

Permalink
Merge branch 'main' into index-stats-perf-improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Aug 27, 2024
2 parents 6ab0a22 + 1535183 commit a37cb92
Show file tree
Hide file tree
Showing 22 changed files with 329 additions and 85 deletions.
110 changes: 62 additions & 48 deletions docs/sources/operations/query-acceleration-blooms.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ through each log line in the chunks checking if the string `traceID=3c0e3dcd33e7

With accelerated filtering, Loki is able to skip most of the chunks and only process the ones where we have a
statistical confidence that the string might be present.
The underlying blooms are built by the new [Bloom Compactor](#bloom-compactor) component
The underlying blooms are built by the [Bloom Builder](#bloom-planner-and-builder) component
and served by the new [Bloom Gateway](#bloom-gateway) component.

## Enable Query Acceleration with Blooms
Expand All @@ -43,74 +43,87 @@ and querying the bloom filters that only pays off at large scale deployments.
{{< /admonition >}}

To start building and using blooms you need to:
- Deploy the [Bloom Compactor](#bloom-compactor) component and enable the component in the [Bloom Compactor config][compactor-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][gateway-cfg].
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Enable blooms filtering and compaction for each tenant individually, or for all of them by default.

```yaml
bloom_compactor:
# Configuration block for the bloom creation.
bloom_build:
enabled: true
planner:
planning_interval: 6h
builder:
planner_address: bloom-planner.<namespace>.svc.cluster.local.:9095

# Configuration block for bloom filtering.
bloom_gateway:
enabled: true
client:
addresses: dnssrvnoa+_bloom-gateway-grpc._tcp.bloom-gateway-headless.<namespace>.svc.cluster.local

# Enable blooms filtering and compaction for all tenants by default
# Enable blooms creation and filtering for all tenants by default
# or do it on a per-tenant basis.
limits_config:
bloom_creation_enabled: true
bloom_split_series_keyspace_by: 1024
bloom_gateway_enable_filtering: true
bloom_compactor_enable_compaction: true
```
For more configuration options refer to the [Bloom Gateways][gateway-cfg], [Bloom Compactor][compactor-cfg] and
For more configuration options refer to the [Bloom Gateway][bloom-gateway-cfg], [Bloom Build][bloom-build-cfg] and
[per tenant-limits][tenant-limits] configuration docs.
We strongly recommend reading the whole documentation for this experimental feature before using it.
## Bloom Compactor
The Bloom Compactor component builds blooms from the chunks in the object store.
The resulting blooms are grouped in bloom blocks spanning multiple streams (also known as series) and chunks from a given day.
## Bloom Planner and Builder
Building bloom filters from the chunks in the object storage is done by two components: the Bloom Planner and the Bloom
Builder, where the planner creates tasks for bloom building, and sends the tasks to the builders to process and
upload the resulting blocks.
Bloom filters are grouped in bloom blocks spanning multiple streams (also known as series) and chunks from a given day.
To learn more about how blocks and metadata files are organized, refer to the
[Building and querying blooms](#building-and-querying-blooms) section below.
Bloom Compactors are horizontally scalable and use a [ring] for sharding tenants and stream fingerprints,
as well as determining which compactor should apply [blooms retention](#retention).
Each compactor owns a configurable number of contiguous streams fingerprint ranges for a tenant.
The compactor builds blooms for all the chunks from the tenant streams whose fingerprint
falls within its owned key-space ranges.
The Bloom Planner runs as a single instance and calculates the gaps in fingerprint ranges for a certain time period for
a tenant for which bloom filters need to be built. It dispatches these tasks to the available builders.
The planner also applies the [blooms retention](#retention).
You can find all the configuration options for this component in the [Configure section for the Bloom Compactor][compactor-cfg].
The Bloom Builder is a stateless horizontally scalable component and can be scaled independently of the planner to fulfill
the processing demand of the created tasks.
You can find all the configuration options for these components in the [Configure section for the Bloom Builder][bloom-build-cfg].
Refer to the [Enable Query Acceleration with Blooms](#enable-query-acceleration-with-blooms) section below for
a configuration snippet enabling this feature.
### Retention
One Bloom Compactor from all those running will apply retention. Retention is disabled by default.
The instance owning the smallest token in the ring owns retention.
Retention is applied to all tenants. The retention for each tenant is the longest of its [configured][tenant-limits]
The Bloom Planner applies bloom block retention on object storage. Retention is disabled by default.
When enabled, retention is applied to all tenants. The retention for each tenant is the longest of its [configured][tenant-limits]
general retention (`retention_period`) and the streams retention (`retention_stream`).

For example, in the following example, tenant A has a bloom retention of 30 days,
and tenant B a bloom retention of 40 days.
For example, in the following example, tenant A has a bloom retention of 30 days, and tenant B a bloom retention of 40 days.

```yaml
overrides:
"A":
retention: 30d
retention_period: 30d
"B":
retention: 30d
retention_period: 30d
retention_stream:
- selector: '{namespace="prod"}'
priority: 1
period: 40d
```

### Sizing
Compactors build blocks concurrently. Concurrency is [configured][compactor-cfg] via `-bloom-compactor.worker-parallelism`.
Each worker will build bloom blocks from streams and chunks.
### Sizing and configuration
The single planner instance runs the planning phase for bloom blocks for each tenant in the given interval
and puts the created tasks to an internal task queue.
Builders process tasks sequentially by pulling them from the queue. The amount of builder replicas required to complete
all pending tasks before the next planning iteration depends on the value of `-bloom-build.planner.bloom_split_series_keyspace_by`,
the amount of tenants, and the log volume of the streams.

The maximum block size is configured per tenant via `-bloom-compactor.max-block-size`.
Note that the actual block size might exceed this limit given that we append streams blooms to the block until the
The actual block size might exceed this limit given that we append streams blooms to the block until the
block is larger than the configured maximum size. Blocks are created in memory and as soon as they are written to the
object store they are freed. Chunks and TSDB files are downloaded from the object store to the file system.
We estimate that compactors are able to process 4 MB worth of data per second per core.
We estimate that builders are able to process 4MB worth of data per second per core.

## Bloom Gateway
Bloom Gateways handle chunks filtering requests from the [index gateway](https://grafana.com/docs/loki/<LOKI_VERSION>/get-started/components/#index-gateway).
Expand All @@ -126,10 +139,10 @@ and even distribution of the stream fingerprints across Bloom Gateway instances.
You can find all the configuration options for this component in the Configure section for the [Bloom Gateways][gateway-cfg].
Refer to the [Enable Query Acceleration with Blooms](#enable-query-acceleration-with-blooms) section below for a configuration snippet enabling this feature.

### Sizing
Bloom Gateways use their local filesystem as a Least Recently Used (LRU) cache for blooms that are
### Sizing and configuration
Bloom Gateways use their local file system as a Least Recently Used (LRU) cache for blooms that are
downloaded from object storage. The size of the blooms depend on the ingest volume and the log content cardinality,
as well as on compaction settings of the blooms, namely n-gram length, skip-factor, and false-positive-rate.
as well as on build settings of the blooms, namely n-gram length, skip-factor, and false-positive-rate.
With default settings, bloom filters make up roughly 3% of the chunk data.

Example calculation for storage requirements of blooms for a single tenant.
Expand Down Expand Up @@ -168,20 +181,22 @@ Bloom filters are built per stream and aggregated together into block files.
Streams are assigned to blocks by their fingerprint, following the same ordering scheme as Loki’s TSDB and sharding calculation.
This gives a data locality benefit when querying as streams in the same shard are likely to be in the same block.
In addition to blocks, compactors maintain a list of metadata files containing references to bloom blocks and the
TSDB index files they were built from. They also contain tombstones for old blocks which are outdated and
can be deleted in future iterations. Gateways and compactors use these metadata files to discover existing blocks.
In addition to blocks, builders maintain a list of metadata files containing references to bloom blocks and the
TSDB index files they were built from. Gateways and the planner use these metadata files to discover existing blocks.
Every `-bloom-compactor.compaction-interval`, compactors will load the latest TSDB files for all tenants for
which bloom compaction is enabled, and compare the TSDB files with the latest bloom metadata files.
If there are new TSDB files or any of them have changed, the compactor will process all the streams and chunks pointed
by the TSDB file. In case of changes for a previously processed TSDB file,
compactors will try to reuse blooms from existing blocks instead of building new ones from scratch.
Every `-bloom-build.planner.interval`, the planner will load the latest TSDB files for all tenants for
which bloom building is enabled, and compares the TSDB files with the latest bloom metadata files.
If there are new TSDB files or any of them have changed, the planner will create a task for the streams and chunks
referenced by the TSDB file.
For a given stream, the compactor owning that stream will iterate through all the log lines inside its new
chunks and build a bloom for the stream. For each log line, we compute its [n-grams](https://en.wikipedia.org/wiki/N-gram#:~:text=An%20n%2Dgram%20is%20a,pairs%20extracted%20from%20a%20genome.)
and append to the bloom both the hash for each n-gram and the hash for each n-gram plus the chunk identifier.
The former allows gateways to skip whole streams while the latter is for skipping individual chunks.
The builder pulls a task from the planner's queue and processes the containing streams and chunks.
For a given stream, the builder will iterate through all the log lines inside its new chunks and build a bloom for the
stream. In case of changes for a previously processed TSDB file, builders will try to reuse blooms from existing blocks
instead of building new ones from scratch.
The builder computes [n-grams](https://en.wikipedia.org/wiki/N-gram#:~:text=An%20n%2Dgram%20is%20a,pairs%20extracted%20from%20a%20genome.)
for each log line of each chunk of a stream and appends both the hash of each n-gram and the hash of each n-gram plus
the chunk identifier to the bloom. The former allows gateways to skip whole streams while the latter is for skipping
individual chunks.
For example, given a log line `abcdef` in the chunk `c6dj8g`, we compute its n-grams: `abc`, `bcd`, `cde`, `def`.
And append to the stream bloom the following hashes: `hash("abc")`, `hash("abc" + "c6dj8g")` ... `hash("def")`, `hash("def" + "c6dj8g")`.
Expand Down Expand Up @@ -209,8 +224,8 @@ Loki will check blooms for any log filtering expression within a query that sati
the first filter (`|= "level=error"`) will benefit from blooms but the second one (`|= "traceID=3ksn8d4jj3"`) will not.
## Query sharding
Query acceleration does not just happen while processing chunks,
but also happens from the query planning phase where the query frontend applies [query sharding](https://lokidex.com/posts/tsdb/#sharding).
Query acceleration does not just happen while processing chunks, but also happens from the query planning phase where
the query frontend applies [query sharding](https://lokidex.com/posts/tsdb/#sharding).
Loki 3.0 introduces a new {per-tenant configuration][tenant-limits] flag `tsdb_sharding_strategy` which defaults to computing
shards as in previous versions of Loki by using the index stats to come up with the closest power of two that would
optimistically divide the data to process in shards of roughly the same size. Unfortunately,
Expand All @@ -221,9 +236,8 @@ Query acceleration introduces a new sharding strategy: `bounded`, which uses blo
processed right away during the planning phase in the query frontend,
as well as evenly distributes the amount of chunks each sharded query will need to process.
[ring]: https://grafana.com/docs/loki/<LOKI_VERSION>/get-started/hash-rings/
[tenant-limits]: https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#limits_config
[gateway-cfg]: https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#bloom_gateway
[compactor-cfg]: https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#bloom_compactor
[bloom-gateway-cfg]: https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#bloom_gateway
[bloom-build-cfg]: https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#bloom_build
[microservices]: https://grafana.com/docs/loki/<LOKI_VERSION>/get-started/deployment-modes/#microservices-mode
[ssd]: https://grafana.com/docs/loki/<LOKI_VERSION>/get-started/deployment-modes/#simple-scalable
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4057,6 +4057,17 @@ otlp_config:
# Configuration for log attributes to store them as Structured Metadata or
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
[block_ingestion_until: <time> | default = 0]

# HTTP status code to return when ingestion is blocked. If 200, the ingestion
# will be blocked without returning an error to the client. By Default, a custom
# status code (260) is returned to the client along with an error message.
# CLI flag: -limits.block-ingestion-status-code
[block_ingestion_status_code: <int> | default = 260]
```
### local_storage_config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/google/renameio/v2 v2.0.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.0
github.com/gorilla/websocket v1.5.3
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0
github.com/grafana/go-gelf/v2 v2.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 h1:iMShjkEYATnBMbEa2wV4QiK5PU2trw24FOCON3v7+K4=
Expand Down
17 changes: 17 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,23 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineSize))

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
Expand Down
54 changes: 54 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,60 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushIngestionBlocked(t *testing.T) {
for _, tc := range []struct {
name string
blockUntil time.Time
blockStatusCode int
expectError bool
expectedStatusCode int
}{
{
name: "not configured",
expectedStatusCode: http.StatusOK,
},
{
name: "not blocked",
blockUntil: time.Now().Add(-1 * time.Hour),
expectedStatusCode: http.StatusOK,
},
{
name: "blocked",
blockUntil: time.Now().Add(1 * time.Hour),
blockStatusCode: 456,
expectError: true,
expectedStatusCode: 456,
},
{
name: "blocked with status code 200",
blockUntil: time.Now().Add(1 * time.Hour),
blockStatusCode: http.StatusOK,
expectError: false,
expectedStatusCode: http.StatusOK,
},
} {
t.Run(tc.name, func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.BlockIngestionUntil = flagext.Time(tc.blockUntil)
limits.BlockIngestionStatusCode = tc.blockStatusCode

distributors, _ := prepare(t, 1, 5, limits, nil)
request := makeWriteRequest(1, 1024)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) {
t.Helper()

Expand Down
3 changes: 3 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ type Limits interface {
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
}
Loading

0 comments on commit a37cb92

Please sign in to comment.