Merge branch 'main' into fix-ipv6-port-join
periklis authored Dec 6, 2023
2 parents acd2393 + a0b462d commit 38ed511
Showing 18 changed files with 1,002 additions and 20 deletions.
20 changes: 20 additions & 0 deletions docs/sources/configure/_index.md
@@ -1838,6 +1838,21 @@ client:
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]
results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# bloom-gateway-client.cache
[cache: <cache_config>]
# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -bloom-gateway-client.cache.compression
[compression: <string> | default = ""]
# Flag to control whether to cache bloom gateway client requests/responses.
# CLI flag: -bloom-gateway-client.cache_results
[cache_results: <boolean> | default = false]
# Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
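For orientation, here is a minimal sketch of how these new options might fit together in a Loki configuration file. The nesting is inferred from the CLI flag prefixes above, and the `memcached_client` backend is just one illustrative choice of cache backend, not a requirement:

```yaml
# Sketch only: field placement inferred from the CLI flag prefixes above.
bloom_gateway:
  # -bloom-gateway.worker-concurrency
  worker_concurrency: 4
  client:
    # -bloom-gateway-client.cache_results
    cache_results: true
    results_cache:
      # -bloom-gateway-client.cache.compression
      compression: snappy
      # The <cache_config> block, here backed by memcached as an example.
      cache:
        memcached_client:
          addresses: dns+memcached.loki.svc.cluster.local:11211
```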
@@ -3028,6 +3043,10 @@ shard_streams:
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]

# Interval for computing the cache key in the Bloom Gateway.
# CLI flag: -bloom-gateway.cache-key-interval
[bloom_gateway_cache_key_interval: <duration> | default = 15m]

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
@@ -4233,6 +4252,7 @@ The TLS configuration.

The cache block configures the cache backend. The supported CLI flag `<prefix>` values used to reference this configuration block are:

- `bloom-gateway-client.cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.volume-results-cache`
2 changes: 1 addition & 1 deletion docs/sources/query/template_functions.md
@@ -710,7 +710,7 @@ Examples:

```template
{{ default "-" "" }} // output: -
{{ default "" "foo" }} // output: foo
{{ default "-" "foo" }} // output: foo
```

Example of a query to print a `-` if the `http_request_headers_x_forwarded_for` label is empty:
13 changes: 9 additions & 4 deletions docs/sources/send-data/promtail/cloud/ecs/_index.md
@@ -130,21 +130,26 @@ The `log_router` container image is the [Fluent bit Loki docker image][fluentbit loki]
"logConfiguration": {
"logDriver": "awsfirelens",
"options": {
"Name": "grafana-loki",
"Url": "https://<userid>:<grafancloud apikey>@<grafanacloud host>/loki/api/v1/push",
"Name": "loki",
"Host": "<grafanacloud host>",
"Http_User": "<userid>",
"Labels": "{job=\"firelens\"}",
"RemoveKeys": "container_id,ecs_task_arn",
"LabelKeys": "container_name,ecs_task_definition,source,ecs_cluster",
"LineFormat": "key_value"
}
},
"secretOptions": [{
"name": "Http_Passwd",
"valueFrom": "data.aws_secretsmanager_secret.grafana_cloud_loki_http_password.id"
}]
},
"name": "sample-app"
}
```

The second container is our `sample-app`, a simple [alpine][alpine] container that prints welcome messages to stdout. To send those logs to Loki, we will configure this container to use the `awsfirelens` log driver.

Go ahead and replace the `Url` property with your [GrafanaCloud][GrafanaCloud] credentials, you can find them in your [account][grafanacloud account] in the Loki instance page. If you're running your own Loki instance replace completely the URL (e.g `http://my-loki.com:3100/loki/api/v1/push`).
Go ahead and replace the `Host` and `Http_User` properties with your [GrafanaCloud][GrafanaCloud] credentials; you can find them in your [account][grafanacloud account] on the Loki instance page. If you're running your own Loki instance, replace the URL entirely (for example, `http://my-loki.com:3100/loki/api/v1/push`).

We include plain text credentials in `options` for simplicity. However, this exposes credentials in your ECS task definition and in any version-controlled configuration. Mitigate this issue by using a secret store such as [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html), combined with the `secretOptions` configuration option for [injecting sensitive data in a log configuration](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/specifying-sensitive-data-secrets.html#secrets-logconfig).

217 changes: 217 additions & 0 deletions pkg/bloomgateway/cache.go
@@ -0,0 +1,217 @@
package bloomgateway

import (
"context"
"flag"
"sort"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"google.golang.org/grpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
)

const (
cacheParallelism = 1
)

type CacheConfig struct {
resultscache.Config `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("bloom-gateway-client.cache.", f)
}

func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(f, prefix)
}

type CacheLimits interface {
resultscache.Limits
BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}

type keyGen struct {
CacheLimits
}

func newCacheKeyGen(limits CacheLimits) keyGen {
return keyGen{limits}
}

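// GenerateCacheKey generates the cache key for the given tenant and request,
// bucketing requests by the tenant's configured cache key interval.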
func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}

type extractor struct{}

func newExtractor() extractor {
return extractor{}
}

// Extract extracts the subset of a response that falls between the `start` and `end` timestamps (given in milliseconds).
// Chunks outside that time range are removed.
func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response {
res := r.(*logproto.FilterChunkRefResponse)

chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs))
for _, chunkRef := range res.ChunkRefs {
refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs))
for _, ref := range chunkRef.Refs {
if model.Time(end) < ref.From || ref.Through <= model.Time(start) {
continue
}
refs = append(refs, ref)
}
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.Tenant,
Refs: refs,
})
}
}

return &logproto.FilterChunkRefResponse{
ChunkRefs: chunkRefs,
}
}

type merger struct{}

func newMerger() merger {
return merger{}
}

// MergeResponse merges responses from multiple requests into a single response.
// All chunks are merged and grouped by their stream fingerprint.
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
var size int
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
size += len(res.ChunkRefs)
}

chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
chunkRefs = append(chunkRefs, res.ChunkRefs...)
}

return &logproto.FilterChunkRefResponse{
ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
}, nil
}

// mergeGroupedChunkRefs merges duplicated fingerprints by:
// 1. Sorting the chunkRefs by their stream fingerprint.
// 2. Removing duplicated fingerprints, appending all chunks to the first fingerprint's chunk list.
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
if len(chunkRefs) <= 1 {
return chunkRefs
}

sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})

var lastDiffFP int
for i := 1; i < len(chunkRefs); i++ {
if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
} else {
lastDiffFP++
chunkRefs[lastDiffFP] = chunkRefs[i]
}
}
return chunkRefs[:lastDiffFP+1]
}

// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
if len(refs) <= 1 {
return refs
}

sort.Slice(refs, func(i, j int) bool {
return refs[i].Checksum < refs[j].Checksum
})
return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
return a.Checksum == b.Checksum
})
}

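// ClientCache is a bloom gateway client decorator that caches
// FilterChunkRefs responses in a results cache.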
type ClientCache struct {
cache *resultscache.ResultsCache
limits CacheLimits
logger log.Logger
}

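// NewBloomGatewayClientCacheMiddleware wraps a BloomGatewayClient so that
// FilterChunkRefs responses are served from the given cache backend when possible.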
func NewBloomGatewayClientCacheMiddleware(
logger log.Logger,
next logproto.BloomGatewayClient,
c cache.Cache,
limits CacheLimits,
cacheGen resultscache.CacheGenNumberLoader,
retentionEnabled bool,
) *ClientCache {
nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) {
req := cacheReq.(requestWithGrpcCallOptions)
return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...)
})

resultsCache := resultscache.NewResultsCache(
logger,
c,
nextAsHandler,
newCacheKeyGen(limits),
limits,
newMerger(),
newExtractor(),
nil,
nil,
func(_ context.Context, _ []string, _ resultscache.Request) int {
return cacheParallelism
},
cacheGen,
retentionEnabled,
)

return &ClientCache{
cache: resultsCache,
limits: limits,
logger: logger,
}
}

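// FilterChunkRefs answers the request from the results cache,
// falling back to the wrapped client on a cache miss.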
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
grpcCallOptions: opts,
}
res, err := c.cache.Do(ctx, cacheReq)
if err != nil {
return nil, err
}

return res.(*logproto.FilterChunkRefResponse), nil
}

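// requestWithGrpcCallOptions bundles a FilterChunkRefRequest with its gRPC
// call options so they survive the round trip through the results cache.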
type requestWithGrpcCallOptions struct {
*logproto.FilterChunkRefRequest
grpcCallOptions []grpc.CallOption
}

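// WithStartEndForCache returns a copy of the request restricted to the given
// time range, preserving the original gRPC call options.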
func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request {
return requestWithGrpcCallOptions{
FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest),
grpcCallOptions: r.grpcCallOptions,
}
}
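Taken together, the file implements a caching decorator around the bloom gateway gRPC client. A minimal sketch of how it might be wired up follows; the helper below is illustrative only (it is not part of this commit), and all of its inputs are assumed to be constructed elsewhere:

```go
// Sketch only: wrap an existing BloomGatewayClient with the results cache.
// Passing a nil CacheGenNumberLoader and retentionEnabled=false mirrors the
// simplest assumed configuration.
func newCachedBloomGatewayClient(
	logger log.Logger,
	next logproto.BloomGatewayClient,
	backend cache.Cache,
	limits CacheLimits,
) *ClientCache {
	return NewBloomGatewayClientCacheMiddleware(
		logger,
		next,
		backend,
		limits,
		nil,   // cacheGen: no cache generation number loader
		false, // retentionEnabled
	)
}
```

The returned `*ClientCache` is then used like the underlying client, for example `res, err := cached.FilterChunkRefs(ctx, req)`.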