diff --git a/.github/workflows/snyk-pr-comment.yml b/.github/workflows/snyk-pr-comment.yml
index 9eb86f069fc0c..c54e9c55c3b58 100644
--- a/.github/workflows/snyk-pr-comment.yml
+++ b/.github/workflows/snyk-pr-comment.yml
@@ -36,7 +36,11 @@ jobs:
continue-on-error: true
id: should-comment
run: |
- if [[ $(wc -l < snyk.txt) -gt 1 ]]; then exit 0; fi
+ if [[ $(wc -l < snyk.txt) -gt 1 ]]; then
+ echo "\nTo see more details on these vulnerabilities, and how/where to fix them, please run `make scan-vulnerabilities` on your branch. If these were not introduced by your PR, please considering fixing them in `main` via a subsequent PR. Thanks!" >> snyk.txt
+ exit 0;
+ fi
+
exit 1
- name: Comment on PR with Snyk scan results
diff --git a/.github/workflows/trivy-pr-comment.yml b/.github/workflows/trivy-pr-comment.yml
index c57264a790bcd..ca69cb1b3ba7d 100644
--- a/.github/workflows/trivy-pr-comment.yml
+++ b/.github/workflows/trivy-pr-comment.yml
@@ -19,12 +19,21 @@ jobs:
make loki-image
echo "IMAGE_TAG=${IMAGE_TAG}" >> $GITHUB_ENV
- - name: Run Trivy vulnerability scanner
+ - name: Run Trivy image scanner
uses: aquasecurity/trivy-action@master
with:
image-ref: "docker.io/grafana/loki:${{ env.IMAGE_TAG }}"
format: "json"
- output: "trivy.json"
+ output: "trivy-image.json"
+ severity: "CRITICAL,HIGH"
+
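+ # Also scan Go module dependencies (go.mod), not just the built container image.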
+ - name: Run Trivy fs scanner
+ uses: aquasecurity/trivy-action@master
+ with:
+ scan-type: "fs"
+ scan-ref: "go.mod"
+ format: "json"
+ output: "trivy-fs.json"
severity: "CRITICAL,HIGH"
- name: Prepare Trivy Message
@@ -35,13 +44,19 @@ jobs:
uses: sergeysova/jq-action@v2
continue-on-error: true
with:
- cmd: jq -r '.Results[] | select(.Vulnerabilities != null) | .Vulnerabilities[] | "* **\(.Severity)** [\(.Title)](\(.PrimaryURL)) in `\(.PkgName)` v\(.InstalledVersion). Fixed in v\(.FixedVersion)"' trivy.json >> trivy.txt
+ cmd: |
+ jq -r '.Results[] | select(.Vulnerabilities != null) | .Target as $target | .Type as $type | .Vulnerabilities[] | "* **\(.Severity)**, Target: \($target), Type: \($type) [\(.Title)](\(.PrimaryURL)) in `\(.PkgName)` v\(.InstalledVersion). Fixed in v\(.FixedVersion)"' trivy-image.json >> trivy.txt
+ jq -r '.Results[] | select(.Vulnerabilities != null) | .Target as $target | .Type as $type | .Vulnerabilities[] | "* **\(.Severity)**, Target: \($target), Type: \($type) [\(.Title)](\(.PrimaryURL)) in `\(.PkgName)` v\(.InstalledVersion). Fixed in v\(.FixedVersion)"' trivy-fs.json >> trivy.txt
- name: Determine whether to comment
continue-on-error: true
id: should-comment
run: |
- if [[ $(wc -l < trivy.txt) -gt 1 ]]; then exit 0; fi
+ if [[ $(wc -l < trivy.txt) -gt 1 ]]; then
+ echo "\nTo see more details on these vulnerabilities, and how/where to fix them, please run `make scan-vulnerabilities` on your branch. If these were not introduced by your PR, please considering fixing them in `main` via a subsequent PR. Thanks!" >> trivy.txt
+ exit 0;
+ fi
+
exit 1
- name: Comment on PR with Trivy scan results
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc5d00fa5b907..3ee05edfb42b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
##### Enhancements
+* [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
* [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware.
* [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods.
* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
@@ -48,11 +49,13 @@
##### Changes
+* [11490](https://github.com/grafana/loki/pull/11490) **andresperezl**: Helm: Use `/ingester/shutdown` for `preStop` hook in write pods.
* [10366](https://github.com/grafana/loki/pull/10366) **shantanualsi** Upgrade thanos objstore, dskit and other modules
* [10451](https://github.com/grafana/loki/pull/10451) **shantanualsi** Upgrade thanos `objstore`
* [10814](https://github.com/grafana/loki/pull/10814) **shantanualsi,kaviraj** Upgrade prometheus to v0.47.1 and dskit
* [10959](https://github.com/grafana/loki/pull/10959) **slim-bean** introduce a backoff wait on subquery retries.
* [11121](https://github.com/grafana/loki/pull/11121) **periklis** Ensure all lifecycler cfgs ref a valid IPv6 addr and port combination
+* [10650](https://github.com/grafana/loki/pull/10650) **matthewpi** Ensure the frontend uses a valid IPv6 addr and port combination
#### Promtail
diff --git a/Makefile b/Makefile
index 0b73cfee6d410..23a22d3e55bce 100644
--- a/Makefile
+++ b/Makefile
@@ -838,6 +838,7 @@ dev-k3d-down:
.PHONY: trivy
trivy: loki-image
trivy i $(IMAGE_PREFIX)/loki:$(IMAGE_TAG)
+ trivy fs go.mod
# Synk is also used to scan for vulnerabilities, and detects things that trivy might miss
.PHONY: snyk
diff --git a/clients/pkg/promtail/discovery/consulagent/consul.go b/clients/pkg/promtail/discovery/consulagent/consul.go
index 5395e5153bd62..f38bc6e3dfe08 100644
--- a/clients/pkg/promtail/discovery/consulagent/consul.go
+++ b/clients/pkg/promtail/discovery/consulagent/consul.go
@@ -8,7 +8,6 @@ package consulagent
import (
"context"
"encoding/json"
- "fmt"
"net"
"net/http"
"strconv"
@@ -527,9 +526,9 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
// since the service may be registered remotely through a different node.
var addr string
if srvCheck.Service.Address != "" {
- addr = net.JoinHostPort(srvCheck.Service.Address, fmt.Sprintf("%d", srvCheck.Service.Port))
+ addr = net.JoinHostPort(srvCheck.Service.Address, strconv.Itoa(srvCheck.Service.Port))
} else {
- addr = net.JoinHostPort(member.Addr, fmt.Sprintf("%d", srvCheck.Service.Port))
+ addr = net.JoinHostPort(member.Addr, strconv.Itoa(srvCheck.Service.Port))
}
labels := model.LabelSet{
@@ -560,7 +559,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
// Add all key/value pairs from the service's tagged addresses as their own labels.
for k, v := range srvCheck.Service.TaggedAddresses {
name := strutil.SanitizeLabelName(k)
- address := fmt.Sprintf("%s:%d", v.Address, v.Port)
+ address := net.JoinHostPort(v.Address, strconv.Itoa(v.Port))
labels[taggedAddressesLabel+model.LabelName(name)] = model.LabelValue(address)
}
diff --git a/docs/sources/get-started/quick-start.md b/docs/sources/get-started/quick-start.md
index be9c2e1fe3e3a..3fd404d18dbf2 100644
--- a/docs/sources/get-started/quick-start.md
+++ b/docs/sources/get-started/quick-start.md
@@ -19,7 +19,7 @@ The Docker Compose configuration instantiates the following components, each in
- **Minio** an S3-compatible object store which Loki uses to store its index and chunks.
- **Grafana** which provides visualization of the log lines captured within Loki.
-![Getting started sample application](get-started-flog.png)
+{{< figure max-width="75%" src="/media/docs/loki/get-started-flog.png" caption="Getting started sample application" alt="Getting started sample application">}}
## Installing Loki and collecting sample logs
@@ -80,7 +80,7 @@ Once you have collected logs, you will want to view them. You can view your log
The test environment includes [Grafana](https://grafana.com/docs/grafana/latest/), which you can use to query and observe the sample logs generated by the flog application. You can access the Grafana cluster by navigating to [http://localhost:3000](http://localhost:3000). The Grafana instance provided with this demo has a Loki [datasource](https://grafana.com/docs/grafana/latest/datasources/loki/) already configured.
-![Grafana Explore](grafana-query-builder.png)
+{{< figure src="/media/docs/loki/grafana-query-builder.png" caption="Query builder in Grafana Explore" alt="Grafana Explore">}}
1. From the Grafana main menu, click the **Explore** icon (1) to launch the Explore tab. To learn more about Explore, refer the [Explore](https://grafana.com/docs/grafana/latest/explore/) documentation.
diff --git a/docs/sources/reference/api.md b/docs/sources/reference/api.md
index fabac533b32bd..666cc43f91c2b 100644
--- a/docs/sources/reference/api.md
+++ b/docs/sources/reference/api.md
@@ -988,7 +988,7 @@ This API endpoint is usually used by Kubernetes-specific scale down automations
## Flush in-memory chunks and shut down
```
-POST /ingester/shutdown
+GET, POST /ingester/shutdown
```
`/ingester/shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds.
diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md
index e650d0fca2fc2..631aadacac029 100644
--- a/docs/sources/setup/install/helm/reference.md
+++ b/docs/sources/setup/install/helm/reference.md
@@ -2252,6 +2252,7 @@ null
},
"s3": {
"accessKeyId": null,
+ "backoff_config": {},
"endpoint": null,
"http_config": {},
"insecure": false,
@@ -2285,6 +2286,15 @@ null
"type": "s3"
}
+
+
+
+ loki.storage.s3.backoff_config |
+ object |
+ Check https://grafana.com/docs/loki/latest/configure/#s3_storage_config for more info on how to provide a backoff_config |
+
+{}
+
|
diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go
index 5111223eaf71f..a4d03ed10a673 100644
--- a/integration/loki_micro_services_test.go
+++ b/integration/loki_micro_services_test.go
@@ -146,11 +146,16 @@ func TestMicroServicesIngestQuery(t *testing.T) {
assert.ElementsMatch(t, []map[string]string{{"job": "fake"}}, resp)
})
- t.Run("stats error", func(t *testing.T) {
+ t.Run("series error", func(t *testing.T) {
_, err := cliQueryFrontend.Series(context.Background(), `{job="fake"}|= "search"`)
require.ErrorContains(t, err, "status code 400: only label matchers are supported")
})
+ t.Run("stats error", func(t *testing.T) {
+ _, err := cliQueryFrontend.Stats(context.Background(), `{job="fake"}|= "search"`)
+ require.ErrorContains(t, err, "status code 400: only label matchers are supported")
+ })
+
t.Run("per-request-limits", func(t *testing.T) {
queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}})
cliQueryFrontendLimited := client.New(tenantID, "", tQueryFrontend.HTTPURL(), queryLimitsPolicy)
diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 13fc0d9bf3e3e..bf7fa50087b46 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -42,6 +42,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
+ "path/filepath"
+
+ "github.com/google/uuid"
+
"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
@@ -489,9 +493,23 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return level.Error(logger).Log("msg", "failed to compact new chunks", "err", err)
}
+ archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
+
+ blockToUpload, err := c.compressBloomBlock(storedBlock, archivePath, localDst, logger)
+ if err != nil {
+ level.Error(logger).Log("msg", "putting blocks to storage", "err", err)
+ return err
+ }
+ defer func() {
+ err = os.Remove(archivePath)
+ if err != nil {
+ level.Error(logger).Log("msg", "removing archive file", "err", err, "file", archivePath)
+ }
+ }()
+
// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
- storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{storedBlock})
+ storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
if err != nil {
level.Error(logger).Log("msg", "putting blocks to storage", "err", err)
return err
@@ -539,3 +557,21 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
return nil
}
+
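+// compressBloomBlock tar+gzips the locally written block directory (localDst) into archivePath
+// and returns a Block whose Data is the open archive file, ready for upload.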
+func (c *Compactor) compressBloomBlock(storedBlock bloomshipper.Block, archivePath, localDst string, logger log.Logger) (bloomshipper.Block, error) {
+ blockToUpload := bloomshipper.Block{}
+ archiveFile, err := os.Create(archivePath)
+ if err != nil {
+ return blockToUpload, err
+ }
+
+ err = v1.TarGz(archiveFile, v1.NewDirectoryBlockReader(localDst))
+ if err != nil {
+ level.Error(logger).Log("msg", "creating bloom block archive file", "err", err)
+ return blockToUpload, err
+ }
+
+ blockToUpload.BlockRef = storedBlock.BlockRef
+ blockToUpload.Data = archiveFile
+ return blockToUpload, nil
+}
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 963b5cc4302b5..98dde0b915154 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -99,6 +99,7 @@ type Distributor struct {
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
+ tee Tee
rateStore RateStore
shardTracker *ShardTracker
@@ -136,6 +137,7 @@ func New(
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
+ tee Tee,
logger log.Logger,
) (*Distributor, error) {
factory := cfg.factory
@@ -182,6 +184,7 @@ func New(
shardTracker: NewShardTracker(),
healthyInstancesCount: atomic.NewUint32(0),
rateLimitStrat: rateLimitStrat,
+ tee: tee,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_ingester_appends_total",
@@ -272,9 +275,14 @@ func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}
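+// KeyedStream couples a stream with the hash key used to place it on the ingester ring.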
+type KeyedStream struct {
+ HashKey uint32
+ Stream logproto.Stream
+}
+
// TODO taken from Cortex, see if we can refactor out an usable interface.
type streamTracker struct {
- stream logproto.Stream
+ KeyedStream
minSuccess int
maxFailures int
succeeded atomic.Int32
@@ -305,8 +313,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
- streams := make([]streamTracker, 0, len(req.Streams))
- keys := make([]uint32, 0, len(req.Streams))
+ streams := make([]KeyedStream, 0, len(req.Streams))
validatedLineSize := 0
validatedLineCount := 0
@@ -379,12 +386,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {
- derivedKeys, derivedStreams := d.shardStream(stream, pushSize, tenantID)
- keys = append(keys, derivedKeys...)
- streams = append(streams, derivedStreams...)
+ streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
} else {
- keys = append(keys, lokiring.TokenFor(tenantID, stream.Labels))
- streams = append(streams, streamTracker{stream: stream})
+ streams = append(streams, KeyedStream{
+ HashKey: lokiring.TokenFor(tenantID, stream.Labels),
+ Stream: stream,
+ })
}
}
}()
@@ -410,9 +417,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}
+ // Nil check for performance reasons, to avoid dynamic lookup and/or no-op
+ // function calls that cannot be inlined.
+ if d.tee != nil {
+ d.tee.Duplicate(streams)
+ }
+
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc
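+ // Allocate trackers up front; each one pairs a keyed stream with its replication bookkeeping.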
+ streamTrackers := make([]streamTracker, len(streams))
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
@@ -425,16 +439,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}()
}
- for i, key := range keys {
- replicationSet, err := d.ingestersRing.Get(key, ring.WriteNoExtend, descs[:0], nil, nil)
+ for i, stream := range streams {
+ replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
if err != nil {
return err
}
- streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
- streams[i].maxFailures = replicationSet.MaxErrors
+ streamTrackers[i] = streamTracker{
+ KeyedStream: stream,
+ minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors,
+ maxFailures: replicationSet.MaxErrors,
+ }
for _, ingester := range replicationSet.Instances {
- streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i])
+ streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i])
ingesterDescs[ingester.Addr] = ingester
}
}
@@ -475,13 +492,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// streams and their associated keys for hashing to ingesters.
//
// The number of shards is limited by the number of entries.
-func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) ([]uint32, []streamTracker) {
+func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) []KeyedStream {
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
logger := log.With(util_log.WithUserID(tenantID, d.logger), "stream", stream.Labels)
shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)
if shardCount <= 1 {
- return []uint32{lokiring.TokenFor(tenantID, stream.Labels)}, []streamTracker{{stream: stream}}
+ return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}}
}
d.streamShardCount.Inc()
@@ -492,31 +509,30 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
}
-func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream) ([]uint32, []streamTracker) {
- derivedKeys, derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)
+func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream) []KeyedStream {
+ derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)
for i := 0; i < len(stream.Entries); i++ {
streamIndex := i % len(derivedStreams)
- entries := append(derivedStreams[streamIndex].stream.Entries, stream.Entries[i])
- derivedStreams[streamIndex].stream.Entries = entries
+ entries := append(derivedStreams[streamIndex].Stream.Entries, stream.Entries[i])
+ derivedStreams[streamIndex].Stream.Entries = entries
}
- return derivedKeys, derivedStreams
+ return derivedStreams
}
-func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) ([]uint32, []streamTracker) {
+func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) []KeyedStream {
var (
streamLabels = labelTemplate(stream.Labels, d.logger)
streamPattern = streamLabels.String()
- derivedKeys = make([]uint32, 0, totalShards)
- derivedStreams = make([]streamTracker, 0, totalShards)
+ derivedStreams = make([]KeyedStream, 0, totalShards)
streamCount = streamCount(totalShards, stream)
)
if totalShards <= 0 {
level.Error(d.logger).Log("msg", "attempt to create shard with zeroed total shards", "org_id", tenantID, "stream", stream.Labels, "entries_len", len(stream.Entries))
- return derivedKeys, derivedStreams
+ return derivedStreams
}
entriesPerShard := int(math.Ceil(float64(len(stream.Entries)) / float64(totalShards)))
@@ -525,8 +541,10 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
shardNum := (startShard + i) % totalShards
shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard)
- derivedKeys = append(derivedKeys, lokiring.TokenFor(tenantID, shard.Labels))
- derivedStreams = append(derivedStreams, streamTracker{stream: shard})
+ derivedStreams = append(derivedStreams, KeyedStream{
+ HashKey: lokiring.TokenFor(tenantID, shard.Labels),
+ Stream: shard,
+ })
if shardStreamsCfg.LoggingEnabled {
level.Info(d.logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
@@ -534,7 +552,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
}
d.shardTracker.SetLastShardNum(tenantID, stream.Hash, startShard+streamCount)
- return derivedKeys, derivedStreams
+ return derivedStreams
}
func streamCount(totalShards int, stream logproto.Stream) int {
@@ -649,7 +667,7 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
Streams: make([]logproto.Stream, len(streams)),
}
for i, s := range streams {
- req.Streams[i] = s.stream
+ req.Streams[i] = s.Stream
}
_, err = c.(logproto.PusherClient).Push(ctx, req)
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index e7899f7ea593c..5a03fe98e94cc 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -616,16 +616,16 @@ func TestStreamShard(t *testing.T) {
shardTracker: NewShardTracker(),
}
- _, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
+ derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
require.Len(t, derivedStreams, tc.wantDerivedStreamSize)
for _, s := range derivedStreams {
// Generate sorted labels
- lbls, err := syntax.ParseLabels(s.stream.Labels)
+ lbls, err := syntax.ParseLabels(s.Stream.Labels)
require.NoError(t, err)
- require.Equal(t, lbls.Hash(), s.stream.Hash)
- require.Equal(t, lbls.String(), s.stream.Labels)
+ require.Equal(t, lbls.Hash(), s.Stream.Hash)
+ require.Equal(t, lbls.String(), s.Stream.Labels)
}
})
}
@@ -661,23 +661,23 @@ func TestStreamShardAcrossCalls(t *testing.T) {
shardTracker: NewShardTracker(),
}
- _, derivedStreams := d.shardStream(baseStream, streamRate, "fake")
+ derivedStreams := d.shardStream(baseStream, streamRate, "fake")
require.Len(t, derivedStreams, 2)
for i, s := range derivedStreams {
- require.Len(t, s.stream.Entries, 1)
- lbls, err := syntax.ParseLabels(s.stream.Labels)
+ require.Len(t, s.Stream.Entries, 1)
+ lbls, err := syntax.ParseLabels(s.Stream.Labels)
require.NoError(t, err)
require.Equal(t, lbls[0].Value, fmt.Sprint(i))
}
- _, derivedStreams = d.shardStream(baseStream, streamRate, "fake")
+ derivedStreams = d.shardStream(baseStream, streamRate, "fake")
require.Len(t, derivedStreams, 2)
for i, s := range derivedStreams {
- require.Len(t, s.stream.Entries, 1)
- lbls, err := syntax.ParseLabels(s.stream.Labels)
+ require.Len(t, s.Stream.Entries, 1)
+ lbls, err := syntax.ParseLabels(s.Stream.Labels)
require.NoError(t, err)
require.Equal(t, lbls[0].Value, fmt.Sprint(i+2))
@@ -1153,7 +1153,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)
- d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, log.NewNopLogger())
+ d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
@@ -1247,3 +1247,65 @@ type fakeRateStore struct {
func (s *fakeRateStore) RateFor(_ string, _ uint64) (int64, float64) {
return s.rate, s.pushRate
}
+
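+// mockTee records every batch passed to Duplicate so tests can assert the tee saw the pushed streams.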
+type mockTee struct {
+ mu sync.Mutex
+ duplicated [][]KeyedStream
+}
+
+func (mt *mockTee) Duplicate(streams []KeyedStream) {
+ mt.mu.Lock()
+ defer mt.mu.Unlock()
+ mt.duplicated = append(mt.duplicated, streams)
+}
+
+func TestDistributorTee(t *testing.T) {
+ data := []*logproto.PushRequest{
+ {
+ Streams: []logproto.Stream{
+ {
+ Labels: "{job=\"foo\"}",
+ Entries: []logproto.Entry{
+ {Timestamp: time.Unix(123456, 0), Line: "line 1"},
+ {Timestamp: time.Unix(123457, 0), Line: "line 2"},
+ },
+ },
+ },
+ },
+ {
+ Streams: []logproto.Stream{
+ {
+ Labels: "{job=\"foo\"}",
+ Entries: []logproto.Entry{
+ {Timestamp: time.Unix(123458, 0), Line: "line 3"},
+ {Timestamp: time.Unix(123459, 0), Line: "line 4"},
+ },
+ },
+ {
+ Labels: "{job=\"bar\"}",
+ Entries: []logproto.Entry{
+ {Timestamp: time.Unix(123458, 0), Line: "line 5"},
+ {Timestamp: time.Unix(123459, 0), Line: "line 6"},
+ },
+ },
+ },
+ },
+ }
+
+ limits := &validation.Limits{}
+ flagext.DefaultValues(limits)
+ limits.RejectOldSamples = false
+ distributors, _ := prepare(t, 1, 3, limits, nil)
+
+ tee := mockTee{}
+ distributors[0].tee = &tee
+
+ for i, td := range data {
+ _, err := distributors[0].Push(ctx, td)
+ require.NoError(t, err)
+
+ for j, streams := range td.Streams {
+ assert.Equal(t, tee.duplicated[i][j].Stream.Entries, streams.Entries)
+ }
+ }
+}
diff --git a/pkg/distributor/tee.go b/pkg/distributor/tee.go
new file mode 100644
index 0000000000000..9ac48083956e1
--- /dev/null
+++ b/pkg/distributor/tee.go
@@ -0,0 +1,6 @@
+package distributor
+
+// Tee implementations can duplicate the log streams to another endpoint.
+type Tee interface {
+ Duplicate([]KeyedStream)
+}
diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 8286b66cb12fd..2cf46d921ce94 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -452,7 +452,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
- inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
+ inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@@ -499,7 +499,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := range instances {
- inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
+ inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil)
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 29875c4847aad..f5215971ba39b 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -12,6 +12,8 @@ import (
"sync"
"time"
+ lokilog "github.com/grafana/loki/pkg/logql/log"
+
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
@@ -99,7 +101,10 @@ type Config struct {
WAL WALConfig `yaml:"wal,omitempty" doc:"description=The ingester WAL (Write Ahead Log) records incoming logs and stores them on the local file systems in order to guarantee persistence of acknowledged data in the event of a process crash."`
- ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"`
+ ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"`
+ PipelineWrapper lokilog.PipelineWrapper `yaml:"-"`
+ SampleExtractorWrapper lokilog.SampleExtractorWrapper `yaml:"-"`
+
// Optional wrapper that can be used to modify the behaviour of the ingester
Wrapper Wrapper `yaml:"-"`
@@ -227,7 +232,9 @@ type Ingester struct {
wal WAL
- chunkFilter chunk.RequestChunkFilterer
+ chunkFilter chunk.RequestChunkFilterer
+ extractorWrapper lokilog.SampleExtractorWrapper
+ pipelineWrapper lokilog.PipelineWrapper
streamRateCalculator *StreamRateCalculator
@@ -304,6 +311,14 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.SetChunkFilterer(i.cfg.ChunkFilterer)
}
+ if i.cfg.PipelineWrapper != nil {
+ i.SetPipelineWrapper(i.cfg.PipelineWrapper)
+ }
+
+ if i.cfg.SampleExtractorWrapper != nil {
+ i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
+ }
+
return i, nil
}
@@ -311,6 +326,14 @@ func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}
+func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
+ i.extractorWrapper = wrapper
+}
+
+func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
+ i.pipelineWrapper = wrapper
+}
+
// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@@ -837,7 +860,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
- inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator, i.writeLogManager)
+ inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager)
if err != nil {
return nil, err
}
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 14306b01dc4af..b91502094bf24 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -10,6 +10,8 @@ import (
"syscall"
"time"
+ "github.com/grafana/loki/pkg/logql/log"
+
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
@@ -109,6 +111,8 @@ type instance struct {
metrics *ingesterMetrics
chunkFilter chunk.RequestChunkFilterer
+ pipelineWrapper log.PipelineWrapper
+ extractorWrapper log.SampleExtractorWrapper
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
@@ -126,6 +130,8 @@ func newInstance(
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
chunkFilter chunk.RequestChunkFilterer,
+ pipelineWrapper log.PipelineWrapper,
+ extractorWrapper log.SampleExtractorWrapper,
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
) (*instance, error) {
@@ -153,7 +159,9 @@ func newInstance(
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
- chunkFilter: chunkFilter,
+ chunkFilter: chunkFilter,
+ pipelineWrapper: pipelineWrapper,
+ extractorWrapper: extractorWrapper,
streamRateCalculator: streamRateCalculator,
@@ -419,6 +427,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
return nil, err
}
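+ // Give the optional pipeline wrapper a chance to decorate the per-query log pipeline.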
+ if i.pipelineWrapper != nil {
+ pipeline = i.pipelineWrapper.Wrap(pipeline, expr.String())
+ }
+
stats := stats.FromContext(ctx)
var iters []iter.EntryIterator
@@ -464,6 +476,10 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
return nil, err
}
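+ // Likewise, let the optional extractor wrapper decorate the sample extractor for this query.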
+ if i.extractorWrapper != nil {
+ extractor = i.extractorWrapper.Wrap(extractor, expr.String())
+ }
+
stats := stats.FromContext(ctx)
var iters []iter.SampleIterator
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 492f78404cb92..b9cbc835a80d9 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -10,6 +10,8 @@ import (
"testing"
"time"
+ "github.com/grafana/loki/pkg/logql/log"
+
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@@ -65,7 +67,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
- i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
// avoid entries from the future.
@@ -93,7 +95,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
- inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
const (
@@ -145,7 +147,7 @@ func TestGetStreamRates(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
- inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.NoError(t, err)
const (
@@ -239,7 +241,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
- inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
lbls := makeRandomLabels()
@@ -284,7 +286,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards
- instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
currentTime := time.Now()
@@ -493,7 +495,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
- i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
for n := 0; n < b.N; n++ {
@@ -537,7 +539,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
- inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true)
require.NoError(b, err)
t, err := newTailer("foo", expr, nil, 10)
@@ -671,6 +673,172 @@ func Test_ChunkFilter(t *testing.T) {
}
}
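+// Test_PipelineWrapper verifies that a configured PipelineWrapper receives the query string
+// and that every log line flows through the wrapped pipeline.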
+func Test_PipelineWrapper(t *testing.T) {
+ instance := defaultInstance(t)
+
+ wrapper := &testPipelineWrapper{
+ pipeline: newMockPipeline(),
+ }
+ instance.pipelineWrapper = wrapper
+
+ it, err := instance.Query(context.TODO(),
+ logql.SelectLogParams{
+ QueryRequest: &logproto.QueryRequest{
+ Selector: `{job="3"}`,
+ Limit: uint32(2),
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, 100000000),
+ Direction: logproto.BACKWARD,
+ Plan: &plan.QueryPlan{
+ AST: syntax.MustParseExpr(`{job="3"}`),
+ },
+ },
+ },
+ )
+ require.NoError(t, err)
+ defer it.Close()
+
+ for it.Next() {
+ // Consume the iterator
+ require.NoError(t, it.Error())
+ }
+
+ require.Equal(t, `{job="3"}`, wrapper.query)
+ require.Equal(t, 10, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
+}
+
+type testPipelineWrapper struct {
+ query string
+ pipeline *mockPipeline
+}
+
+func (t *testPipelineWrapper) Wrap(pipeline log.Pipeline, query string) log.Pipeline {
+ t.query = query
+ t.pipeline.wrappedExtractor = pipeline
+ return t.pipeline
+}
+
+func newMockPipeline() *mockPipeline {
+ return &mockPipeline{
+ sp: &mockStreamPipeline{},
+ }
+}
+
+type mockPipeline struct {
+ wrappedExtractor log.Pipeline
+ sp *mockStreamPipeline
+}
+
+func (p *mockPipeline) ForStream(l labels.Labels) log.StreamPipeline {
+ sp := p.wrappedExtractor.ForStream(l)
+ p.sp.wrappedSP = sp
+ return p.sp
+}
+
+func (p *mockPipeline) Reset() {}
+
+// mockStreamPipeline wraps the real stream pipeline and counts how many lines it processes.
+type mockStreamPipeline struct {
+ wrappedSP log.StreamPipeline
+ called int
+}
+
+func (p *mockStreamPipeline) BaseLabels() log.LabelsResult {
+ return p.wrappedSP.BaseLabels()
+}
+
+func (p *mockStreamPipeline) Process(ts int64, line []byte, lbs ...labels.Label) ([]byte, log.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.Process(ts, line, lbs...)
+}
+
+func (p *mockStreamPipeline) ProcessString(ts int64, line string, lbs ...labels.Label) (string, log.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.ProcessString(ts, line, lbs...)
+}
+
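+// Test_ExtractorWrapper verifies that a configured SampleExtractorWrapper receives the query string
+// and that every sample flows through the wrapped extractor.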
+func Test_ExtractorWrapper(t *testing.T) {
+ instance := defaultInstance(t)
+
+ wrapper := &testExtractorWrapper{
+ extractor: newMockExtractor(),
+ }
+ instance.extractorWrapper = wrapper
+
+ it, err := instance.QuerySample(context.TODO(),
+ logql.SelectSampleParams{
+ SampleQueryRequest: &logproto.SampleQueryRequest{
+ Selector: `sum(count_over_time({job="3"}[1m]))`,
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, 100000000),
+ Plan: &plan.QueryPlan{
+ AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
+ },
+ },
+ },
+ )
+ require.NoError(t, err)
+ defer it.Close()
+
+ for it.Next() {
+ // Consume the iterator
+ require.NoError(t, it.Error())
+ }
+
+ require.Equal(t, `sum(count_over_time({job="3"}[1m]))`, wrapper.query)
+ require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
+}
+
+type testExtractorWrapper struct {
+ query string
+ extractor *mockExtractor
+}
+
+func (t *testExtractorWrapper) Wrap(extractor log.SampleExtractor, query string) log.SampleExtractor {
+ t.query = query
+ t.extractor.wrappedExtractor = extractor
+ return t.extractor
+}
+
+func newMockExtractor() *mockExtractor {
+ return &mockExtractor{
+ sp: &mockStreamExtractor{},
+ }
+}
+
+type mockExtractor struct {
+ wrappedExtractor log.SampleExtractor
+ sp *mockStreamExtractor
+}
+
+func (p *mockExtractor) ForStream(l labels.Labels) log.StreamSampleExtractor {
+ sp := p.wrappedExtractor.ForStream(l)
+ p.sp.wrappedSP = sp
+ return p.sp
+}
+
+func (p *mockExtractor) Reset() {}
+
+// mockStreamExtractor wraps the real stream extractor and counts how many lines it processes.
+type mockStreamExtractor struct {
+ wrappedSP log.StreamSampleExtractor
+ called int
+}
+
+func (p *mockStreamExtractor) BaseLabels() log.LabelsResult {
+ return p.wrappedSP.BaseLabels()
+}
+
+func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) (float64, log.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.Process(ts, line, lbs...)
+}
+
+func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) (float64, log.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.ProcessString(ts, line, lbs...)
+}
+
func Test_QueryWithDelete(t *testing.T) {
instance := defaultInstance(t)
@@ -824,7 +992,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("invalid push returns error", func(t *testing.T) {
- i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@@ -843,7 +1011,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("valid push returns no error", func(t *testing.T) {
- i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
+ i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@@ -1174,6 +1342,8 @@ func defaultInstance(t *testing.T) *instance {
NilMetrics,
nil,
nil,
+ nil,
+ nil,
NewStreamRateCalculator(),
nil,
)
diff --git a/pkg/logql/log/base b/pkg/logql/log/base
new file mode 100644
index 0000000000000..6a3c59eeed9c1
--- /dev/null
+++ b/pkg/logql/log/base
@@ -0,0 +1,12 @@
+goos: linux
+goarch: amd64
+pkg: github.com/grafana/loki/pkg/logql/log
+cpu: AMD Ryzen 9 5950X 16-Core Processor
+Benchmark_Pipeline/pipeline_bytes-32 153394 7749 ns/op 5082 B/op 37 allocs/op
+Benchmark_Pipeline/pipeline_string-32 153319 7691 ns/op 5146 B/op 38 allocs/op
+Benchmark_Pipeline/line_extractor_bytes-32 148069 7922 ns/op 5151 B/op 37 allocs/op
+Benchmark_Pipeline/line_extractor_string-32 149070 7900 ns/op 5149 B/op 37 allocs/op
+Benchmark_Pipeline/label_extractor_bytes-32 146860 7952 ns/op 5148 B/op 37 allocs/op
+Benchmark_Pipeline/label_extractor_string-32 147686 7940 ns/op 5148 B/op 37 allocs/op
+PASS
+ok github.com/grafana/loki/pkg/logql/log 190.270s
diff --git a/pkg/logql/log/cpu-2.out b/pkg/logql/log/cpu-2.out
new file mode 100644
index 0000000000000..ee5e47d024649
Binary files /dev/null and b/pkg/logql/log/cpu-2.out differ
diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go
index e28f5a119a48b..9257834eee345 100644
--- a/pkg/logql/log/fmt.go
+++ b/pkg/logql/log/fmt.go
@@ -221,7 +221,10 @@ func (lf *LineFormatter) Process(ts int64, line []byte, lbs *LabelsBuilder) ([]b
lf.currentLine = line
lf.currentTs = ts
- if err := lf.Template.Execute(lf.buf, lbs.Map()); err != nil {
+ // the map is now taken from a pool and must be returned to it when done
+ m := lbs.Map()
+ defer smp.Put(m)
+ if err := lf.Template.Execute(lf.buf, m); err != nil {
lbs.SetErr(errTemplateFormat)
lbs.SetErrorDetails(err.Error())
return line, true
@@ -380,7 +383,8 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by
lf.currentLine = l
lf.currentTs = ts
- var data interface{}
+ var m = smp.Get()
+ defer smp.Put(m)
for _, f := range lf.formats {
if f.Rename {
v, category, ok := lbs.GetWithCategory(f.Value)
@@ -391,10 +395,10 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by
continue
}
lf.buf.Reset()
- if data == nil {
- data = lbs.Map()
+ if len(m) == 0 {
+ lbs.IntoMap(m)
}
- if err := f.tmpl.Execute(lf.buf, data); err != nil {
+ if err := f.tmpl.Execute(lf.buf, m); err != nil {
lbs.SetErr(errTemplateFormat)
lbs.SetErrorDetails(err.Error())
continue
diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index 7bc313c8c302c..567c446b8c008 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -3,6 +3,7 @@ package log
import (
"fmt"
"sort"
+ "sync"
"github.com/prometheus/prometheus/model/labels"
@@ -437,6 +438,52 @@ func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCat
return buf
}
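+// stringMapPool recycles map[string]string buffers so formatters avoid allocating a fresh label map per processed line.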
+type stringMapPool struct {
+ pool sync.Pool
+}
+
+func newStringMapPool() *stringMapPool {
+ return &stringMapPool{
+ pool: sync.Pool{
+ New: func() interface{} {
+ return make(map[string]string)
+ },
+ },
+ }
+}
+
+func (s *stringMapPool) Get() map[string]string {
+ m := s.pool.Get().(map[string]string)
+ clear(m)
+ return m
+}
+
+func (s *stringMapPool) Put(m map[string]string) {
+ s.pool.Put(m)
+}
+
+var smp = newStringMapPool()
+
+// IntoMap copies the label entries into an existing map; it is up to the caller to
+// clear the map properly if it is going to be reused.
+func (b *LabelsBuilder) IntoMap(m map[string]string) {
+ if !b.hasDel() && !b.hasAdd() && !b.HasErr() {
+ if b.baseMap == nil {
+ b.baseMap = b.base.Map()
+ for k, v := range b.baseMap {
+ m[k] = v
+ }
+ }
+ return
+ }
+ b.buf = b.UnsortedLabels(b.buf)
+ // todo should we also cache maps since limited by the result ?
+ // Maps also don't create a copy of the labels.
+ for _, l := range b.buf {
+ m[l.Name] = l.Value
+ }
+}
+
func (b *LabelsBuilder) Map() map[string]string {
if !b.hasDel() && !b.hasAdd() && !b.HasErr() {
if b.baseMap == nil {
@@ -447,7 +494,8 @@ func (b *LabelsBuilder) Map() map[string]string {
b.buf = b.UnsortedLabels(b.buf)
// todo should we also cache maps since limited by the result ?
// Maps also don't create a copy of the labels.
- res := make(map[string]string, len(b.buf))
+ res := smp.Get()
+ clear(res)
for _, l := range b.buf {
res[l.Name] = l.Value
}
diff --git a/pkg/logql/log/last b/pkg/logql/log/last
new file mode 100644
index 0000000000000..b96c8a7ea6968
--- /dev/null
+++ b/pkg/logql/log/last
@@ -0,0 +1,12 @@
+goos: linux
+goarch: amd64
+pkg: github.com/grafana/loki/pkg/logql/log
+cpu: AMD Ryzen 9 5950X 16-Core Processor
+Benchmark_Pipeline/pipeline_bytes-32 172089 6673 ns/op 1447 B/op 35 allocs/op
+Benchmark_Pipeline/pipeline_string-32 179335 6670 ns/op 1511 B/op 36 allocs/op
+Benchmark_Pipeline/line_extractor_bytes-32 172724 6853 ns/op 1513 B/op 35 allocs/op
+Benchmark_Pipeline/line_extractor_string-32 170238 6866 ns/op 1513 B/op 35 allocs/op
+Benchmark_Pipeline/label_extractor_bytes-32 169942 7004 ns/op 1513 B/op 35 allocs/op
+Benchmark_Pipeline/label_extractor_string-32 168538 6982 ns/op 1513 B/op 35 allocs/op
+PASS
+ok github.com/grafana/loki/pkg/logql/log 7.650s
diff --git a/pkg/logql/log/log.test b/pkg/logql/log/log.test
new file mode 100755
index 0000000000000..6f07362505cce
Binary files /dev/null and b/pkg/logql/log/log.test differ
diff --git a/pkg/logql/log/mapPool b/pkg/logql/log/mapPool
new file mode 100644
index 0000000000000..b28f0868202d9
--- /dev/null
+++ b/pkg/logql/log/mapPool
@@ -0,0 +1,12 @@
+goos: linux
+goarch: amd64
+pkg: github.com/grafana/loki/pkg/logql/log
+cpu: AMD Ryzen 9 5950X 16-Core Processor
+Benchmark_Pipeline/pipeline_bytes-32 183506 6356 ns/op 1422 B/op 33 allocs/op
+Benchmark_Pipeline/pipeline_string-32 182916 6377 ns/op 1486 B/op 34 allocs/op
+Benchmark_Pipeline/line_extractor_bytes-32 173494 6676 ns/op 1489 B/op 33 allocs/op
+Benchmark_Pipeline/line_extractor_string-32 175282 6797 ns/op 1489 B/op 33 allocs/op
+Benchmark_Pipeline/label_extractor_bytes-32 172417 6954 ns/op 1489 B/op 33 allocs/op
+Benchmark_Pipeline/label_extractor_string-32 173976 6749 ns/op 1489 B/op 33 allocs/op
+PASS
+ok github.com/grafana/loki/pkg/logql/log 7.650s
diff --git a/pkg/logql/log/mapPool-2 b/pkg/logql/log/mapPool-2
new file mode 100644
index 0000000000000..ddcaab6d065a0
--- /dev/null
+++ b/pkg/logql/log/mapPool-2
@@ -0,0 +1,145 @@
+goos: linux
+goarch: amd64
+pkg: github.com/grafana/loki/pkg/logql/log
+cpu: AMD Ryzen 9 5950X 16-Core Processor
+Benchmark_LineFilter/default_true_foo.*-32 38151235 33.71 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_foo.*-32 137415068 8.667 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_foo.*-32 36657951 34.40 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_foo.*-32 122026106 10.14 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(?i)foo-32 685251 1844 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(?i)foo-32 6627051 172.0 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(?i)foo-32 670104 1869 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(?i)foo-32 6760113 177.3 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_.*foo.*-32 542223 2356 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_.*foo.*-32 138826058 8.461 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_.*foo.*-32 527907 2340 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_.*foo.*-32 122260276 9.920 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_.*foo-32 535429 2230 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_.*foo-32 143817646 8.291 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_.*foo-32 554611 2180 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_.*foo-32 128610806 9.361 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_foo|bar-32 7713898 154.0 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_foo|bar-32 56178351 22.09 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_foo|bar-32 7733646 156.7 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_foo|bar-32 50851335 23.65 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_foo|bar|buzz-32 7446480 165.1 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_foo|bar|buzz-32 47580012 24.81 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_foo|bar|buzz-32 7711191 168.0 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_foo|bar|buzz-32 44084361 27.07 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_foo|(bar|buzz)-32 6743854 187.5 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_foo|(bar|buzz)-32 49474693 24.89 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_foo|(bar|buzz)-32 6355500 187.8 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_foo|(bar|buzz)-32 44581922 27.50 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_foo|bar.*|buzz-32 1375010 895.3 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_foo|bar.*|buzz-32 48195949 25.21 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_foo|bar.*|buzz-32 1346467 881.6 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_foo|bar.*|buzz-32 43916170 27.01 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_.*foo.*|bar|uzz-32 688521 1740 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_.*foo.*|bar|uzz-32 49346318 24.23 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_.*foo.*|bar|uzz-32 691778 1706 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_.*foo.*|bar|uzz-32 44886036 26.88 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_((f.*)|foobar.*)|.*buzz-32 1225946 990.5 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_((f.*)|foobar.*)|.*buzz-32 26295878 48.02 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_((f.*)|foobar.*)|.*buzz-32 1216020 996.3 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_((f.*)|foobar.*)|.*buzz-32 23552031 50.87 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(?P.*foo.*|bar)-32 745039 1608 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(?P.*foo.*|bar)-32 56299695 21.59 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(?P.*foo.*|bar)-32 743581 1600 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(?P.*foo.*|bar)-32 51396477 23.81 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(\s|")+(?i)bar-32 592923 2031 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(\s|")+(?i)bar-32 594696 2039 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(\s|")+(?i)bar-32 593332 2049 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(\s|")+(?i)bar-32 595462 2047 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(node:24)_buzz*-32 34552183 35.38 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(node:24)_buzz*-32 34696462 34.55 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(node:24)_buzz*-32 33236223 35.99 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(node:24)_buzz*-32 33668626 35.78 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(HTTP/.*\"|HEAD|GET)_(2..|5..)-32 483506 2491 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(HTTP/.*\"|HEAD|GET)_(2..|5..)-32 544736 2233 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(HTTP/.*\"|HEAD|GET)_(2..|5..)-32 487765 2386 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(HTTP/.*\"|HEAD|GET)_(2..|5..)-32 505735 2401 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_"@l":"(Warning|Error|Fatal)"-32 31796106 38.14 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_"@l":"(Warning|Error|Fatal)"-32 32254904 36.92 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_"@l":"(Warning|Error|Fatal)"-32 31895373 36.64 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_"@l":"(Warning|Error|Fatal)"-32 31727758 38.02 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_true_(?:)foo|fatal|exception-32 347808 3469 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_true_(?:)foo|fatal|exception-32 23139100 52.71 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/default_false_(?:)foo|fatal|exception-32 347662 3438 ns/op 0 B/op 0 allocs/op
+Benchmark_LineFilter/simplified_false_(?:)foo|fatal|exception-32 21433672 55.89 ns/op 0 B/op 0 allocs/op
+Benchmark_IPFilter/127.0.0.1-32 219172 5644 ns/op 416 B/op 9 allocs/op
+Benchmark_IPFilter/192.168.0.1-192.189.10.12-32 209308 5724 ns/op 415 B/op 9 allocs/op
+Benchmark_IPFilter/192.168.4.5/16-32 218125 5401 ns/op 416 B/op 9 allocs/op
+BenchmarkLineLabelFilters/foo-32 131752836 9.117 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo)-32 135408832 9.053 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo|ba)-32 89578142 13.30 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo|ba|ar)-32 67785404 17.53 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo|(ba|ar))-32 67866519 17.71 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/foo.*-32 79162518 14.94 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*foo.*-32 79812434 15.00 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(.*)(foo).*-32 81738111 14.58 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo.*|.*ba)-32 68743644 17.68 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(foo.*|.*bar.*)-32 67988179 17.80 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*foo.*|bar-32 65331152 17.52 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*foo|bar-32 68870803 17.54 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?:.*foo.*|bar)-32 67384971 17.38 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?P.*foo.*|bar)-32 67459786 17.38 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*foo.*|bar|buzz-32 68189577 17.57 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*foo.*|bar|uzz-32 59306607 20.23 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/foo|bar|b|buzz|zz-32 38298559 31.95 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/f|foo|foobar-32 73199528 16.04 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/f.*|foobar.*|.*buzz-32 62954108 19.15 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/((f.*)|foobar.*)|.*buzz-32 63847633 19.13 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*-32 597892452 2.002 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*|.*-32 599336366 2.028 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/.*||||-32 589706043 2.027 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/#00-32 597831002 2.045 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)foo-32 132216960 9.082 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)界-32 133050820 9.099 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)ïB-32 132711472 9.095 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?:)foo|fatal|exception-32 59412402 20.25 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)foo|fatal|exception-32 63390085 19.01 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)f|foo|foobar-32 82988314 14.56 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i)f|fatal|e.*-32 68591342 17.30 ns/op 0 B/op 0 allocs/op
+BenchmarkLineLabelFilters/(?i).*foo.*-32 89400224 13.22 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/json/no_labels_hints-32 494869 2361 ns/op 280 B/op 18 allocs/op
+Benchmark_Parser/json/labels_hints-32 756115 1578 ns/op 176 B/op 12 allocs/op
+Benchmark_Parser/json/inline_stages-32 2256870 531.7 ns/op 64 B/op 4 allocs/op
+Benchmark_Parser/jsonParser-not_json_line/no_labels_hints-32 20541764 58.50 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/jsonParser-not_json_line/labels_hints-32 22325911 55.04 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/jsonParser-not_json_line/inline_stages-32 21640119 56.22 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/unpack/no_labels_hints-32 3555292 334.3 ns/op 80 B/op 4 allocs/op
+Benchmark_Parser/unpack/labels_hints-32 3595738 332.1 ns/op 80 B/op 4 allocs/op
+Benchmark_Parser/unpack/inline_stages-32 3591871 335.9 ns/op 80 B/op 4 allocs/op
+Benchmark_Parser/unpack-not_json_line/no_labels_hints-32 100000000 11.37 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/unpack-not_json_line/labels_hints-32 100000000 11.23 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/unpack-not_json_line/inline_stages-32 100000000 11.21 ns/op 0 B/op 0 allocs/op
+Benchmark_Parser/logfmt/no_labels_hints-32 742243 1539 ns/op 336 B/op 16 allocs/op
+Benchmark_Parser/logfmt/labels_hints-32 723044 1588 ns/op 336 B/op 16 allocs/op
+Benchmark_Parser/logfmt/inline_stages-32 2310529 516.1 ns/op 74 B/op 6 allocs/op
+Benchmark_Parser/regex_greedy/no_labels_hints-32 428275 2798 ns/op 193 B/op 2 allocs/op
+Benchmark_Parser/regex_greedy/labels_hints-32 423408 2831 ns/op 192 B/op 2 allocs/op
+Benchmark_Parser/regex_greedy/inline_stages-32 421518 2786 ns/op 193 B/op 2 allocs/op
+Benchmark_Parser/regex_status_digits/no_labels_hints-32 5650633 211.7 ns/op 51 B/op 2 allocs/op
+Benchmark_Parser/regex_status_digits/labels_hints-32 5602600 212.3 ns/op 51 B/op 2 allocs/op
+Benchmark_Parser/regex_status_digits/inline_stages-32 5722987 208.5 ns/op 51 B/op 2 allocs/op
+Benchmark_Parser/pattern/no_labels_hints-32 9893188 119.6 ns/op 35 B/op 2 allocs/op
+Benchmark_Parser/pattern/labels_hints-32 10805054 110.3 ns/op 32 B/op 1 allocs/op
+Benchmark_Parser/pattern/inline_stages-32 11540605 103.9 ns/op 3 B/op 1 allocs/op
+BenchmarkKeyExtraction/json-32 6076245 194.7 ns/op 5 B/op 1 allocs/op
+BenchmarkKeyExtraction/logfmt-32 8722742 136.3 ns/op 5 B/op 1 allocs/op
+BenchmarkKeyExtraction/logfmt-expression-32 3464037 346.2 ns/op 4 B/op 1 allocs/op
+BenchmarkJsonExpressionParser/json-expression-32 1308164 916.0 ns/op 112 B/op 10 allocs/op
+Benchmark_Pipeline/pipeline_bytes-32 183174 6456 ns/op 1453 B/op 34 allocs/op
+Benchmark_Pipeline/pipeline_string-32 180760 6470 ns/op 1517 B/op 35 allocs/op
+Benchmark_Pipeline/line_extractor_bytes-32 172455 6756 ns/op 1520 B/op 34 allocs/op
+Benchmark_Pipeline/line_extractor_string-32 174583 6741 ns/op 1520 B/op 34 allocs/op
+Benchmark_Pipeline/label_extractor_bytes-32 173482 6820 ns/op 1522 B/op 34 allocs/op
+Benchmark_Pipeline/label_extractor_string-32 171506 6833 ns/op 1520 B/op 34 allocs/op
+BenchmarkJSONParser-32 727914 1444 ns/op 904 B/op 11 allocs/op
+BenchmarkJSONParserInvalidLine-32 2764609 433.3 ns/op 248 B/op 7 allocs/op
+BenchmarkJSONExpressionParser-32 995520 1041 ns/op 744 B/op 11 allocs/op
+BenchmarkJSONExpressionParserInvalidLine-32 3884377 303.3 ns/op 184 B/op 7 allocs/op
+BenchmarkLogfmtParser-32 363714 3149 ns/op 1512 B/op 24 allocs/op
+BenchmarkLogfmtExpressionParser-32 567150 2071 ns/op 474 B/op 11 allocs/op
+PASS
+ok github.com/grafana/loki/pkg/logql/log 195.769s
diff --git a/pkg/logql/log/mapPool-3 b/pkg/logql/log/mapPool-3
new file mode 100644
index 0000000000000..e1153e18a87d2
--- /dev/null
+++ b/pkg/logql/log/mapPool-3
@@ -0,0 +1,12 @@
+goos: linux
+goarch: amd64
+pkg: github.com/grafana/loki/pkg/logql/log
+cpu: AMD Ryzen 9 5950X 16-Core Processor
+Benchmark_Pipeline/pipeline_bytes-32 180068 6571 ns/op 1455 B/op 34 allocs/op
+Benchmark_Pipeline/pipeline_string-32 179401 6597 ns/op 1518 B/op 35 allocs/op
+Benchmark_Pipeline/line_extractor_bytes-32 158112 6923 ns/op 1521 B/op 34 allocs/op
+Benchmark_Pipeline/line_extractor_string-32 150291 7061 ns/op 1521 B/op 34 allocs/op
+Benchmark_Pipeline/label_extractor_bytes-32 166911 7037 ns/op 1521 B/op 34 allocs/op
+Benchmark_Pipeline/label_extractor_string-32 169440 6983 ns/op 1522 B/op 34 allocs/op
+PASS
+ok github.com/grafana/loki/pkg/logql/log 7.548s
diff --git a/pkg/logql/log/mem-2.out b/pkg/logql/log/mem-2.out
new file mode 100644
index 0000000000000..c3a079a13b01d
Binary files /dev/null and b/pkg/logql/log/mem-2.out differ
diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go
index cd4ef3b8e7af7..f606ade6b6339 100644
--- a/pkg/logql/log/metrics_extraction.go
+++ b/pkg/logql/log/metrics_extraction.go
@@ -38,6 +38,12 @@ type StreamSampleExtractor interface {
ProcessString(ts int64, line string, structuredMetadata ...labels.Label) (float64, LabelsResult, bool)
}
+// SampleExtractorWrapper takes an extractor, wraps it in some desired functionality,
+// and returns a new extractor.
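+//
+// A minimal sketch of an implementation (illustrative only, not part of this change):
+//
+//	type recordingExtractorWrapper struct{ lastQuery *string }
+//
+//	func (w recordingExtractorWrapper) Wrap(extractor SampleExtractor, query string) SampleExtractor {
+//		*w.lastQuery = query // e.g. record the query, then delegate unchanged
+//		return extractor
+//	}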
+type SampleExtractorWrapper interface {
+ Wrap(extractor SampleExtractor, query string) SampleExtractor
+}
+
type lineSampleExtractor struct {
Stage
LineExtractor
diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go
index 31665e7b303ae..308d131dc5d11 100644
--- a/pkg/logql/log/pipeline.go
+++ b/pkg/logql/log/pipeline.go
@@ -35,6 +35,12 @@ type Stage interface {
RequiredLabelNames() []string
}
+// PipelineWrapper takes a pipeline, wraps it in some desired functionality, and
+// returns a new pipeline.
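+//
+// A minimal sketch of an implementation (illustrative only, not part of this change):
+//
+//	type noopPipelineWrapper struct{}
+//
+//	func (noopPipelineWrapper) Wrap(pipeline Pipeline, query string) Pipeline {
+//		// a real wrapper could instrument or observe the pipeline here
+//		return pipeline
+//	}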
+type PipelineWrapper interface {
+ Wrap(pipeline Pipeline, query string) Pipeline
+}
+
// NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is.
func NewNoopPipeline() Pipeline {
return &noopPipeline{
diff --git a/pkg/logql/log/profile.out b/pkg/logql/log/profile.out
new file mode 100644
index 0000000000000..80c1e5fd1f6d3
Binary files /dev/null and b/pkg/logql/log/profile.out differ
diff --git a/pkg/logql/syntax/serialize.go b/pkg/logql/syntax/serialize.go
index 2d7a1d786fda7..53c4bef37d290 100644
--- a/pkg/logql/syntax/serialize.go
+++ b/pkg/logql/syntax/serialize.go
@@ -623,9 +623,7 @@ func decodeLabelFilter(iter *jsoniter.Iterator) log.LabelFilterer {
}
var matcher *labels.Matcher
- if name != "" && value != "" {
- matcher = labels.MustNewMatcher(t, name, value)
- }
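+ // An empty value is valid for matchers like `path!=""`, so always build the matcher.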
+ matcher = labels.MustNewMatcher(t, name, value)
filter = log.NewStringLabelFilter(matcher)
diff --git a/pkg/logql/syntax/serialize_test.go b/pkg/logql/syntax/serialize_test.go
index 846e3988b852b..f4051caaf7ea1 100644
--- a/pkg/logql/syntax/serialize_test.go
+++ b/pkg/logql/syntax/serialize_test.go
@@ -50,6 +50,9 @@ func TestJSONSerializationRoundTrip(t *testing.T) {
"multiple post filters": {
query: `rate({app="foo"} | json | unwrap foo | latency >= 250ms or bytes > 42B or ( status_code < 500 and status_code > 200) or source = ip("") and user = "me" [1m])`,
},
+ "empty label filter string": {
+ query: `rate({app="foo"} |= "bar" | json | unwrap latency | path!="" [5m])`,
+ },
}
for name, test := range tests {
diff --git a/pkg/logql/syntax/visit.go b/pkg/logql/syntax/visit.go
new file mode 100644
index 0000000000000..70c931ad49467
--- /dev/null
+++ b/pkg/logql/syntax/visit.go
@@ -0,0 +1,285 @@
+package syntax
+
+type AcceptVisitor interface {
+ Accept(RootVisitor)
+}
+
+type RootVisitor interface {
+ SampleExprVisitor
+ LogSelectorExprVisitor
+ StageExprVisitor
+
+ VisitLogRange(*LogRange)
+}
+
+type SampleExprVisitor interface {
+ VisitBinOp(*BinOpExpr)
+ VisitVectorAggregation(*VectorAggregationExpr)
+ VisitRangeAggregation(*RangeAggregationExpr)
+ VisitLabelReplace(*LabelReplaceExpr)
+ VisitLiteral(*LiteralExpr)
+ VisitVector(*VectorExpr)
+}
+
+type LogSelectorExprVisitor interface {
+ VisitMatchers(*MatchersExpr)
+ VisitPipeline(*PipelineExpr)
+ VisitLiteral(*LiteralExpr)
+ VisitVector(*VectorExpr)
+}
+
+type StageExprVisitor interface {
+ VisitDecolorize(*DecolorizeExpr)
+ VisitDropLabels(*DropLabelsExpr)
+ VisitJSONExpressionParser(*JSONExpressionParser)
+ VisitKeepLabel(*KeepLabelsExpr)
+ VisitLabelFilter(*LabelFilterExpr)
+ VisitLabelFmt(*LabelFmtExpr)
+ VisitLabelParser(*LabelParserExpr)
+ VisitLineFilter(*LineFilterExpr)
+ VisitLineFmt(*LineFmtExpr)
+ VisitLogfmtExpressionParser(*LogfmtExpressionParser)
+ VisitLogfmtParser(*LogfmtParserExpr)
+}
+
+var _ RootVisitor = &DepthFirstTraversal{}
+
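+// DepthFirstTraversal implements RootVisitor and walks an expression tree
+// depth-first. Every Visit*Fn hook is optional: when a hook is set it is called
+// instead of the default behaviour, which is to descend into the node's child
+// expressions (leaf nodes default to doing nothing). Hooks that still need to
+// recurse can call Accept on child expressions themselves.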
+type DepthFirstTraversal struct {
+ VisitBinOpFn func(v RootVisitor, e *BinOpExpr)
+ VisitDecolorizeFn func(v RootVisitor, e *DecolorizeExpr)
+ VisitDropLabelsFn func(v RootVisitor, e *DropLabelsExpr)
+ VisitJSONExpressionParserFn func(v RootVisitor, e *JSONExpressionParser)
+ VisitKeepLabelFn func(v RootVisitor, e *KeepLabelsExpr)
+ VisitLabelFilterFn func(v RootVisitor, e *LabelFilterExpr)
+ VisitLabelFmtFn func(v RootVisitor, e *LabelFmtExpr)
+ VisitLabelParserFn func(v RootVisitor, e *LabelParserExpr)
+ VisitLabelReplaceFn func(v RootVisitor, e *LabelReplaceExpr)
+ VisitLineFilterFn func(v RootVisitor, e *LineFilterExpr)
+ VisitLineFmtFn func(v RootVisitor, e *LineFmtExpr)
+ VisitLiteralFn func(v RootVisitor, e *LiteralExpr)
+ VisitLogRangeFn func(v RootVisitor, e *LogRange)
+ VisitLogfmtExpressionParserFn func(v RootVisitor, e *LogfmtExpressionParser)
+ VisitLogfmtParserFn func(v RootVisitor, e *LogfmtParserExpr)
+ VisitMatchersFn func(v RootVisitor, e *MatchersExpr)
+ VisitPipelineFn func(v RootVisitor, e *PipelineExpr)
+ VisitRangeAggregationFn func(v RootVisitor, e *RangeAggregationExpr)
+ VisitVectorFn func(v RootVisitor, e *VectorExpr)
+ VisitVectorAggregationFn func(v RootVisitor, e *VectorAggregationExpr)
+}
+
+// VisitBinOp implements RootVisitor.
+func (v *DepthFirstTraversal) VisitBinOp(e *BinOpExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitBinOpFn != nil {
+ v.VisitBinOpFn(v, e)
+ } else {
+ e.SampleExpr.Accept(v)
+ e.RHS.Accept(v)
+ }
+}
+
+// VisitDecolorize implements RootVisitor.
+func (v *DepthFirstTraversal) VisitDecolorize(e *DecolorizeExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitDecolorizeFn != nil {
+ v.VisitDecolorizeFn(v, e)
+ }
+}
+
+// VisitDropLabels implements RootVisitor.
+func (v *DepthFirstTraversal) VisitDropLabels(e *DropLabelsExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitDropLabelsFn != nil {
+ v.VisitDropLabelsFn(v, e)
+ }
+}
+
+// VisitJSONExpressionParser implements RootVisitor.
+func (v *DepthFirstTraversal) VisitJSONExpressionParser(e *JSONExpressionParser) {
+ if e == nil {
+ return
+ }
+ if v.VisitJSONExpressionParserFn != nil {
+ v.VisitJSONExpressionParserFn(v, e)
+ }
+}
+
+// VisitKeepLabel implements RootVisitor.
+func (v *DepthFirstTraversal) VisitKeepLabel(e *KeepLabelsExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitKeepLabelFn != nil {
+ v.VisitKeepLabelFn(v, e)
+ }
+}
+
+// VisitLabelFilter implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLabelFilter(e *LabelFilterExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLabelFilterFn != nil {
+ v.VisitLabelFilterFn(v, e)
+ }
+}
+
+// VisitLabelFmt implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLabelFmt(e *LabelFmtExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLabelFmtFn != nil {
+ v.VisitLabelFmtFn(v, e)
+ }
+}
+
+// VisitLabelParser implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLabelParser(e *LabelParserExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLabelParserFn != nil {
+ v.VisitLabelParserFn(v, e)
+ }
+}
+
+// VisitLabelReplace implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLabelReplace(e *LabelReplaceExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLabelReplaceFn != nil {
+ v.VisitLabelReplaceFn(v, e)
+ }
+}
+
+// VisitLineFilter implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLineFilter(e *LineFilterExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLineFilterFn != nil {
+ v.VisitLineFilterFn(v, e)
+ } else {
+ e.Left.Accept(v)
+ e.Or.Accept(v)
+ }
+}
+
+// VisitLineFmt implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLineFmt(e *LineFmtExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLineFmtFn != nil {
+ v.VisitLineFmtFn(v, e)
+ }
+}
+
+// VisitLiteral implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLiteral(e *LiteralExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLiteralFn != nil {
+ v.VisitLiteralFn(v, e)
+ }
+}
+
+// VisitLogRange implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLogRange(e *LogRange) {
+ if e == nil {
+ return
+ }
+ if v.VisitLogRangeFn != nil {
+ v.VisitLogRangeFn(v, e)
+ } else {
+ e.Left.Accept(v)
+ }
+}
+
+// VisitLogfmtExpressionParser implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLogfmtExpressionParser(e *LogfmtExpressionParser) {
+ if e == nil {
+ return
+ }
+ if v.VisitLogfmtExpressionParserFn != nil {
+ v.VisitLogfmtExpressionParserFn(v, e)
+ }
+}
+
+// VisitLogfmtParser implements RootVisitor.
+func (v *DepthFirstTraversal) VisitLogfmtParser(e *LogfmtParserExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitLogfmtParserFn != nil {
+ v.VisitLogfmtParserFn(v, e)
+ }
+}
+
+// VisitMatchers implements RootVisitor.
+func (v *DepthFirstTraversal) VisitMatchers(e *MatchersExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitMatchersFn != nil {
+ v.VisitMatchersFn(v, e)
+ }
+}
+
+// VisitPipeline implements RootVisitor.
+func (v *DepthFirstTraversal) VisitPipeline(e *PipelineExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitPipelineFn != nil {
+ v.VisitPipelineFn(v, e)
+ } else {
+ e.Left.Accept(v)
+ for i := range e.MultiStages {
+ e.MultiStages[i].Accept(v)
+ }
+ }
+}
+
+// VisitRangeAggregation implements RootVisitor.
+func (v *DepthFirstTraversal) VisitRangeAggregation(e *RangeAggregationExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitRangeAggregationFn != nil {
+ v.VisitRangeAggregationFn(v, e)
+ } else {
+ e.Left.Accept(v)
+ }
+}
+
+// VisitVector implements RootVisitor.
+func (v *DepthFirstTraversal) VisitVector(e *VectorExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitVectorFn != nil {
+ v.VisitVectorFn(v, e)
+ }
+}
+
+// VisitVectorAggregation implements RootVisitor.
+func (v *DepthFirstTraversal) VisitVectorAggregation(e *VectorAggregationExpr) {
+ if e == nil {
+ return
+ }
+ if v.VisitVectorAggregationFn != nil {
+ v.VisitVectorAggregationFn(v, e)
+ } else {
+ e.Left.Accept(v)
+ }
+}
diff --git a/pkg/logql/syntax/visit_test.go b/pkg/logql/syntax/visit_test.go
new file mode 100644
index 0000000000000..eeb040ce83a1a
--- /dev/null
+++ b/pkg/logql/syntax/visit_test.go
@@ -0,0 +1,48 @@
+package syntax
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestDepthFirstTraversalVisitor(t *testing.T) {
+
+ visited := [][2]string{}
+
+ visitor := &DepthFirstTraversal{
+ VisitLabelParserFn: func(v RootVisitor, e *LabelParserExpr) {
+ visited = append(visited, [2]string{fmt.Sprintf("%T", e), e.String()})
+ },
+ VisitLineFilterFn: func(v RootVisitor, e *LineFilterExpr) {
+ visited = append(visited, [2]string{fmt.Sprintf("%T", e), e.String()})
+ },
+ VisitLogfmtParserFn: func(v RootVisitor, e *LogfmtParserExpr) {
+ visited = append(visited, [2]string{fmt.Sprintf("%T", e), e.String()})
+ },
+ VisitMatchersFn: func(v RootVisitor, e *MatchersExpr) {
+ visited = append(visited, [2]string{fmt.Sprintf("%T", e), e.String()})
+ },
+ }
+
+ // Only expressions that have a Visit function defined are added to the list
+ expected := [][2]string{
+ {"*syntax.MatchersExpr", `{env="prod"}`},
+ {"*syntax.LineFilterExpr", `|= "foo" or "bar"`},
+ {"*syntax.LogfmtParserExpr", `| logfmt`},
+ {"*syntax.MatchersExpr", `{env="dev"}`},
+ {"*syntax.LineFilterExpr", `|~ "(foo|bar)"`},
+ {"*syntax.LabelParserExpr", `| json`},
+ }
+
+ query := `
+ sum by (container) (min_over_time({env="prod"} |= "foo" or "bar" | logfmt | unwrap duration [1m]))
+ /
+ sum by (container) (max_over_time({env="dev"} |~ "(foo|bar)" | json | unwrap duration [1m]))
+ `
+ expr, err := ParseExpr(query)
+ require.NoError(t, err)
+ expr.Accept(visitor)
+ require.Equal(t, expected, visited)
+}
diff --git a/pkg/logql/syntax/walk.go b/pkg/logql/syntax/walk.go
index c528c9ca63437..291ec8b31036f 100644
--- a/pkg/logql/syntax/walk.go
+++ b/pkg/logql/syntax/walk.go
@@ -1,7 +1,5 @@
package syntax
-import "fmt"
-
type WalkFn = func(e Expr)
func walkAll(f WalkFn, xs ...Walkable) {
@@ -13,120 +11,3 @@ func walkAll(f WalkFn, xs ...Walkable) {
type Walkable interface {
Walk(f WalkFn)
}
-
-type AcceptVisitor interface {
- Accept(RootVisitor)
-}
-
-type RootVisitor interface {
- SampleExprVisitor
- LogSelectorExprVisitor
- StageExprVisitor
-
- VisitLogRange(*LogRange)
-}
-
-type SampleExprVisitor interface {
- VisitBinOp(*BinOpExpr)
- VisitVectorAggregation(*VectorAggregationExpr)
- VisitRangeAggregation(*RangeAggregationExpr)
- VisitLabelReplace(*LabelReplaceExpr)
- VisitLiteral(*LiteralExpr)
- VisitVector(*VectorExpr)
-}
-
-type LogSelectorExprVisitor interface {
- VisitMatchers(*MatchersExpr)
- VisitPipeline(*PipelineExpr)
- VisitLiteral(*LiteralExpr)
- VisitVector(*VectorExpr)
-}
-
-type StageExprVisitor interface {
- VisitDecolorize(*DecolorizeExpr)
- VisitDropLabels(*DropLabelsExpr)
- VisitJSONExpressionParser(*JSONExpressionParser)
- VisitKeepLabel(*KeepLabelsExpr)
- VisitLabelFilter(*LabelFilterExpr)
- VisitLabelFmt(*LabelFmtExpr)
- VisitLabelParser(*LabelParserExpr)
- VisitLineFilter(*LineFilterExpr)
- VisitLineFmt(*LineFmtExpr)
- VisitLogfmtExpressionParser(*LogfmtExpressionParser)
- VisitLogfmtParser(*LogfmtParserExpr)
-}
-
-func Dispatch(root Expr, v RootVisitor) error {
- switch e := root.(type) {
- case SampleExpr:
- DispatchSampleExpr(e, v)
- case LogSelectorExpr:
- DispatchLogSelectorExpr(e, v)
- case StageExpr:
- DispatchStageExpr(e, v)
- case *LogRange:
- v.VisitLogRange(e)
- default:
- return fmt.Errorf("unpexpected root expression type: got (%T)", e)
- }
-
- return nil
-}
-
-func DispatchSampleExpr(expr SampleExpr, v SampleExprVisitor) {
- switch e := expr.(type) {
- case *BinOpExpr:
- v.VisitBinOp(e)
- case *VectorAggregationExpr:
- v.VisitVectorAggregation(e)
- case *RangeAggregationExpr:
- v.VisitRangeAggregation(e)
- case *LabelReplaceExpr:
- v.VisitLabelReplace(e)
- case *LiteralExpr:
- v.VisitLiteral(e)
- case *VectorExpr:
- v.VisitVector(e)
- }
-}
-
-func DispatchLogSelectorExpr(expr LogSelectorExpr, v LogSelectorExprVisitor) {
- switch e := expr.(type) {
- case *PipelineExpr:
- v.VisitPipeline(e)
- case *MatchersExpr:
- v.VisitMatchers(e)
- case *VectorExpr:
- v.VisitVector(e)
- case *LiteralExpr:
- v.VisitLiteral(e)
- }
-}
-
-func DispatchStageExpr(expr StageExpr, v StageExprVisitor) {
- switch e := expr.(type) {
- case *DecolorizeExpr:
- v.VisitDecolorize(e)
- case *DropLabelsExpr:
- v.VisitDropLabels(e)
- case *JSONExpressionParser:
- v.VisitJSONExpressionParser(e)
- case *KeepLabelsExpr:
- v.VisitKeepLabel(e)
- case *LabelFilterExpr:
- v.VisitLabelFilter(e)
- case *LabelFmtExpr:
- v.VisitLabelFmt(e)
- case *LabelParserExpr:
- v.VisitLabelParser(e)
- case *LineFilterExpr:
- v.VisitLineFilter(e)
- case *LineFmtExpr:
- v.VisitLineFmt(e)
- case *LogfmtExpressionParser:
- v.VisitLogfmtExpressionParser(e)
- case *LogfmtParserExpr:
- v.VisitLogfmtParser(e)
- }
-
-}
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 09f95d794ddc3..c3cdea5392d80 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -323,6 +323,7 @@ type Loki struct {
clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
+ Tee distributor.Tee
HTTPAuthMiddleware middleware.Interface
Codec Codec
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index e7848ef701a25..797e01a098d94 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -6,6 +6,7 @@ import (
"fmt"
"hash/fnv"
"math"
+ "net"
"net/http"
"net/http/httputil"
"net/url"
@@ -318,6 +319,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
t.Overrides,
prometheus.DefaultRegisterer,
t.Cfg.MetricsNamespace,
+ t.Tee,
logger,
)
if err != nil {
@@ -579,7 +581,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PrepareShutdown)),
)
- t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(
+ t.Server.HTTP.Methods("POST", "GET").Path("/ingester/shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)),
)
return t.Ingester, nil
@@ -867,7 +869,7 @@ func (t *Loki) compactorAddress() (string, bool, error) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
// In single binary or read modes, this module depends on Server
- return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
+ return net.JoinHostPort(t.Cfg.Server.GRPCListenAddress, strconv.Itoa(t.Cfg.Server.GRPCListenPort)), true, nil
}
if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
diff --git a/pkg/lokifrontend/frontend/transport/handler.go b/pkg/lokifrontend/frontend/transport/handler.go
index 2a4781b3bc718..03332ee046771 100644
--- a/pkg/lokifrontend/frontend/transport/handler.go
+++ b/pkg/lokifrontend/frontend/transport/handler.go
@@ -25,6 +25,7 @@ import (
querier_stats "github.com/grafana/loki/pkg/querier/stats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
+ "github.com/grafana/loki/pkg/util/server"
)
const (
@@ -133,7 +134,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queryResponseTime := time.Since(startTime)
if err != nil {
- writeError(w, err)
+ server.WriteError(err, w)
return
}
@@ -229,20 +230,6 @@ func formatQueryString(queryString url.Values) (fields []interface{}) {
return fields
}
-func writeError(w http.ResponseWriter, err error) {
- switch err {
- case context.Canceled:
- err = errCanceled
- case context.DeadlineExceeded:
- err = errDeadlineExceeded
- default:
- if util.IsRequestBodyTooLarge(err) {
- err = errRequestEntityTooLarge
- }
- }
- httpgrpc.WriteError(w, err)
-}
-
func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
if stats != nil {
parts := make([]string, 0)
diff --git a/pkg/lokifrontend/frontend/transport/handler_test.go b/pkg/lokifrontend/frontend/transport/handler_test.go
deleted file mode 100644
index 6f25963626712..0000000000000
--- a/pkg/lokifrontend/frontend/transport/handler_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package transport
-
-import (
- "context"
- "net/http"
- "net/http/httptest"
- "testing"
-
- "github.com/grafana/dskit/httpgrpc"
- "github.com/pkg/errors"
- "github.com/stretchr/testify/require"
-)
-
-func TestWriteError(t *testing.T) {
- for _, test := range []struct {
- status int
- err error
- }{
- {http.StatusInternalServerError, errors.New("unknown")},
- {http.StatusGatewayTimeout, context.DeadlineExceeded},
- {StatusClientClosedRequest, context.Canceled},
- {http.StatusBadRequest, httpgrpc.Errorf(http.StatusBadRequest, "")},
- } {
- t.Run(test.err.Error(), func(t *testing.T) {
- w := httptest.NewRecorder()
- writeError(w, test.err)
- require.Equal(t, test.status, w.Result().StatusCode)
- })
- }
-}
diff --git a/pkg/lokifrontend/frontend/v2/frontend.go b/pkg/lokifrontend/frontend/v2/frontend.go
index 695a054e42580..99e3e05ad83c9 100644
--- a/pkg/lokifrontend/frontend/v2/frontend.go
+++ b/pkg/lokifrontend/frontend/v2/frontend.go
@@ -5,7 +5,9 @@ import (
"flag"
"fmt"
"math/rand"
+ "net"
"net/http"
+ "strconv"
"sync"
"time"
@@ -136,7 +138,7 @@ type enqueueResult struct {
func NewFrontend(cfg Config, ring ring.ReadRing, log log.Logger, reg prometheus.Registerer, codec transport.Codec, metricsNamespace string) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)
- schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), ring, requestsCh, log)
+ schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, net.JoinHostPort(cfg.Addr, strconv.Itoa(cfg.Port)), ring, requestsCh, log)
if err != nil {
return nil, err
}
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go
index fa2de7590465f..b329e2e1dc54c 100644
--- a/pkg/querier/querier_mock_test.go
+++ b/pkg/querier/querier_mock_test.go
@@ -6,6 +6,8 @@ import (
"fmt"
"time"
+ "github.com/grafana/loki/pkg/logql/log"
+
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/dskit/grpcclient"
@@ -298,8 +300,9 @@ type storeMock struct {
func newStoreMock() *storeMock {
return &storeMock{}
}
-
-func (s *storeMock) SetChunkFilterer(chunk.RequestChunkFilterer) {}
+func (s *storeMock) SetChunkFilterer(chunk.RequestChunkFilterer) {}
+func (s *storeMock) SetExtractorWrapper(log.SampleExtractorWrapper) {}
+func (s *storeMock) SetPipelineWrapper(log.PipelineWrapper) {}
func (s *storeMock) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
args := s.Called(ctx, req)
diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go
index 5dba31f3eebc4..f95da0eba16d4 100644
--- a/pkg/querier/worker_service.go
+++ b/pkg/querier/worker_service.go
@@ -1,7 +1,8 @@
package querier
import (
- "fmt"
+ "net"
+ "strconv"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -91,7 +92,7 @@ func InitWorkerService(
if cfg.GrpcListenAddress != "" {
listenAddress = cfg.GrpcListenAddress
}
- address := fmt.Sprintf("%s:%d", listenAddress, cfg.GrpcListenPort)
+ address := net.JoinHostPort(listenAddress, strconv.Itoa(cfg.GrpcListenPort))
level.Warn(util_log.Logger).Log(
"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
"address", address)
diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go
index aab4631e86e4f..006106aa44a61 100644
--- a/pkg/queue/queue.go
+++ b/pkg/queue/queue.go
@@ -139,7 +139,7 @@ func (q *RequestQueue) ReleaseRequests(items []Request) {
// The caller is responsible for returning the dequeued requests back to the
// pool by calling ReleaseRequests(items).
func (q *RequestQueue) DequeueMany(ctx context.Context, last QueueIndex, consumerID string, maxItems int, maxWait time.Duration) ([]Request, QueueIndex, error) {
- // create a context for dequeuing with a max time we want to wait to fullfill the desired maxItems
+ // create a context for dequeuing with a max time we want to wait to fulfill the desired maxItems
dequeueCtx, cancel := context.WithTimeout(ctx, maxWait)
defer cancel()
diff --git a/pkg/storage/chunk/cache/memcached.go b/pkg/storage/chunk/cache/memcached.go
index f5f2c2e19d795..9b6150839cd29 100644
--- a/pkg/storage/chunk/cache/memcached.go
+++ b/pkg/storage/chunk/cache/memcached.go
@@ -11,7 +11,6 @@ import (
"github.com/go-kit/log"
instr "github.com/grafana/dskit/instrument"
"github.com/grafana/gomemcache/memcache"
- "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -20,10 +19,6 @@ import (
"github.com/grafana/loki/pkg/util/math"
)
-var (
- ErrMemcachedStoppedByClient = errors.New("cache is stopped by client")
-)
-
// MemcachedConfig is config to make a Memcached
type MemcachedConfig struct {
Expiration time.Duration `yaml:"expiration"`
@@ -55,16 +50,7 @@ type Memcached struct {
// So that any writer goroutine wouldn't write to it after closing `intputCh`
closed chan struct{}
- // stopped track if `inputCh` and `closed` chan need to closed. Reason being,
- // there are two entry points that can close these channels, when client calls
- // .Stop() explicitly, or passed context is cancelled.
- // So `Stop()` will make sure it's not closing the channels that are already closed, which may cause a panic.
- stopped sync.Once
-
logger log.Logger
-
- // NOTE: testFetchDelay should be used only for testing. See `SetTestFetchDelay()` method for more details.
- testFetchDelay chan struct{}
}
// NewMemcached makes a new Memcached.
@@ -108,12 +94,7 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
batchID: input.batchID,
}
res.found, res.bufs, res.missed, res.err = c.fetch(input.ctx, input.keys)
- // NOTE: This check is needed because goroutines submitting work via `inputCh` may exit in-between because of context cancellation or timeout. This helps to close these worker goroutines to exit without hanging around.
- select {
- case <-c.closed:
- return
- case input.resultCh <- res:
- }
+ input.resultCh <- res
}
}()
@@ -189,33 +170,21 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b
func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
resultsCh := make(chan *result)
- var workerErr error // any error (timeout, context cancel) happened in worker go routine that we start in this method?
-
batchSize := c.cfg.BatchSize
go func() {
for i, j := 0, 0; i < len(keys); i += batchSize {
batchKeys := keys[i:math.Min(i+batchSize, len(keys))]
select {
- case <-ctx.Done():
- c.closeAndStop()
- workerErr = ctx.Err()
- return
case <-c.closed:
- workerErr = ErrMemcachedStoppedByClient
return
default:
- if c.testFetchDelay != nil {
- <-c.testFetchDelay
- }
-
c.inputCh <- &work{
keys: batchKeys,
ctx: ctx,
resultCh: resultsCh,
batchID: j,
}
-
j++
}
}
@@ -236,11 +205,9 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
// Also we do close(resultsCh) in the same goroutine so <-resultCh may never return.
select {
case <-c.closed:
- if workerErr != nil {
- err = workerErr
- }
return
- case result := <-resultsCh:
+ default:
+ result := <-resultsCh
results[result.batchID] = result
}
}
@@ -281,34 +248,16 @@ func (c *Memcached) Stop() {
if c.inputCh == nil {
return
}
- c.closeAndStop()
- c.wg.Wait()
-}
-// closeAndStop closes the `inputCh`, `closed` channel and update the `stopped` flag to true.
-// Assumes c.inputCh, c.closed channels are non-nil
-// Go routine safe and idempotent.
-func (c *Memcached) closeAndStop() {
- c.stopped.Do(func() {
- close(c.inputCh)
- close(c.closed)
- })
+ close(c.inputCh)
+ close(c.closed)
+ c.wg.Wait()
}
func (c *Memcached) GetCacheType() stats.CacheType {
return c.cacheType
}
-// Warning: SetTestFetchDelay should be used only for testing.
-// To introduce artifical delay between each batch fetch.
-// Helpful to test if each batch is respecting the `ctx` cancelled or `Stop()` called
-// in-between each batch
-// NOTE: It is exported method instead of internal method because,
-// test's uses `cache.SetTestFetchDelay` due to some cyclic dependencies in this package
-func (c *Memcached) SetTestFetchDelay(ch chan struct{}) {
- c.testFetchDelay = ch
-}
-
// HashKey hashes key into something you can store in memcached.
func HashKey(key string) string {
hasher := fnv.New64a()
diff --git a/pkg/storage/chunk/cache/memcached_client.go b/pkg/storage/chunk/cache/memcached_client.go
index b497b5c5917fd..f05763ba59d13 100644
--- a/pkg/storage/chunk/cache/memcached_client.go
+++ b/pkg/storage/chunk/cache/memcached_client.go
@@ -3,9 +3,9 @@ package cache
import (
"context"
"flag"
- "fmt"
"net"
"sort"
+ "strconv"
"strings"
"sync"
"time"
@@ -254,7 +254,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
return err
}
for _, srv := range addrs {
- servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port))
+ servers = append(servers, net.JoinHostPort(srv.Target, strconv.Itoa(int(srv.Port))))
}
}
diff --git a/pkg/storage/chunk/cache/memcached_client_test.go b/pkg/storage/chunk/cache/memcached_client_test.go
index a6b5e868ff01d..d1b2008230b53 100644
--- a/pkg/storage/chunk/cache/memcached_client_test.go
+++ b/pkg/storage/chunk/cache/memcached_client_test.go
@@ -9,8 +9,6 @@ import (
type mockMemcache struct {
sync.RWMutex
contents map[string][]byte
-
- keysFetchedCount int
}
func newMockMemcache() *mockMemcache {
@@ -22,7 +20,6 @@ func newMockMemcache() *mockMemcache {
func (m *mockMemcache) GetMulti(keys []string, _ ...memcache.Option) (map[string]*memcache.Item, error) {
m.RLock()
defer m.RUnlock()
- m.keysFetchedCount += len(keys)
result := map[string]*memcache.Item{}
for _, k := range keys {
if c, ok := m.contents[k]; ok {
diff --git a/pkg/storage/chunk/cache/memcached_test.go b/pkg/storage/chunk/cache/memcached_test.go
index 6c62876cc6d80..4082c331a10e0 100644
--- a/pkg/storage/chunk/cache/memcached_test.go
+++ b/pkg/storage/chunk/cache/memcached_test.go
@@ -4,10 +4,8 @@ import (
"context"
"errors"
"fmt"
- "strconv"
"sync"
"testing"
- "time"
"github.com/go-kit/log"
"github.com/grafana/gomemcache/memcache"
@@ -19,178 +17,44 @@ import (
)
func TestMemcached_fetchKeysBatched(t *testing.T) {
- // This test checks for three things
+ // This test checks for two things
// 1. `c.inputCh` is closed when `c.Stop()` is triggered
// 2. Once `c.inputCh` is closed, no one should be writing to `c.inputCh` (thus shouldn't panic with "send to closed channel")
- // 3. Once the `ctx` is cancelled or timeout, it should stop fetching the keys.
- t.Run("inputCh is closed without panics when Stop() is triggered", func(t *testing.T) {
- client := newMockMemcache()
- m := cache.NewMemcached(cache.MemcachedConfig{
- BatchSize: 10,
- Parallelism: 5,
- }, client, "test", nil, log.NewNopLogger(), "test")
-
- var (
- wg sync.WaitGroup
- wait = make(chan struct{}) // chan to make goroutine wait till `m.Stop()` is called.
- ctx = context.Background()
- )
-
- wg.Add(1)
-
- // This goroutine is going to do some real "work" (writing to `c.inputCh`). We then do `m.Stop()` closing `c.inputCh`. We assert there shouldn't be any panics.
- go func() {
- defer wg.Done()
- <-wait
- assert.NotPanics(t, func() {
- keys := []string{"1", "2"}
- bufs := [][]byte{[]byte("1"), []byte("2")}
- err := m.Store(ctx, keys, bufs)
- require.NoError(t, err)
-
- _, _, _, err = m.Fetch(ctx, keys) // will try to write to `intputChan` and shouldn't panic
- require.NoError(t, err)
-
- })
- }()
-
- m.Stop()
- close(wait)
-
- wg.Wait()
-
- })
-
- t.Run("stop fetching when context cancelled", func(t *testing.T) {
- client := newMockMemcache()
- m := cache.NewMemcached(cache.MemcachedConfig{
- BatchSize: 10,
- Parallelism: 5,
- }, client, "test", nil, log.NewNopLogger(), "test")
+ client := newMockMemcache()
+ m := cache.NewMemcached(cache.MemcachedConfig{
+ BatchSize: 10,
+ Parallelism: 5,
+ }, client, "test", nil, log.NewNopLogger(), "test")
- var (
- wg sync.WaitGroup
- wait = make(chan struct{})
- ctx, ctxCancel = context.WithCancel(context.Background())
- )
-
- wg.Add(1)
-
- // This goroutine is going to do some real "work" (writing to `c.inputCh`). We then cancel passed context closing `c.inputCh`.
- // We assert there shouldn't be any panics and it stopped fetching keys.
- go func() {
- defer wg.Done()
- assert.NotPanics(t, func() {
- keys := []string{"1", "2"}
- bufs := [][]byte{[]byte("1"), []byte("2")}
- err := m.Store(ctx, keys, bufs)
- require.NoError(t, err)
- <-wait // wait before fetching
- _, _, _, err = m.Fetch(ctx, keys) // will try to write to `intputChan` and shouldn't panic
- require.ErrorIs(t, err, context.Canceled)
- })
- }()
-
- ctxCancel() // cancel even before single fetch is done.
- close(wait) // start the fetching
- wg.Wait()
- require.Equal(t, 0, client.keysFetchedCount) // client.GetMulti shouldn't have called because context is cancelled before.
- m.Stop() // cancelation and Stop() should be able to work.
+ var (
+ wg sync.WaitGroup
+ stopped = make(chan struct{}) // chan to make goroutine wait till `m.Stop()` is called.
+ ctx = context.Background()
+ )
- })
+ wg.Add(1)
- t.Run("stop fetching in-between, when context cancelled", func(t *testing.T) {
- client := newMockMemcache()
- m := cache.NewMemcached(cache.MemcachedConfig{
- BatchSize: 2, // Less batch size to create interleving between each batch
- Parallelism: 3, // means it starts 3 go routines to fetch whatever number of keys we give, fetching 2 keys in each fetch.
- }, client, "test", nil, log.NewNopLogger(), "test")
+ // This goroutine is going to do some real "work" (writing to `c.inputCh`). We then do `m.Stop()` closing `c.inputCh`. We assert there shouldn't be any panics.
+ go func() {
+ defer wg.Done()
+ <-stopped
+ assert.NotPanics(t, func() {
+ keys := []string{"1", "2"}
+ bufs := [][]byte{[]byte("1"), []byte("2")}
+ err := m.Store(ctx, keys, bufs)
+ require.NoError(t, err)
- var (
- wg sync.WaitGroup
- wait = make(chan struct{})
- ctx, ctxCancel = context.WithCancel(context.Background())
- numKeys = 1500
- waitBeforeFetch = 100 * time.Millisecond
- delayFetch = make(chan struct{})
- )
-
- m.SetTestFetchDelay(delayFetch)
- wg.Add(1)
-
- // This goroutine is going to do some real "work" (writing to `c.inputCh`). We then cancel passed context closing `c.inputCh`.
- // We assert there shouldn't be any panics and it stopped fetching keys.
- go func() {
- defer wg.Done()
- assert.NotPanics(t, func() {
- // these many keys, because we have
- // BatchSize: 2 and Paralleslism: 3
- // it starts 3 go routines to fetch 15 keys in total, fetching 2 keys in each fetch.
- keys, values := genKeysValues(numKeys)
- err := m.Store(ctx, keys, values)
- require.NoError(t, err)
- <-wait // wait before fetching
- _, _, _, err = m.Fetch(ctx, keys) // will try to write to `intputChan` and shouldn't panic
- require.ErrorIs(t, err, context.Canceled)
- })
- }()
-
- close(wait) // start the fetching
-
- go func() {
- // this waits for at least one batch fetch and cancel after fetching begins
- time.Sleep(waitBeforeFetch)
- close(delayFetch) // should have fetched **at least** one batch after closing this.
- ctxCancel()
- }()
-
- wg.Wait()
- require.NotEqual(t, client.keysFetchedCount, 0) // should have fetched some keys.
- require.Less(t, client.keysFetchedCount, numKeys) // but not all the keys because ctx cancelled in-between.
- m.Stop()
+ _, _, _, err = m.Fetch(ctx, keys) // will try to write to `inputCh` and shouldn't panic
+ require.NoError(t, err)
- })
+ })
+ }()
- t.Run("stop fetching when context timeout", func(t *testing.T) {
- cancelTimeout := 100 * time.Millisecond
+ m.Stop()
+ close(stopped)
- client := newMockMemcache()
- m := cache.NewMemcached(cache.MemcachedConfig{
- BatchSize: 10,
- Parallelism: 5,
- }, client, "test", nil, log.NewNopLogger(), "test")
-
- var (
- wg sync.WaitGroup
- wait = make(chan struct{})
- ctx, ctxCancel = context.WithTimeout(context.Background(), cancelTimeout)
- )
- wg.Add(1)
-
- // This goroutine is going to do some real "work" (writing to `c.inputCh`). We then wait till context timeout happens closing `c.inputCh`.
- // We assert there shouldn't be any panics and it stopped fetching keys.
- go func() {
- defer wg.Done()
- assert.NotPanics(t, func() {
- keys := []string{"1", "2"}
- bufs := [][]byte{[]byte("1"), []byte("2")}
- err := m.Store(ctx, keys, bufs)
- require.NoError(t, err)
- <-wait // wait before fetching
- _, _, _, err = m.Fetch(ctx, keys) // will try to write to `intputChan` and shouldn't panic
- require.ErrorIs(t, err, context.DeadlineExceeded)
- })
- }()
-
- time.Sleep(cancelTimeout + (5 * time.Millisecond)) // wait till context timeout
- close(wait) // start the fetching
- wg.Wait()
- require.Equal(t, 0, client.keysFetchedCount) // client.GetMulti shouldn't have called because context is timedout before.
- m.Stop() // cancelation and Stop() should be able to work.
- ctxCancel() // finally cancel context for cleanup sake
-
- })
+ wg.Wait()
}
func TestMemcached(t *testing.T) {
@@ -337,17 +201,3 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) {
}
}
}
-
-// generate `n` keys values with numerical value from 1-n (inclusive)
-func genKeysValues(n int) ([]string, [][]byte) {
- keys := make([]string, 0, n)
- values := make([][]byte, 0, n)
-
- for i := 0; i < n; i++ {
- s := strconv.Itoa(i + 1)
- keys = append(keys, s)
- values = append(values, []byte(s))
- }
-
- return keys, values
-}
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 6781dbbff8a3b..9de581958e9a8 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -6,6 +6,8 @@ import (
"math"
"time"
+ lokilog "github.com/grafana/loki/pkg/logql/log"
+
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@@ -57,10 +59,17 @@ type SchemaConfigProvider interface {
GetSchemaConfigs() []config.PeriodConfig
}
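+
+// Instrumentable allows wrapping the pipeline and sample extractor used by the
+// store, e.g. to add instrumentation around query execution.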
+type Instrumentable interface {
+ SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper)
+
+ SetPipelineWrapper(wrapper lokilog.PipelineWrapper)
+}
+
type Store interface {
stores.Store
SelectStore
SchemaConfigProvider
+ Instrumentable
}
type LokiStore struct {
@@ -84,6 +93,8 @@ type LokiStore struct {
logger log.Logger
chunkFilterer chunk.RequestChunkFilterer
+ extractorWrapper lokilog.SampleExtractorWrapper
+ pipelineWrapper lokilog.PipelineWrapper
congestionControllerFactory func(cfg congestion.Config, logger log.Logger, metrics *congestion.Metrics) congestion.Controller
metricsNamespace string
@@ -381,6 +392,14 @@ func (s *LokiStore) SetChunkFilterer(chunkFilterer chunk.RequestChunkFilterer) {
s.Store.SetChunkFilterer(chunkFilterer)
}
+func (s *LokiStore) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
+ s.extractorWrapper = wrapper
+}
+
+func (s *LokiStore) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
+ s.pipelineWrapper = wrapper
+}
+
// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries`
func (s *LokiStore) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) {
userID, err := tenant.TenantID(ctx)
@@ -493,6 +512,10 @@ func (s *LokiStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (
return nil, err
}
+ if s.pipelineWrapper != nil {
+ pipeline = s.pipelineWrapper.Wrap(pipeline, expr.String())
+ }
+
var chunkFilterer chunk.Filterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
@@ -531,6 +554,10 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, err
}
+ if s.extractorWrapper != nil {
+ extractor = s.extractorWrapper.Wrap(extractor, expr.String())
+ }
+
var chunkFilterer chunk.Filterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index fe0f1245be412..5d9243b30b191 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -14,6 +14,8 @@ import (
"testing"
"time"
+ lokilog "github.com/grafana/loki/pkg/logql/log"
+
"github.com/cespare/xxhash/v2"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
@@ -894,6 +896,162 @@ func Test_ChunkFilterer(t *testing.T) {
}
}
+func Test_PipelineWrapper(t *testing.T) {
+ s := &LokiStore{
+ Store: storeFixture,
+ cfg: Config{
+ MaxChunkBatchSize: 10,
+ },
+ chunkMetrics: NilMetrics,
+ }
+ wrapper := &testPipelineWrapper{
+ pipeline: newMockPipeline(),
+ }
+
+ s.SetPipelineWrapper(wrapper)
+ ctx = user.InjectOrgID(context.Background(), "test-user")
+ logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)})
+ if err != nil {
+ t.Errorf("store.SelectLogs() error = %v", err)
+ return
+ }
+ defer logit.Close()
+ for logit.Next() {
+ require.NoError(t, logit.Error()) // consume the iterator
+ }
+
+ require.Equal(t, "{foo=~\"ba.*\"}", wrapper.query)
+ require.Equal(t, 28, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
+}
+
+type testPipelineWrapper struct {
+ query string
+ pipeline *mockPipeline
+}
+
+func (t *testPipelineWrapper) Wrap(pipeline lokilog.Pipeline, query string) lokilog.Pipeline {
+ t.query = query
+ t.pipeline.wrappedExtractor = pipeline
+ return t.pipeline
+}
+
+func newMockPipeline() *mockPipeline {
+ return &mockPipeline{
+ sp: &mockStreamPipeline{},
+ }
+}
+
+type mockPipeline struct {
+ wrappedExtractor lokilog.Pipeline
+ sp *mockStreamPipeline
+}
+
+func (p *mockPipeline) ForStream(l labels.Labels) lokilog.StreamPipeline {
+ sp := p.wrappedExtractor.ForStream(l)
+ p.sp.wrappedSP = sp
+ return p.sp
+}
+
+func (p *mockPipeline) Reset() {}
+
+// mockStreamPipeline counts Process calls and delegates to the wrapped stream pipeline.
+type mockStreamPipeline struct {
+ wrappedSP lokilog.StreamPipeline
+ called int
+}
+
+func (p *mockStreamPipeline) BaseLabels() lokilog.LabelsResult {
+ return p.wrappedSP.BaseLabels()
+}
+
+func (p *mockStreamPipeline) Process(ts int64, line []byte, lbs ...labels.Label) ([]byte, lokilog.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.Process(ts, line, lbs...)
+}
+
+func (p *mockStreamPipeline) ProcessString(ts int64, line string, lbs ...labels.Label) (string, lokilog.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.ProcessString(ts, line, lbs...)
+}
+
+func Test_SampleWrapper(t *testing.T) {
+ s := &LokiStore{
+ Store: storeFixture,
+ cfg: Config{
+ MaxChunkBatchSize: 10,
+ },
+ chunkMetrics: NilMetrics,
+ }
+ wrapper := &testExtractorWrapper{
+ extractor: newMockExtractor(),
+ }
+ s.SetExtractorWrapper(wrapper)
+
+ ctx = user.InjectOrgID(context.Background(), "test-user")
+ it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)})
+ if err != nil {
+ t.Errorf("store.SelectSamples() error = %v", err)
+ return
+ }
+ defer it.Close()
+ for it.Next() {
+ require.NoError(t, it.Error()) // consume the iterator
+ }
+
+ require.Equal(t, "count_over_time({foo=~\"ba.*\"}[1s])", wrapper.query)
+ require.Equal(t, 28, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
+}
+
+type testExtractorWrapper struct {
+ query string
+ extractor *mockExtractor
+}
+
+func (t *testExtractorWrapper) Wrap(extractor lokilog.SampleExtractor, query string) lokilog.SampleExtractor {
+ t.query = query
+ t.extractor.wrappedExtractor = extractor
+ return t.extractor
+}
+
+func newMockExtractor() *mockExtractor {
+ return &mockExtractor{
+ sp: &mockStreamExtractor{},
+ }
+}
+
+type mockExtractor struct {
+ wrappedExtractor lokilog.SampleExtractor
+ sp *mockStreamExtractor
+}
+
+func (p *mockExtractor) ForStream(l labels.Labels) lokilog.StreamSampleExtractor {
+ sp := p.wrappedExtractor.ForStream(l)
+ p.sp.wrappedSP = sp
+ return p.sp
+}
+
+func (p *mockExtractor) Reset() {}
+
+// mockStreamExtractor counts Process calls and delegates to the wrapped stream extractor.
+type mockStreamExtractor struct {
+ wrappedSP lokilog.StreamSampleExtractor
+ called int
+}
+
+func (p *mockStreamExtractor) BaseLabels() lokilog.LabelsResult {
+ return p.wrappedSP.BaseLabels()
+}
+
+func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) (float64, lokilog.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.Process(ts, line, lbs...)
+}
+
+func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) (float64, lokilog.LabelsResult, bool) {
+ p.called++
+ return p.wrappedSP.ProcessString(ts, line, lbs...)
+}
+
func Test_store_GetSeries(t *testing.T) {
periodConfig := config.PeriodConfig{
From: config.DayTime{Time: 0},
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md
index 96bebdf5aebc9..626523e1bae4d 100644
--- a/production/helm/loki/CHANGELOG.md
+++ b/production/helm/loki/CHANGELOG.md
@@ -13,6 +13,14 @@ Entries should include a reference to the pull request that introduced the chang
[//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.)
+## 5.41.4
+
+- [CHANGE] Use `/ingester/shutdown?terminate=false` for write `preStop` hook
+
+## 5.41.3
+
+- [FEATURE] Add support for defining an s3 backoff config.
+
## 5.41.2
- [FEATURE] Add ciliumnetworkpolicies.
diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml
index fc7e0fbacbc6e..095e2745a364a 100644
--- a/production/helm/loki/Chart.yaml
+++ b/production/helm/loki/Chart.yaml
@@ -3,7 +3,7 @@ name: loki
description: Helm chart for Grafana Loki in simple, scalable mode
type: application
appVersion: 2.9.3
-version: 5.41.2
+version: 5.41.4
home: https://grafana.github.io/helm-charts
sources:
- https://github.com/grafana/loki
diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md
index e1da365b5bf92..2857f553e13f7 100644
--- a/production/helm/loki/README.md
+++ b/production/helm/loki/README.md
@@ -1,6 +1,6 @@
# loki
-![Version: 5.41.2](https://img.shields.io/badge/Version-5.41.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square)
+![Version: 5.41.4](https://img.shields.io/badge/Version-5.41.4-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square)
Helm chart for Grafana Loki in simple, scalable mode
diff --git a/production/helm/loki/templates/_helpers.tpl b/production/helm/loki/templates/_helpers.tpl
index eb3bf470a6313..08e4dd063babe 100644
--- a/production/helm/loki/templates/_helpers.tpl
+++ b/production/helm/loki/templates/_helpers.tpl
@@ -245,7 +245,20 @@ s3:
ca_file: {{ . }}
{{- end}}
{{- end }}
+ {{- with .backoff_config}}
+ backoff_config:
+ {{- with .min_period }}
+ min_period: {{ . }}
+ {{- end}}
+ {{- with .max_period }}
+ max_period: {{ . }}
+ {{- end}}
+ {{- with .max_retries }}
+ max_retries: {{ . }}
+ {{- end}}
+ {{- end }}
{{- end -}}
+
{{- else if eq .Values.loki.storage.type "gcs" -}}
{{- with .Values.loki.storage.gcs }}
gcs:
diff --git a/production/helm/loki/templates/write/statefulset-write.yaml b/production/helm/loki/templates/write/statefulset-write.yaml
index 8c5e426d3ffd6..ca67038a16192 100644
--- a/production/helm/loki/templates/write/statefulset-write.yaml
+++ b/production/helm/loki/templates/write/statefulset-write.yaml
@@ -119,7 +119,7 @@ spec:
lifecycle:
preStop:
httpGet:
- path: "/ingester/flush_shutdown"
+ path: "/ingester/shutdown?terminate=false"
port: http-metrics
{{- end }}
volumeMounts:
diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml
index e82967a4efb3b..b8c09ee76465b 100644
--- a/production/helm/loki/values.yaml
+++ b/production/helm/loki/values.yaml
@@ -285,6 +285,8 @@ loki:
s3ForcePathStyle: false
insecure: false
http_config: {}
+ # -- Check https://grafana.com/docs/loki/latest/configure/#s3_storage_config for more info on how to provide a backoff_config
+ backoff_config: {}
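+ # For example (illustrative values only):
+ # backoff_config:
+ #   min_period: 100ms
+ #   max_period: 3s
+ #   max_retries: 5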
gcs:
chunkBufferSize: 0
requestTimeout: "0s"
diff --git a/production/ksonnet/loki/bloom-compactor.libsonnet b/production/ksonnet/loki/bloom-compactor.libsonnet
new file mode 100644
index 0000000000000..d8c5e862fa106
--- /dev/null
+++ b/production/ksonnet/loki/bloom-compactor.libsonnet
@@ -0,0 +1,125 @@
+{
+ local k = import 'ksonnet-util/kausal.libsonnet',
+ local container = k.core.v1.container,
+ local containerPort = k.core.v1.containerPort,
+ local pvc = k.core.v1.persistentVolumeClaim,
+ local service = k.core.v1.service,
+ local statefulSet = k.apps.v1.statefulSet,
+ local volume = k.core.v1.volume,
+ local volumeMount = k.core.v1.volumeMount,
+
+ local name = 'bloom-compactor',
+
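+ // Example (illustrative) environment override enabling these manifests; it
+ // assumes use_bloom_filters is defined in the shared _config and that the PVC
+ // values are adjusted for the environment:
+ //
+ //   _config+:: {
+ //     use_bloom_filters: true,
+ //     bloom_compactor+: { pvc_size: '100Gi', pvc_class: 'fast' },
+ //   },
+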
+ _config+:: {
+ bloom_compactor+: {
+ // number of replicas
+ replicas: if $._config.use_bloom_filters then 3 else 0,
+ // PVC config
+ pvc_size: if $._config.use_bloom_filters then error 'bloom_compactor.pvc_size needs to be defined' else '',
+ pvc_class: if $._config.use_bloom_filters then error 'bloom_compactor.pvc_class needs to be defined' else '',
+ },
+ loki+:
+ if $._config.use_bloom_filters
+ then
+ {
+ bloom_compactor: {
+ enabled: true,
+ working_directory: '/data/blooms',
+ compaction_interval: '15m',
+ max_compaction_parallelism: 1,
+ },
+ }
+ else {},
+ },
+
+ local cfg = self._config.bloom_compactor,
+
+ local volumeName = name + '-data',
+ local volumeMounts = [volumeMount.new(volumeName, '/data')],
+
+ bloom_compactor_args::
+ if $._config.use_bloom_filters
+ then
+ $._config.commonArgs {
+ target: 'bloom-compactor',
+ }
+ else {},
+
+ bloom_compactor_ports:: [
+ containerPort.new(name='http-metrics', port=$._config.http_listen_port),
+ containerPort.new(name='grpc', port=9095),
+ ],
+
+ bloom_compactor_data_pvc::
+ if $._config.use_bloom_filters
+ then
+ pvc.new(volumeName)
+ // set disk size
+ + pvc.mixin.spec.resources.withRequests({ storage: $._config.bloom_compactor.pvc_size })
+ // mount the volume as read-write by a single node
+ + pvc.mixin.spec.withAccessModes(['ReadWriteOnce'])
+ // set persistent volume storage class
+ + pvc.mixin.spec.withStorageClassName($._config.bloom_compactor.pvc_class)
+ else {},
+
+
+ bloom_compactor_container::
+ if $._config.use_bloom_filters
+ then
+ container.new(name, $._images.bloom_compactor)
+ // add default ports
+ + container.withPorts($.bloom_compactor_ports)
+ // add target specific CLI arguments
+ + container.withArgsMixin(k.util.mapToFlags($.bloom_compactor_args))
+ // mount the data pvc at given mountpoint
+ + container.withVolumeMountsMixin(volumeMounts)
+ // add global environment variables
+ + container.withEnvMixin($._config.commonEnvs)
+ // add HTTP readiness probe
+ + container.mixin.readinessProbe.httpGet.withPath('/ready')
+ + container.mixin.readinessProbe.httpGet.withPort($._config.http_listen_port)
+ + container.mixin.readinessProbe.withTimeoutSeconds(1)
+ // define container resource requests
+ + k.util.resourcesRequests('2', '4Gi')
+ // define container resource limits
+ + k.util.resourcesLimits(null, '8Gi')
+ else {},
+
+ bloom_compactor_statefulset:
+ if $._config.use_bloom_filters
+ then
+ statefulSet.new(name, cfg.replicas, [$.bloom_compactor_container], $.bloom_compactor_data_pvc)
+ // add clusterIP service
+ + statefulSet.mixin.spec.withServiceName(name)
+ // perform rolling update when statefulset configuration changes
+ + statefulSet.mixin.spec.updateStrategy.withType('RollingUpdate')
+ // launch or terminate pods in parallel, *does not* affect upgrades
+ + statefulSet.mixin.spec.withPodManagementPolicy('Parallel')
+ // 10001 is the user/group ID assigned to Loki in the Dockerfile
+ + statefulSet.mixin.spec.template.spec.securityContext.withRunAsUser(10001)
+ + statefulSet.mixin.spec.template.spec.securityContext.withRunAsGroup(10001)
+ + statefulSet.mixin.spec.template.spec.securityContext.withFsGroup(10001)
+ // ensure statefulset is updated when loki config changes
+ + $.config_hash_mixin
+ // ensure no other workloads are scheduled
+ + k.util.antiAffinity
+ // mount the loki config.yaml
+ + k.util.configVolumeMount('loki', '/etc/loki/config')
+ // mount the runtime overrides.yaml
+ + k.util.configVolumeMount('overrides', '/etc/loki/overrides')
+ else {},
+
+ bloom_compactor_service:
+ if $._config.use_bloom_filters
+ then
+ k.util.serviceFor($.bloom_compactor_statefulset, $._config.service_ignored_labels)
+ else {},
+
+ bloom_compactor_headless_service:
+ if $._config.use_bloom_filters
+ then
+ k.util.serviceFor($.bloom_compactor_statefulset, $._config.service_ignored_labels)
+ + service.mixin.metadata.withName(name + '-headless')
+ + service.mixin.spec.withClusterIp('None')
+ else {},
+}
diff --git a/production/ksonnet/loki/bloom-gateway.libsonnet b/production/ksonnet/loki/bloom-gateway.libsonnet
new file mode 100644
index 0000000000000..925b4bb3217fd
--- /dev/null
+++ b/production/ksonnet/loki/bloom-gateway.libsonnet
@@ -0,0 +1,168 @@
+{
+ local k = import 'ksonnet-util/kausal.libsonnet',
+ local container = k.core.v1.container,
+ local containerPort = k.core.v1.containerPort,
+ local pvc = k.core.v1.persistentVolumeClaim,
+ local service = k.core.v1.service,
+ local statefulSet = k.apps.v1.statefulSet,
+ local volume = k.core.v1.volume,
+ local volumeMount = k.core.v1.volumeMount,
+
+ local name = 'bloom-gateway',
+
+ _config+:: {
+ bloom_gateway+: {
+ // number of replicas
+ replicas: if $._config.use_bloom_filters then 3 else 0,
+ // if true, the host needs to have local SSD disks mounted, otherwise PVCs are used
+ use_local_ssd: false,
+ // PVC config
+ pvc_size: if !self.use_local_ssd then error 'bloom_gateway.pvc_size needs to be defined when using PVC' else '',
+ pvc_class: if !self.use_local_ssd then error 'bloom_gateway.pvc_class needs to be defined when using PVC' else '',
+ // local SSD config
+ hostpath: if self.use_local_ssd then error 'bloom_gateway.hostpath needs to be defined when using local SSDs' else '',
+ node_selector: if self.use_local_ssd then error 'bloom_gateway.node_selector needs to be defined when using local SSDs' else {},
+ tolerations: if self.use_local_ssd then error 'bloom_gateway.tolerations needs to be defined when using local SSDs' else [],
+ },
+ loki+:
+ if $._config.use_bloom_filters
+ then
+ {
+ bloom_gateway+: {
+ enabled: true,
+ worker_concurrency: 8,
+ replication_factor: 3,
+ client: {
+ cache_results: false,
+ },
+ },
+ storage+: {
+ bloom_shipper+: {
+ working_directory: '/data/blooms',
+ blocks_downloading_queue: {
+ workers_count: 10,
+ },
+ blocks_cache: {
+ enabled: true,
+ max_size_mb: error 'set bloom_shipper.blocks_cache.max_size_mb to ~80% of available disk size',
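+                // e.g. assuming a hypothetical 100Gi data volume, ~80% would be max_size_mb: 81920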
+ ttl: 3600 * 24, // 24h
+ },
+ },
+ },
+ }
+ else {},
+ },
+
+ local cfg = self._config.bloom_gateway,
+
+ local volumeName = name + '-data',
+
+ local volumes =
+ if cfg.use_local_ssd
+ then [volume.fromHostPath(volumeName, cfg.hostpath)]
+ else [],
+
+ local volumeMounts = [
+ volumeMount.new(volumeName, '/data'),
+ ],
+
+ bloom_gateway_args::
+ if $._config.use_bloom_filters
+ then
+ $._config.commonArgs {
+ target: 'bloom-gateway',
+ }
+ else {},
+
+ bloom_gateway_ports:: [
+ containerPort.new(name='http-metrics', port=$._config.http_listen_port),
+ containerPort.new(name='grpc', port=9095),
+ ],
+
+ bloom_gateway_data_pvc::
+ if $._config.use_bloom_filters && !cfg.use_local_ssd
+ then
+ pvc.new(volumeName)
+ // set disk size
+ + pvc.mixin.spec.resources.withRequests({ storage: $._config.bloom_gateway.pvc_size })
+ // mount the volume as read-write by a single node
+ + pvc.mixin.spec.withAccessModes(['ReadWriteOnce'])
+ // set persistent volume storage class
+      + pvc.mixin.spec.withStorageClassName($._config.bloom_gateway.pvc_class)
+ else
+ null,
+
+ bloom_gateway_container::
+ if $._config.use_bloom_filters
+ then
+ container.new(name, $._images.bloom_gateway)
+ // add default ports
+ + container.withPorts($.bloom_gateway_ports)
+ // add target specific CLI arguments
+ + container.withArgsMixin(k.util.mapToFlags($.bloom_gateway_args))
+ // mount local SSD or PVC
+ + container.withVolumeMountsMixin(volumeMounts)
+      // add global environment variables
+ + container.withEnvMixin($._config.commonEnvs)
+ // add HTTP readiness probe
+ + container.mixin.readinessProbe.httpGet.withPath('/ready')
+ + container.mixin.readinessProbe.httpGet.withPort($._config.http_listen_port)
+ + container.mixin.readinessProbe.withTimeoutSeconds(1)
+ // define container resource requests
+ + k.util.resourcesRequests('2', '4Gi')
+ // define container resource limits
+ + k.util.resourcesLimits(null, '8Gi')
+ else {},
+
+ bloom_gateway_statefulset:
+ if $._config.use_bloom_filters
+ then
+ statefulSet.new(name, cfg.replicas, [$.bloom_gateway_container])
+ // add clusterIP service
+ + statefulSet.mixin.spec.withServiceName(name)
+ // perform rolling update when statefulset configuration changes
+ + statefulSet.mixin.spec.updateStrategy.withType('RollingUpdate')
+ // launch or terminate pods in parallel, *does not* affect upgrades
+ + statefulSet.mixin.spec.withPodManagementPolicy('Parallel')
+ // 10001 is the user/group ID assigned to Loki in the Dockerfile
+ + statefulSet.mixin.spec.template.spec.securityContext.withRunAsUser(10001)
+ + statefulSet.mixin.spec.template.spec.securityContext.withRunAsGroup(10001)
+ + statefulSet.mixin.spec.template.spec.securityContext.withFsGroup(10001)
+ // ensure statefulset is updated when loki config changes
+ + $.config_hash_mixin
+      // ensure no two replicas of this workload are scheduled on the same node
+ + k.util.antiAffinity
+ // mount the loki config.yaml
+ + k.util.configVolumeMount('loki', '/etc/loki/config')
+ // mount the runtime overrides.yaml
+ + k.util.configVolumeMount('overrides', '/etc/loki/overrides')
+ // configuration specific to SSD/PVC usage
+ + (
+ if cfg.use_local_ssd
+ then
+ // ensure the pod is scheduled on a node with local SSDs if needed
+ statefulSet.mixin.spec.template.spec.withNodeSelector(cfg.node_selector)
+ // tolerate the local-ssd taint
+ + statefulSet.mixin.spec.template.spec.withTolerationsMixin(cfg.tolerations)
+ // mount the local SSDs
+ + statefulSet.mixin.spec.template.spec.withVolumesMixin(volumes)
+ else
+ // create persistent volume claim
+ statefulSet.mixin.spec.withVolumeClaimTemplates([$.bloom_gateway_data_pvc])
+ )
+ else {},
+
+ bloom_gateway_service:
+ if $._config.use_bloom_filters
+ then
+ k.util.serviceFor($.bloom_gateway_statefulset, $._config.service_ignored_labels)
+ else {},
+
+ bloom_gateway_headless_service:
+ if $._config.use_bloom_filters
+ then
+ k.util.serviceFor($.bloom_gateway_statefulset, $._config.service_ignored_labels)
+ + service.mixin.metadata.withName(name + '-headless')
+ + service.mixin.spec.withClusterIp('None')
+ else {},
+}
diff --git a/production/ksonnet/loki/bloomfilters.libsonnet b/production/ksonnet/loki/bloomfilters.libsonnet
new file mode 100644
index 0000000000000..78231a808e1a0
--- /dev/null
+++ b/production/ksonnet/loki/bloomfilters.libsonnet
@@ -0,0 +1,8 @@
+{
+ _config+:: {
+ // globally enable/disable bloom gateway and bloom compactor
+ use_bloom_filters: false,
+ },
+}
++ (import 'bloom-compactor.libsonnet')
++ (import 'bloom-gateway.libsonnet')
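+
+// A minimal, hypothetical environment override to turn the feature on.
+// The per-component PVC or local-SSD settings required by
+// bloom-gateway.libsonnet and bloom-compactor.libsonnet have to be
+// provided as well; the values below are placeholders only:
+//
+//   {
+//     _config+:: {
+//       use_bloom_filters: true,
+//       bloom_gateway+: { pvc_size: '100Gi', pvc_class: 'fast-ssd' },
+//     },
+//   }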
diff --git a/production/ksonnet/loki/images.libsonnet b/production/ksonnet/loki/images.libsonnet
index 5cb79554ac1dc..0dc2bbe105ce9 100644
--- a/production/ksonnet/loki/images.libsonnet
+++ b/production/ksonnet/loki/images.libsonnet
@@ -16,5 +16,7 @@
compactor:: self.loki,
index_gateway:: self.loki,
overrides_exporter:: self.loki,
+ bloom_gateway:: self.loki,
+ bloom_compactor:: self.loki,
},
}
diff --git a/production/ksonnet/loki/loki.libsonnet b/production/ksonnet/loki/loki.libsonnet
index ad0489a69cd3f..871a68025e990 100644
--- a/production/ksonnet/loki/loki.libsonnet
+++ b/production/ksonnet/loki/loki.libsonnet
@@ -26,6 +26,9 @@
// BoltDB and TSDB Shipper support. Anything that modifies the compactor must be imported after this.
(import 'shipper.libsonnet') +
+// Accelerated search using bloom filters
+(import 'bloomfilters.libsonnet') +
+
(import 'table-manager.libsonnet') +
// Multi-zone ingester related config