From 25785e03d5f529443ecef331e176999308cc65b0 Mon Sep 17 00:00:00 2001 From: Derek Cadzow Date: Tue, 13 Feb 2024 15:20:48 -0500 Subject: [PATCH 1/6] Deleting old and irrelevant information (#11929) --- docs/variables.mk | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/variables.mk b/docs/variables.mk index afa0a9e867366..1ec7dbab57677 100644 --- a/docs/variables.mk +++ b/docs/variables.mk @@ -1,8 +1,5 @@ # List of projects to provide to the make-docs script. PROJECTS := loki -# Use alternative image until make-docs 3.0.0 is rolled out. -export DOCS_IMAGE := grafana/docs-base:dbd975af06 - # Set the DOC_VALIDATOR_IMAGE to match the one defined in CI. export DOC_VALIDATOR_IMAGE := $(shell sed -En 's, *image: "(grafana/doc-validator.*)",\1,p' "$(shell git rev-parse --show-toplevel)/.github/workflows/doc-validator.yml") From 1c43991ddcbb801a6a6d7a535062c7c615b0423a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 14 Feb 2024 09:49:11 +0100 Subject: [PATCH 2/6] Remove `bloomcompactor.DayTable` in favour of `config.DayTime` (#11917) Both structs shared the same semantics. This PR moves additional functionality from the `DayTable` to the `DayTime` struct. To get the table name of a day (ordinal number of day since unix epoch), call `DayTime.Table()`. Signed-off-by: Christian Haudum --- pkg/bloomcompactor/bloomcompactor.go | 15 +++---- pkg/bloomcompactor/config.go | 38 ---------------- pkg/bloomcompactor/controller.go | 13 +++--- pkg/bloomcompactor/tsdb.go | 34 +++++++------- pkg/bloomgateway/bloomgateway.go | 2 +- pkg/bloomgateway/multiplexing.go | 7 +-- pkg/bloomgateway/processor.go | 10 ++--- pkg/bloomgateway/processor_test.go | 5 ++- pkg/bloomgateway/util.go | 5 ++- pkg/bloomgateway/util_test.go | 12 ++--- pkg/querier/queryrange/limits.go | 4 +- pkg/storage/config/schema_config.go | 45 ++++++++++++++++++- .../stores/shipper/bloomshipper/client.go | 2 +- 13 files changed, 98 insertions(+), 94 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index ed1f50ae72582..566b836609d10 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -169,11 +169,11 @@ func runWithRetries( type tenantTable struct { tenant string - table DayTable + table config.DayTime ownershipRange v1.FingerprintBounds } -func (c *Compactor) tenants(ctx context.Context, table DayTable) (v1.Iterator[string], error) { +func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) { tenants, err := c.tsdbStore.UsersForPeriod(ctx, table) if err != nil { return nil, errors.Wrap(err, "getting tenants") @@ -214,10 +214,9 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator { from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod) through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod) - fromDay := DayTable(model.TimeFromUnixNano(from)) - throughDay := DayTable(model.TimeFromUnixNano(through)) + fromDay := config.NewDayTime(model.TimeFromUnixNano(from)) + throughDay := config.NewDayTime(model.TimeFromUnixNano(through)) return newDayRangeIterator(fromDay, throughDay) - } func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { @@ -295,10 +294,10 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro } type dayRangeIterator struct { - min, max, cur DayTable + min, max, cur config.DayTime } -func newDayRangeIterator(min, max DayTable) *dayRangeIterator { +func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator { return &dayRangeIterator{min: min, max: max, cur: min.Dec()} } @@ -307,7 +306,7 @@ func (r *dayRangeIterator) Next() bool { return r.cur.Before(r.max) } -func (r *dayRangeIterator) At() DayTable { +func (r *dayRangeIterator) At() config.DayTime { return r.cur } diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index dd821d81c906b..15f9aa86c040f 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -5,10 +5,6 @@ import ( "fmt" "time" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" "github.com/grafana/loki/pkg/util/ring" ) @@ -70,37 +66,3 @@ type Limits interface { BloomFalsePositiveRate(tenantID string) float64 BloomCompactorMaxBlockSize(tenantID string) int } - -// TODO(owen-d): Remove this type in favor of config.DayTime -type DayTable model.Time - -func (d DayTable) String() string { - return fmt.Sprintf("%d", d.ModelTime().Time().UnixNano()/int64(config.ObjectStorageIndexRequiredPeriod)) -} - -func (d DayTable) Inc() DayTable { - return DayTable(d.ModelTime().Add(config.ObjectStorageIndexRequiredPeriod)) -} - -func (d DayTable) Dec() DayTable { - return DayTable(d.ModelTime().Add(-config.ObjectStorageIndexRequiredPeriod)) -} - -func (d DayTable) Before(other DayTable) bool { - return d.ModelTime().Before(model.Time(other)) -} - -func (d DayTable) After(other DayTable) bool { - return d.ModelTime().After(model.Time(other)) -} - -func (d DayTable) ModelTime() model.Time { - return model.Time(d) -} - -func (d DayTable) Bounds() bloomshipper.Interval { - return bloomshipper.Interval{ - Start: model.Time(d), - End: model.Time(d.Inc()), - } -} diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 47d9627d92e1a..8470fd9ad7082 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" ) @@ -55,7 +56,7 @@ func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) { func (s *SimpleBloomController) buildBlocks( ctx context.Context, - table DayTable, + table config.DayTime, tenant string, ownershipRange v1.FingerprintBounds, ) error { @@ -78,15 +79,11 @@ func (s *SimpleBloomController) buildBlocks( } // 2. Fetch metas - bounds := table.Bounds() metas, err := s.bloomStore.FetchMetas( ctx, bloomshipper.MetaSearchParams{ TenantID: tenant, - Interval: bloomshipper.Interval{ - Start: bounds.Start, - End: bounds.End, - }, + Interval: bloomshipper.NewInterval(table.Bounds()), Keyspace: ownershipRange, }, ) @@ -176,7 +173,7 @@ func (s *SimpleBloomController) buildBlocks( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(tenant, table.String(), blk) + built, err := bloomshipper.BlockFrom(tenant, table.Table(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) return errors.Wrap(err, "failed to build block") @@ -214,7 +211,7 @@ func (s *SimpleBloomController) buildBlocks( func (s *SimpleBloomController) loadWorkForGap( ctx context.Context, - table DayTable, + table config.DayTime, tenant string, id tsdb.Identifier, gap gapWithBlocks, diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index e6fd92961c46c..ad7b2eafac4cd 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -26,11 +26,11 @@ const ( ) type TSDBStore interface { - UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) - ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) + UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) + ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) LoadTSDB( ctx context.Context, - table DayTable, + table config.DayTime, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -49,13 +49,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore { } } -func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) { - _, users, err := b.storage.ListFiles(ctx, table.String(), true) // bypass cache for ease of testing +func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { + _, users, err := b.storage.ListFiles(ctx, table.Table(), true) // bypass cache for ease of testing return users, err } -func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { - indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, true) // bypass cache for ease of testing +func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + indices, err := b.storage.ListUserFiles(ctx, table.Table(), tenant, true) // bypass cache for ease of testing if err != nil { return nil, errors.Wrap(err, "failed to list user files") } @@ -80,14 +80,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenan func (b *BloomTSDBStore) LoadTSDB( ctx context.Context, - table DayTable, + table config.DayTime, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, ) (v1.CloseableIterator[*v1.Series], error) { withCompression := id.Name() + gzipExtension - data, err := b.storage.GetUserFile(ctx, table.String(), tenant, withCompression) + data, err := b.storage.GetUserFile(ctx, table.Table(), tenant, withCompression) if err != nil { return nil, errors.Wrap(err, "failed to get file") } @@ -244,11 +244,11 @@ func NewTSDBStores( return res, nil } -func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) { +func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- { period := s.schemaCfg.Configs[i] - if !table.Before(DayTable(period.From.Time)) { + if !table.Before(period.From) { // we have the desired period config if s.stores[i] != nil { @@ -260,19 +260,19 @@ func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) { return nil, errors.Errorf( "store for period is not of TSDB type (%s) while looking up store for (%v)", period.IndexType, - table.ModelTime().Time(), + table, ) } } return nil, fmt.Errorf( - "There is no store matching no matching period found for table (%v) -- too early", - table.ModelTime().Time(), + "there is no store matching no matching period found for table (%v) -- too early", + table, ) } -func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) { +func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { store, err := s.storeForPeriod(table) if err != nil { return nil, err @@ -281,7 +281,7 @@ func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]stri return store.UsersForPeriod(ctx, table) } -func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { +func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { store, err := s.storeForPeriod(table) if err != nil { return nil, err @@ -292,7 +292,7 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant st func (s *TSDBStores) LoadTSDB( ctx context.Context, - table DayTable, + table config.DayTime, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index abecbf6773fd3..58f709f0be2f8 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -369,7 +369,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk tasksCh := make(chan Task, len(tasks)) for _, task := range tasks { task := task - level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series)) + level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series)) g.queue.Enqueue(tenantID, []string{}, task, func() { // When enqueuing, we also add the task to the pending tasks g.pendingTasks.Add(task.ID, task) diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index d2722ad8f1496..c952c9f6b87fd 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/config" ) const ( @@ -69,7 +70,7 @@ type Task struct { ctx context.Context // TODO(chaudum): Investigate how to remove that. - day model.Time + table config.DayTime } // NewTask returns a new Task that can be enqueued to the task queue. @@ -89,7 +90,7 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithBounds, filter filters: filters, series: refs.series, bounds: refs.bounds, - day: refs.day, + table: refs.table, ctx: ctx, done: make(chan struct{}), responses: make([]v1.Output, 0, len(refs.series)), @@ -129,7 +130,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task { filters: t.filters, series: series, bounds: t.bounds, - day: t.day, + table: t.table, ctx: t.ctx, done: make(chan struct{}), responses: make([]v1.Output, 0, len(series)), diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 4fe9c38483cbe..5eab7a858c74b 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -7,9 +7,9 @@ import ( "time" "github.com/go-kit/log" - "github.com/prometheus/common/model" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) @@ -35,10 +35,9 @@ type processor struct { } func (p *processor) run(ctx context.Context, tasks []Task) error { - for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) { - interval := bloomshipper.NewInterval(ts, ts.Add(Day)) + for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) { tenant := tasks[0].Tenant - err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks) + err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks) if err != nil { for _, task := range tasks { task.CloseWithError(err) @@ -52,8 +51,9 @@ func (p *processor) run(ctx context.Context, tasks []Task) error { return nil } -func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error { +func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error { minFpRange, maxFpRange := getFirstLast(keyspaces) + interval := bloomshipper.NewInterval(day.Bounds()) metaSearch := bloomshipper.MetaSearchParams{ TenantID: tenant, Interval: interval, diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index c4c8f8457b3a1..27d0068753d5b 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -15,6 +15,7 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/util/constants" ) @@ -109,7 +110,7 @@ func TestProcessor(t *testing.T) { Start: now.Add(-1 * time.Hour), End: now, }, - day: truncateDay(now), + table: config.NewDayTime(truncateDay(now)), } filters := []syntax.LineFilter{ {Ty: 0, Match: "no match"}, @@ -153,7 +154,7 @@ func TestProcessor(t *testing.T) { Start: now.Add(-1 * time.Hour), End: now, }, - day: truncateDay(now), + table: config.NewDayTime(truncateDay(now)), } filters := []syntax.LineFilter{ {Ty: 0, Match: "no match"}, diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index cf72aec3b5b4b..3793076f7c385 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) @@ -121,7 +122,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re type seriesWithBounds struct { bounds model.Interval - day model.Time + table config.DayTime series []*logproto.GroupedChunkRefs } @@ -173,7 +174,7 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds { Start: minTs, End: maxTs, }, - day: day, + table: config.NewDayTime(day), series: res, }) } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 8fc37f20bac8e..5f4d254e8f045 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -176,7 +176,7 @@ func TestPartitionRequest(t *testing.T) { exp: []seriesWithBounds{ { bounds: model.Interval{Start: ts.Add(-60 * time.Minute), End: ts.Add(-45 * time.Minute)}, - day: mktime("2024-01-24 00:00"), + table: config.NewDayTime(mktime("2024-01-24 00:00")), series: []*logproto.GroupedChunkRefs{ { Fingerprint: 0x00, @@ -217,7 +217,7 @@ func TestPartitionRequest(t *testing.T) { exp: []seriesWithBounds{ { bounds: model.Interval{Start: ts.Add(-23 * time.Hour), End: ts.Add(-22 * time.Hour)}, - day: mktime("2024-01-23 00:00"), + table: config.NewDayTime(mktime("2024-01-23 00:00")), series: []*logproto.GroupedChunkRefs{ { Fingerprint: 0x00, @@ -229,7 +229,7 @@ func TestPartitionRequest(t *testing.T) { }, { bounds: model.Interval{Start: ts.Add(-2 * time.Hour), End: ts.Add(-1 * time.Hour)}, - day: mktime("2024-01-24 00:00"), + table: config.NewDayTime(mktime("2024-01-24 00:00")), series: []*logproto.GroupedChunkRefs{ { Fingerprint: 0x01, @@ -258,7 +258,7 @@ func TestPartitionRequest(t *testing.T) { exp: []seriesWithBounds{ { bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)}, - day: mktime("2024-01-23 00:00"), + table: config.NewDayTime(mktime("2024-01-23 00:00")), series: []*logproto.GroupedChunkRefs{ { Fingerprint: 0x00, @@ -270,7 +270,7 @@ func TestPartitionRequest(t *testing.T) { }, { bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)}, - day: mktime("2024-01-24 00:00"), + table: config.NewDayTime(mktime("2024-01-24 00:00")), series: []*logproto.GroupedChunkRefs{ { Fingerprint: 0x00, @@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, } ref := bloomshipper.Ref{ TenantID: tenant, - TableName: "table_0", + TableName: config.NewDayTime(truncateDay(from)).Table(), Bounds: v1.NewBounds(fromFp, throughFp), StartTimestamp: from, EndTimestamp: through, diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 79cc9ad16a36d..2d14531909695 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -575,7 +575,7 @@ func WeightedParallelism( // config because query is in future // or // there is overlap with current config - finalOrFuture := i == len(configs)-1 || configs[i].From.After(end) + finalOrFuture := i == len(configs)-1 || configs[i].From.Time.After(end) if finalOrFuture { return true } @@ -605,7 +605,7 @@ func WeightedParallelism( var tsdbDur, otherDur time.Duration - for ; i < len(configs) && configs[i].From.Before(end); i++ { + for ; i < len(configs) && configs[i].From.Time.Before(end); i++ { _, from := minMaxModelTime(start, configs[i].From.Time) through := end if i+1 < len(configs) { diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index 9cdda249ea520..30b9de98b14ba 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -200,6 +200,14 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa } } +func (cfg *PeriodConfig) GetFullTableName(t model.Time) string { + return NewDayTime(t).TableWithPrefix(cfg) +} + +func NewDayTime(d model.Time) DayTime { + return DayTime{d} +} + // DayTime is a model.Time what holds day-aligned values, and marshals to/from // YAML in YYYY-MM-DD format. type DayTime struct { @@ -225,10 +233,45 @@ func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -func (d *DayTime) String() string { +func (d DayTime) String() string { return d.Time.Time().UTC().Format("2006-01-02") } +func (d DayTime) Table() string { + return fmt.Sprintf("%d", + d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) +} + +func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string { + return fmt.Sprintf("%s%d", + cfg.IndexTables.Prefix, + d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) +} + +func (d DayTime) Inc() DayTime { + return DayTime{d.Add(ObjectStorageIndexRequiredPeriod)} +} + +func (d DayTime) Dec() DayTime { + return DayTime{d.Add(-ObjectStorageIndexRequiredPeriod)} +} + +func (d DayTime) Before(other DayTime) bool { + return d.Time.Before(other.Time) +} + +func (d DayTime) After(other DayTime) bool { + return d.Time.After(other.Time) +} + +func (d DayTime) ModelTime() model.Time { + return d.Time +} + +func (d DayTime) Bounds() (model.Time, model.Time) { + return d.Time, d.Inc().Time +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 80eba70d18cdb..1dbfac579c5aa 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -345,7 +345,7 @@ func (b *BloomClient) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) { func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, error) { for i := len(configs) - 1; i >= 0; i-- { periodConfig := configs[i] - if !periodConfig.From.After(ts) { + if !periodConfig.From.Time.After(ts) { return periodConfig.From, nil } } From 1b4d23f9b754458a311ff82fd6b1d52134bcc5e7 Mon Sep 17 00:00:00 2001 From: Justin Burnham Date: Wed, 14 Feb 2024 00:59:50 -0800 Subject: [PATCH 3/6] Ruler: Disable x-scope-orgid header append in remote write (#11819) Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Co-authored-by: Danny Kopping --- CHANGELOG.md | 1 + docs/sources/configure/_index.md | 4 ++++ pkg/ruler/config.go | 2 ++ pkg/ruler/registry.go | 6 +++-- pkg/ruler/registry_test.go | 39 ++++++++++++++++++++++++++++++++ 5 files changed, 50 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60b9e3dc5e2c8..68841de451dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests. * [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation. * [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester * [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index d5dd9b43bd146..382890b5bcab7 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -1274,6 +1274,10 @@ remote_write: # CLI flag: -ruler.remote-write.config-refresh-period [config_refresh_period: | default = 10s] + # Add X-Scope-OrgID header in remote write requests. + # CLI flag: -ruler.remote-write.add-org-id-header + [add_org_id_header: | default = true] + # Configuration for rule evaluation. evaluation: # The evaluation mode for the ruler. Can be either 'local' or 'remote'. If set diff --git a/pkg/ruler/config.go b/pkg/ruler/config.go index 22a19851a4305..7d948baa0c30d 100644 --- a/pkg/ruler/config.go +++ b/pkg/ruler/config.go @@ -56,6 +56,7 @@ type RemoteWriteConfig struct { Clients map[string]config.RemoteWriteConfig `yaml:"clients,omitempty" doc:"description=Configure remote write clients. A map with remote client id as key."` Enabled bool `yaml:"enabled"` ConfigRefreshPeriod time.Duration `yaml:"config_refresh_period"` + AddOrgIDHeader bool `yaml:"add_org_id_header" doc:"description=Add X-Scope-OrgID header in remote write requests."` } func (c *RemoteWriteConfig) Validate() error { @@ -108,6 +109,7 @@ func (c *RemoteWriteConfig) Clone() (*RemoteWriteConfig, error) { // RegisterFlags adds the flags required to config this to the given FlagSet. func (c *RemoteWriteConfig) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&c.AddOrgIDHeader, "ruler.remote-write.add-org-id-header", true, "Add X-Scope-OrgID header in remote write requests.") f.BoolVar(&c.Enabled, "ruler.remote-write.enabled", false, "Enable remote-write functionality.") f.DurationVar(&c.ConfigRefreshPeriod, "ruler.remote-write.config-refresh-period", 10*time.Second, "Minimum period to wait between refreshing remote-write reconfigurations. This should be greater than or equivalent to -limits.per-user-override-period.") diff --git a/pkg/ruler/registry.go b/pkg/ruler/registry.go index adb4f7cf8667b..90a68d60c90b1 100644 --- a/pkg/ruler/registry.go +++ b/pkg/ruler/registry.go @@ -212,8 +212,10 @@ func (r *walRegistry) getTenantConfig(tenant string) (instance.Config, error) { } } - // always inject the X-Scope-OrgId header for multi-tenant metrics backends - clt.Headers[user.OrgIDHeaderName] = tenant + if rwCfg.AddOrgIDHeader { + // inject the X-Scope-OrgId header for multi-tenant metrics backends + clt.Headers[user.OrgIDHeaderName] = tenant + } rwCfg.Clients[id] = clt diff --git a/pkg/ruler/registry_test.go b/pkg/ruler/registry_test.go index 9e200e43ad3a2..46ab9a7084576 100644 --- a/pkg/ruler/registry_test.go +++ b/pkg/ruler/registry_test.go @@ -47,6 +47,7 @@ const remote2 = "remote-2" var remoteURL, _ = url.Parse("http://remote-write") var backCompatCfg = Config{ RemoteWrite: RemoteWriteConfig{ + AddOrgIDHeader: true, Client: &config.RemoteWriteConfig{ URL: &promConfig.URL{URL: remoteURL}, QueueConfig: config.QueueConfig{ @@ -105,6 +106,7 @@ var backCompatCfg = Config{ var remoteURL2, _ = url.Parse("http://remote-write2") var cfg = Config{ RemoteWrite: RemoteWriteConfig{ + AddOrgIDHeader: true, Clients: map[string]config.RemoteWriteConfig{ remote1: { URL: &promConfig.URL{URL: remoteURL}, @@ -751,6 +753,43 @@ func TestTenantRemoteWriteHeadersNoOverride(t *testing.T) { assert.ElementsMatch(t, actual, expected, "Headers do not match") } +func TestTenantRemoteWriteHeadersNoOrgIDHeader(t *testing.T) { + backCompatCfg.RemoteWrite.AddOrgIDHeader = false + reg := setupRegistry(t, backCompatCfg, newFakeLimitsBackwardCompat()) + + tenantCfg, err := reg.getTenantConfig(enabledRWTenant) + require.NoError(t, err) + + assert.Len(t, tenantCfg.RemoteWrite[0].Headers, 1) + // ensure that X-Scope-OrgId header is missing + assert.Equal(t, tenantCfg.RemoteWrite[0].Headers[user.OrgIDHeaderName], "") + // the original header must be present + assert.Equal(t, tenantCfg.RemoteWrite[0].Headers["Base"], "value") + + cfg.RemoteWrite.AddOrgIDHeader = false + reg = setupRegistry(t, cfg, newFakeLimits()) + + tenantCfg, err = reg.getTenantConfig(enabledRWTenant) + require.NoError(t, err) + + // Ensure that overrides take plus and that X-Scope-OrgID header is still missing + expected := []map[string]string{ + { + "Base": "value", + }, + { + "Base": "value2", + }, + } + + actual := []map[string]string{} + for _, rw := range tenantCfg.RemoteWrite { + actual = append(actual, rw.Headers) + } + + assert.ElementsMatch(t, actual, expected, "Headers do not match") +} + func TestRelabelConfigOverrides(t *testing.T) { reg := setupRegistry(t, backCompatCfg, newFakeLimitsBackwardCompat()) From 9e7725b31b19792dad692afd9ad7e9804c04bfc1 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Wed, 14 Feb 2024 16:46:38 +0530 Subject: [PATCH 4/6] feat(metadata): introduce a separate split interval for recent query window (#11897) --- CHANGELOG.md | 1 + docs/sources/configure/_index.md | 24 ++ pkg/bloomgateway/cache.go | 1 + pkg/querier/queryrange/index_stats_cache.go | 1 + pkg/querier/queryrange/labels_cache.go | 33 +- pkg/querier/queryrange/labels_cache_test.go | 334 +++++++--------- pkg/querier/queryrange/limits/definitions.go | 2 + .../queryrangebase/results_cache.go | 2 + .../queryrangebase/results_cache_test.go | 3 + pkg/querier/queryrange/roundtrip.go | 3 +- pkg/querier/queryrange/roundtrip_test.go | 52 ++- pkg/querier/queryrange/series_cache.go | 9 +- pkg/querier/queryrange/series_cache_test.go | 366 +++++++---------- .../queryrange/split_by_interval_test.go | 370 +++++++++++++++++- pkg/querier/queryrange/splitters.go | 69 +++- pkg/querier/queryrange/volume_cache.go | 1 + pkg/storage/chunk/cache/resultscache/cache.go | 24 +- .../chunk/cache/resultscache/cache_test.go | 120 +++++- pkg/validation/limits.go | 37 +- 19 files changed, 955 insertions(+), 497 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68841de451dba..7f091ed06f883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ * [11143](https://github.com/grafana/loki/pull/11143) **sandeepsukhani** otel: Add support for per tenant configuration for mapping otlp data to loki format * [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs. * [11817](https://github.com/grafana/loki/pull/11817) **ashwanthgoli** Ruler: Add support for filtering results of `/prometheus/api/v1/rules` endpoint by rule_name, rule_group, file and type. +* [11897](https://github.com/grafana/loki/pull/11897) **ashwanthgoli** Metadata: Introduces a separate split interval of `split_recent_metadata_queries_by_interval` for `recent_metadata_query_window` to help with caching recent metadata query results. ##### Fixes * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var. diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 382890b5bcab7..c30f8da01fa23 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2911,6 +2911,30 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -querier.split-metadata-queries-by-interval [split_metadata_queries_by_interval: | default = 1d] +# Experimental. Split interval to use for the portion of metadata request that +# falls within `recent_metadata_query_window`. Rest of the request which is +# outside the window still uses `split_metadata_queries_by_interval`. If set to +# 0, the entire request defaults to using a split interval of +# `split_metadata_queries_by_interval.`. +# CLI flag: -experimental.querier.split-recent-metadata-queries-by-interval +[split_recent_metadata_queries_by_interval: | default = 1h] + +# Experimental. Metadata query window inside which +# `split_recent_metadata_queries_by_interval` gets applied, portion of the +# metadata request that falls in this window is split using +# `split_recent_metadata_queries_by_interval`. The value 0 disables using a +# different split interval for recent metadata queries. +# +# This is added to improve cacheability of recent metadata queries. Query split +# interval also determines the interval used in cache key. The default split +# interval of 24h is useful for caching long queries, each cache key holding 1 +# day's results. But metadata queries are often shorter than 24h, to cache them +# effectively we need a smaller split interval. `recent_metadata_query_window` +# along with `split_recent_metadata_queries_by_interval` help configure a +# shorter split interval for recent metadata queries. +# CLI flag: -experimental.querier.recent-metadata-query-window +[recent_metadata_query_window: | default = 0s] + # Interval to use for time-based splitting when a request is within the # `query_ingesters_within` window; defaults to `split-queries-by-interval` by # setting to 0. diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index fe40b87e95488..6c573cb47d6de 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -182,6 +182,7 @@ func NewBloomGatewayClientCacheMiddleware( }, cacheGen, retentionEnabled, + false, ) return &ClientCache{ diff --git a/pkg/querier/queryrange/index_stats_cache.go b/pkg/querier/queryrange/index_stats_cache.go index d52f2e22323ff..a91721bf36873 100644 --- a/pkg/querier/queryrange/index_stats_cache.go +++ b/pkg/querier/queryrange/index_stats_cache.go @@ -123,6 +123,7 @@ func NewIndexStatsCacheMiddleware( }, parallelismForReq, retentionEnabled, + false, metrics, ) } diff --git a/pkg/querier/queryrange/labels_cache.go b/pkg/querier/queryrange/labels_cache.go index 66c811490403d..3a940e34fa034 100644 --- a/pkg/querier/queryrange/labels_cache.go +++ b/pkg/querier/queryrange/labels_cache.go @@ -11,21 +11,42 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" - "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" ) type cacheKeyLabels struct { Limits transformer UserIDTransformer - iqo util.IngesterQueryOptions +} + +// metadataSplitIntervalForTimeRange returns split interval for series and label requests. +// If `recent_metadata_query_window` is configured and the query start interval is within this window, +// it returns `split_recent_metadata_queries_by_interval`. +// For other cases, the default split interval of `split_metadata_queries_by_interval` will be used. +func metadataSplitIntervalForTimeRange(limits Limits, tenantIDs []string, ref, start time.Time) time.Duration { + split := validation.MaxDurationOrZeroPerTenant(tenantIDs, limits.MetadataQuerySplitDuration) + + recentMetadataQueryWindow := validation.MaxDurationOrZeroPerTenant(tenantIDs, limits.RecentMetadataQueryWindow) + recentMetadataQuerySplitInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, limits.RecentMetadataQuerySplitDuration) + + // if either of the options are not configured, use the default metadata split interval + if recentMetadataQueryWindow == 0 || recentMetadataQuerySplitInterval == 0 { + return split + } + + // if the query start is not before window start, it would be split using recentMetadataQuerySplitInterval + if windowStart := ref.Add(-recentMetadataQueryWindow); !start.Before(windowStart) { + split = recentMetadataQuerySplitInterval + } + + return split } // GenerateCacheKey generates a cache key based on the userID, split duration and the interval of the request. // It also includes the label name and the provided query for label values request. func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { lr := r.(*LabelRequest) - - split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) + split := metadataSplitIntervalForTimeRange(i.Limits, []string{userID}, time.Now().UTC(), r.GetStart().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -80,7 +101,6 @@ func NewLabelsCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, - iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -90,7 +110,7 @@ func NewLabelsCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeyLabels{limits, transformer, iqo}, + cacheKeyLabels{limits, transformer}, limits, merger, labelsExtractor{}, @@ -100,6 +120,7 @@ func NewLabelsCacheMiddleware( }, parallelismForReq, retentionEnabled, + true, metrics, ) } diff --git a/pkg/querier/queryrange/labels_cache_test.go b/pkg/querier/queryrange/labels_cache_test.go index 4c645b8d19ce9..90b85cb1faf82 100644 --- a/pkg/querier/queryrange/labels_cache_test.go +++ b/pkg/querier/queryrange/labels_cache_test.go @@ -70,7 +70,6 @@ func TestLabelsCache(t *testing.T) { cache.NewMockCache(), nil, nil, - nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -82,173 +81,124 @@ func TestLabelsCache(t *testing.T) { return cacheMiddleware } - cacheMiddleware := setupCacheMW() - for _, values := range []bool{false, true} { - prefix := "labels" - if values { - prefix = "label values" - } - t.Run(prefix+": cache the response for the same request", func(t *testing.T) { - start := testTime.Truncate(time.Millisecond) - end := start.Add(time.Hour) - - labelsReq := LabelRequest{ - LabelRequest: logproto.LabelRequest{ - Start: &start, - End: &end, - }, - } - - if values { - labelsReq.Values = true - labelsReq.Name = "foo" - labelsReq.Query = `{cluster="eu-west1"}` - } - - labelsResp := &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, + composeLabelsResp := func(lbls []string, splits int64) *LokiLabelNamesResponse { + return &LokiLabelNamesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: lbls, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: splits, }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, labelsReq.GetStart(), r.GetStart()) - require.Equal(t, labelsReq.GetEnd(), r.GetEnd()) - - got := r.(*LabelRequest) - require.Equal(t, labelsReq.GetName(), got.GetName()) - require.Equal(t, labelsReq.GetValues(), got.GetValues()) - require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) - - return labelsResp, nil - })) + }, + } - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, &labelsReq) - require.NoError(t, err) - require.Equal(t, 1, called) // called actual handler, as not cached. - require.Equal(t, labelsResp, got) + } - // Doing same request again shouldn't change anything. - called = 0 - got, err = handler.Do(ctx, &labelsReq) - require.NoError(t, err) - require.Equal(t, 0, called) - require.Equal(t, labelsResp, got) - }) + start := testTime.Truncate(time.Millisecond) + end := start.Add(time.Hour) + labelsReq := &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, } + labelsResp := composeLabelsResp([]string{"bar", "buzz"}, 1) + + var downstreamHandlerFunc func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) + downstreamHandler := &mockDownstreamHandler{fn: func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + return downstreamHandlerFunc(ctx, req) + }} - // reset cacheMiddleware - cacheMiddleware = setupCacheMW() for _, values := range []bool{false, true} { + labelsReq := labelsReq prefix := "labels" + if values { - prefix = "label values" + prefix = "label values: " + labelsReq.Values = true + labelsReq.Name = "foo" + labelsReq.Query = `{cluster="eu-west1"}` } - t.Run(prefix+": a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - - start := testTime.Truncate(time.Millisecond) - end := start.Add(time.Hour) - labelsReq1 := LabelRequest{ - LabelRequest: logproto.LabelRequest{ - Start: &start, - End: &end, - }, - } - - if values { - labelsReq1.Values = true - labelsReq1.Name = "foo" - labelsReq1.Query = `{cluster="eu-west1"}` - } - - labelsResp1 := &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - } + for _, tc := range []struct { + name string + req queryrangebase.Request + expectedQueryStart, expectedQueryEnd time.Time + downstreamResponse *LokiLabelNamesResponse + downstreamCalls int + expectedReponse *LokiLabelNamesResponse + }{ + { + name: "return cached response for the same request", + downstreamCalls: 0, + expectedReponse: labelsResp, + req: labelsReq, + }, + { + name: "a new request with overlapping time range should reuse results of the previous request", + req: labelsReq.WithStartEnd(labelsReq.GetStart(), labelsReq.GetEnd().Add(15*time.Minute)), + expectedQueryStart: labelsReq.GetEnd(), + expectedQueryEnd: labelsReq.GetEnd().Add(15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeLabelsResp([]string{"fizz"}, 1), + expectedReponse: composeLabelsResp([]string{"bar", "buzz", "fizz"}, 2), + }, + { + // To avoid returning incorrect results, we only use extents that are entirely within the requested query range. + name: "cached response not entirely within the requested range", + req: labelsReq.WithStartEnd(labelsReq.GetStart().Add(15*time.Minute), labelsReq.GetEnd().Add(-15*time.Minute)), + expectedQueryStart: labelsReq.GetStart().Add(15 * time.Minute), + expectedQueryEnd: labelsReq.GetEnd().Add(-15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeLabelsResp([]string{"buzz", "fizz"}, 1), + expectedReponse: composeLabelsResp([]string{"buzz", "fizz"}, 1), + }, + } { + t.Run(prefix+tc.name, func(t *testing.T) { + cacheMiddleware := setupCacheMW() + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, labelsReq.GetStart(), r.GetStart()) + require.Equal(t, labelsReq.GetEnd(), r.GetEnd()) + + got := r.(*LabelRequest) + require.Equal(t, labelsReq.GetName(), got.GetName()) + require.Equal(t, labelsReq.GetValues(), got.GetValues()) + require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) + + return labelsResp, nil + } - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ + handler := cacheMiddleware.Wrap(downstreamHandler) - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, labelsReq1.GetStart(), r.GetStart()) - require.Equal(t, labelsReq1.GetEnd(), r.GetEnd()) + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, labelsReq) + require.NoError(t, err) + require.Equal(t, 1, downstreamHandler.Called()) // call downstream handler, as not cached. + require.Equal(t, labelsResp, got) - got := r.(*LabelRequest) - require.Equal(t, labelsReq1.GetName(), got.GetName()) - require.Equal(t, labelsReq1.GetValues(), got.GetValues()) - require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, tc.expectedQueryStart, r.GetStart()) + require.Equal(t, tc.expectedQueryEnd, r.GetEnd()) - return labelsResp1, nil - })) + got := r.(*LabelRequest) + require.Equal(t, labelsReq.GetName(), got.GetName()) + require.Equal(t, labelsReq.GetValues(), got.GetValues()) + require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, &labelsReq1) - require.NoError(t, err) - require.Equal(t, 1, called) - require.Equal(t, labelsResp1, got) + return tc.downstreamResponse, nil + } - labelsReq2 := labelsReq1.WithStartEnd(labelsReq1.GetStart().Add(15*time.Minute), labelsReq1.GetEnd().Add(15*time.Minute)) + got, err = handler.Do(ctx, tc.req) + require.NoError(t, err) + require.Equal(t, tc.downstreamCalls, downstreamHandler.Called()) + require.Equal(t, tc.expectedReponse, got) - called = 0 - handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // make downstream request only for the non-overlapping portion of the query. - require.Equal(t, labelsReq1.GetEnd(), r.GetStart()) - require.Equal(t, labelsReq2.GetEnd(), r.GetEnd()) - - got := r.(*LabelRequest) - require.Equal(t, labelsReq1.GetName(), got.GetName()) - require.Equal(t, labelsReq1.GetValues(), got.GetValues()) - require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) - - return &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"fizz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - }, nil - })) - - got, err = handler.Do(ctx, labelsReq2) - require.NoError(t, err) - require.Equal(t, 1, called) - // two splits as we merge the results from the extent and downstream request - labelsResp1.Statistics.Summary.Splits = 2 - require.Equal(t, &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz", "fizz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 2, - }, - }, - }, got) - }) + }) + } } } @@ -310,7 +260,6 @@ func TestLabelCache_freshness(t *testing.T) { cache.NewMockCache(), nil, nil, - nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -363,67 +312,46 @@ func TestLabelCache_freshness(t *testing.T) { func TestLabelQueryCacheKey(t *testing.T) { const ( - defaultTenant = "a" - alternateTenant = "b" - defaultSplit = time.Hour - ingesterSplit = 90 * time.Minute - ingesterQueryWindow = defaultSplit * 3 + defaultSplit = time.Hour + recentMetadataSplitDuration = 30 * time.Minute + recentMetadataQueryWindow = time.Hour ) l := fakeLimits{ - metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, - ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + metadataSplitDuration: map[string]time.Duration{tenantID: defaultSplit}, + recentMetadataSplitDuration: map[string]time.Duration{tenantID: recentMetadataSplitDuration}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: recentMetadataQueryWindow}, } cases := []struct { - name, tenantID string - start, end time.Time - expectedSplit time.Duration - iqo util.IngesterQueryOptions - values bool + name string + start, end time.Time + expectedSplit time.Duration + values bool + limits Limits }{ { - name: "outside ingester query window", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-5 * time.Hour), + name: "outside recent metadata query window", + start: time.Now().Add(-3 * time.Hour), + end: time.Now().Add(-2 * time.Hour), expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, - }, - }, - { - name: "within ingester query window", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), - expectedSplit: ingesterSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, - }, + limits: l, }, { - name: "within ingester query window, but query store only", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), - expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: true, - }, + name: "within recent metadata query window", + start: time.Now().Add(-30 * time.Minute), + end: time.Now(), + expectedSplit: recentMetadataSplitDuration, + limits: l, }, { - name: "within ingester query window, but no ingester split duration configured", - tenantID: alternateTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), + name: "within recent metadata query window, but recent split duration is not configured", + start: time.Now().Add(-30 * time.Minute), + end: time.Now(), expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, + limits: fakeLimits{ + metadataSplitDuration: map[string]time.Duration{tenantID: defaultSplit}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: recentMetadataQueryWindow}, }, }, } @@ -431,7 +359,7 @@ func TestLabelQueryCacheKey(t *testing.T) { for _, values := range []bool{true, false} { for _, tc := range cases { t.Run(fmt.Sprintf("%s (values: %v)", tc.name, values), func(t *testing.T) { - keyGen := cacheKeyLabels{l, nil, tc.iqo} + keyGen := cacheKeyLabels{tc.limits, nil} r := &LabelRequest{ LabelRequest: logproto.LabelRequest{ @@ -453,12 +381,12 @@ func TestLabelQueryCacheKey(t *testing.T) { // and therefore we can't know the current interval apriori without duplicating the logic var pattern *regexp.Regexp if values { - pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tc.tenantID, labelName, regexp.QuoteMeta(query), tc.expectedSplit)) + pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tenantID, labelName, regexp.QuoteMeta(query), tc.expectedSplit)) } else { - pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tc.tenantID, tc.expectedSplit)) + pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tenantID, tc.expectedSplit)) } - require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tenantID, r)) }) } } diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go index e12255883bf4a..3e78b34420760 100644 --- a/pkg/querier/queryrange/limits/definitions.go +++ b/pkg/querier/queryrange/limits/definitions.go @@ -15,6 +15,8 @@ type Limits interface { logql.Limits QuerySplitDuration(string) time.Duration MetadataQuerySplitDuration(string) time.Duration + RecentMetadataQuerySplitDuration(string) time.Duration + RecentMetadataQueryWindow(string) time.Duration IngesterQuerySplitDuration(string) time.Duration MaxQuerySeries(context.Context, string) int MaxEntriesLimitPerQuery(context.Context, string) int diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 097dc264d32a0..3511fe0b7dd30 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -127,6 +127,7 @@ func NewResultsCacheMiddleware( shouldCache ShouldCacheFn, parallelismForReq ParallelismForReqFn, retentionEnabled bool, + onlyUseEntireExtent bool, metrics *ResultsCacheMetrics, ) (Middleware, error) { if cacheGenNumberLoader != nil { @@ -172,6 +173,7 @@ func NewResultsCacheMiddleware( parallelismForReqWrapper, cacheGenNumberLoader, retentionEnabled, + onlyUseEntireExtent, ) return out diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index ff5e5be09a48f..6706e6a2d9fa7 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -422,6 +422,7 @@ func TestResultsCache(t *testing.T) { return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, + false, nil, ) require.NoError(t, err) @@ -468,6 +469,7 @@ func TestResultsCacheRecent(t *testing.T) { return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, + false, nil, ) require.NoError(t, err) @@ -578,6 +580,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, + false, nil, ) require.NoError(t, err) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 8223704eea021..10246f4d8277e 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -514,7 +514,6 @@ func NewSeriesTripperware( merger, c, cacheGenNumLoader, - iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -600,7 +599,6 @@ func NewLabelsTripperware( merger, c, cacheGenNumLoader, - iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -679,6 +677,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge ) }, retentionEnabled, + false, metrics.ResultsCacheMetrics, ) if err != nil { diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index c7c7cff4595a1..7d74b0dd615c8 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1237,24 +1237,26 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) { } type fakeLimits struct { - maxQueryLength time.Duration - maxQueryParallelism int - tsdbMaxQueryParallelism int - maxQueryLookback time.Duration - maxEntriesLimitPerQuery int - maxSeries int - splitDuration map[string]time.Duration - metadataSplitDuration map[string]time.Duration - ingesterSplitDuration map[string]time.Duration - minShardingLookback time.Duration - queryTimeout time.Duration - requiredLabels []string - requiredNumberLabels int - maxQueryBytesRead int - maxQuerierBytesRead int - maxStatsCacheFreshness time.Duration - maxMetadataCacheFreshness time.Duration - volumeEnabled bool + maxQueryLength time.Duration + maxQueryParallelism int + tsdbMaxQueryParallelism int + maxQueryLookback time.Duration + maxEntriesLimitPerQuery int + maxSeries int + splitDuration map[string]time.Duration + metadataSplitDuration map[string]time.Duration + recentMetadataSplitDuration map[string]time.Duration + recentMetadataQueryWindow map[string]time.Duration + ingesterSplitDuration map[string]time.Duration + minShardingLookback time.Duration + queryTimeout time.Duration + requiredLabels []string + requiredNumberLabels int + maxQueryBytesRead int + maxQuerierBytesRead int + maxStatsCacheFreshness time.Duration + maxMetadataCacheFreshness time.Duration + volumeEnabled bool } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -1271,6 +1273,20 @@ func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration { return f.metadataSplitDuration[key] } +func (f fakeLimits) RecentMetadataQuerySplitDuration(key string) time.Duration { + if f.recentMetadataSplitDuration == nil { + return 0 + } + return f.recentMetadataSplitDuration[key] +} + +func (f fakeLimits) RecentMetadataQueryWindow(key string) time.Duration { + if f.recentMetadataQueryWindow == nil { + return 0 + } + return f.recentMetadataQueryWindow[key] +} + func (f fakeLimits) IngesterQuerySplitDuration(key string) time.Duration { if f.ingesterSplitDuration == nil { return 0 diff --git a/pkg/querier/queryrange/series_cache.go b/pkg/querier/queryrange/series_cache.go index bbbf96e2dd70e..5120d61fb0b4f 100644 --- a/pkg/querier/queryrange/series_cache.go +++ b/pkg/querier/queryrange/series_cache.go @@ -17,21 +17,18 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" - "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/validation" ) type cacheKeySeries struct { Limits transformer UserIDTransformer - iqo util.IngesterQueryOptions } // GenerateCacheKey generates a cache key based on the userID, matchers, split duration and the interval of the request. func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { sr := r.(*LokiSeriesRequest) - - split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) + split := metadataSplitIntervalForTimeRange(i.Limits, []string{userID}, time.Now().UTC(), r.GetStart().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -87,7 +84,6 @@ func NewSeriesCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, - iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -97,7 +93,7 @@ func NewSeriesCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeySeries{limits, transformer, iqo}, + cacheKeySeries{limits, transformer}, limits, merger, seriesExtractor{}, @@ -107,6 +103,7 @@ func NewSeriesCacheMiddleware( }, parallelismForReq, retentionEnabled, + true, metrics, ) } diff --git a/pkg/querier/queryrange/series_cache_test.go b/pkg/querier/queryrange/series_cache_test.go index d73efa9deea87..6ba869a69411a 100644 --- a/pkg/querier/queryrange/series_cache_test.go +++ b/pkg/querier/queryrange/series_cache_test.go @@ -78,7 +78,6 @@ func TestSeriesCache(t *testing.T) { cache.NewMockCache(), nil, nil, - nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -91,195 +90,135 @@ func TestSeriesCache(t *testing.T) { return cacheMiddleware } - t.Run("caches the response for the same request", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - - seriesReq := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, + composeSeriesResp := func(series [][]logproto.SeriesIdentifier_LabelsEntry, splits int64) *LokiSeriesResponse { + var data []logproto.SeriesIdentifier + for _, v := range series { + data = append(data, logproto.SeriesIdentifier{Labels: v}) } - seriesResp := &LokiSeriesResponse{ + return &LokiSeriesResponse{ Status: "success", Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, + Data: data, Statistics: stats.Result{ Summary: stats.Summary{ - Splits: 1, + Splits: splits, }, }, } + } - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, seriesReq.GetStart(), r.GetStart()) - require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) - - return seriesResp, nil - })) - - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, seriesReq) - require.NoError(t, err) - require.Equal(t, 1, called) // called actual handler, as not cached. - require.Equal(t, seriesResp, got) - - // Doing same request again shouldn't change anything. - called = 0 - got, err = handler.Do(ctx, seriesReq) - require.NoError(t, err) - require.Equal(t, 0, called) - require.Equal(t, seriesResp, got) - }) - - t.Run("a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - req1 := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, - } - resp1 := &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, req1.GetStart(), r.GetStart()) - require.Equal(t, req1.GetEnd(), r.GetEnd()) + var downstreamHandlerFunc func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) + downstreamHandler := &mockDownstreamHandler{fn: func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + return downstreamHandlerFunc(ctx, req) + }} - return resp1, nil - })) + from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) + seriesReq := &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + } + seriesResp := composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, + {{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + }, 1) + + for _, tc := range []struct { + name string + req queryrangebase.Request + expectedQueryStart, expectedQueryEnd time.Time + downstreamResponse *LokiSeriesResponse + downstreamCalls int + expectedReponse *LokiSeriesResponse + }{ + { + name: "return cached response for the same request", + downstreamCalls: 0, + expectedReponse: seriesResp, + req: seriesReq, + }, + { + name: "a new request with overlapping time range should reuse results of the previous request", + req: seriesReq.WithStartEnd(seriesReq.GetStart(), seriesReq.GetEnd().Add(15*time.Minute)), + expectedQueryStart: seriesReq.GetEnd(), + expectedQueryEnd: seriesReq.GetEnd().Add(15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + expectedReponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, + {{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 2), + }, + { + // To avoid returning incorrect results, we only use extents that are entirely within the requested query range. + name: "cached response not entirely within the requested range", + req: seriesReq.WithStartEnd(seriesReq.GetStart().Add(15*time.Minute), seriesReq.GetEnd().Add(-15*time.Minute)), + expectedQueryStart: seriesReq.GetStart().Add(15 * time.Minute), + expectedQueryEnd: seriesReq.GetEnd().Add(-15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + expectedReponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + }, + } { + t.Run(tc.name, func(t *testing.T) { + cacheMiddleware := setupCacheMW() + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, seriesReq.GetStart(), r.GetStart()) + require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, req1) - require.NoError(t, err) - require.Equal(t, 1, called) - require.Equal(t, resp1, got) + return seriesResp, nil + } - req2 := req1.WithStartEnd(req1.GetStart().Add(15*time.Minute), req1.GetEnd().Add(15*time.Minute)) + handler := cacheMiddleware.Wrap(downstreamHandler) - called = 0 - handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, seriesReq) + require.NoError(t, err) + require.Equal(t, 1, downstreamHandler.Called()) // calls downstream handler, as not cached. + require.Equal(t, seriesResp, got) - // make downstream request only for the non-overlapping portion of the query. - require.Equal(t, req1.GetEnd(), r.GetStart()) - require.Equal(t, req1.GetEnd().Add(15*time.Minute), r.GetEnd()) + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, tc.expectedQueryStart, r.GetStart()) + require.Equal(t, tc.expectedQueryEnd, r.GetEnd()) - return &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - }, nil - })) + return tc.downstreamResponse, nil + } - got, err = handler.Do(ctx, req2) - require.NoError(t, err) - require.Equal(t, 1, called) - // two splits as we merge the results from the extent and downstream request - resp1.Statistics.Summary.Splits = 2 - require.Equal(t, &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 2, - }, - }, - }, got) - }) + got, err = handler.Do(ctx, tc.req) + require.NoError(t, err) + require.Equal(t, tc.downstreamCalls, downstreamHandler.Called()) + require.Equal(t, tc.expectedReponse, got) + }) + } t.Run("caches are only valid for the same request parameters", func(t *testing.T) { cacheMiddleware := setupCacheMW() - - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - seriesReq := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, - } - seriesResp := &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length as none of the subsequent queries hit the cache. + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { require.Equal(t, seriesReq.GetStart(), r.GetStart()) require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) + return seriesResp, nil - })) + } + + handler := cacheMiddleware.Wrap(downstreamHandler) // initial call to fill cache ctx := user.InjectOrgID(context.Background(), "fake") _, err := handler.Do(ctx, seriesReq) require.NoError(t, err) - require.Equal(t, 1, called) + require.Equal(t, 1, downstreamHandler.Called()) type testCase struct { fn func(*LokiSeriesRequest) @@ -297,7 +236,7 @@ func TestSeriesCache(t *testing.T) { } for name, tc := range testCases { - called = 0 + downstreamHandler.ResetCount() seriesReq := seriesReq if tc.fn != nil { @@ -310,7 +249,7 @@ func TestSeriesCache(t *testing.T) { _, err = handler.Do(ctx, seriesReq) require.NoError(t, err) - require.Equal(t, 1, called, name) + require.Equal(t, 1, downstreamHandler.Called(), name) } }) } @@ -371,7 +310,6 @@ func TestSeriesCache_freshness(t *testing.T) { cache.NewMockCache(), nil, nil, - nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -428,76 +366,54 @@ func TestSeriesCache_freshness(t *testing.T) { func TestSeriesQueryCacheKey(t *testing.T) { const ( - defaultTenant = "a" - alternateTenant = "b" - defaultSplit = time.Hour - ingesterSplit = 90 * time.Minute - ingesterQueryWindow = defaultSplit * 3 + defaultSplit = time.Hour + recentMetadataSplitDuration = 30 * time.Minute + recentMetadataQueryWindow = time.Hour ) l := fakeLimits{ - metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, - ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + metadataSplitDuration: map[string]time.Duration{tenantID: defaultSplit}, + recentMetadataSplitDuration: map[string]time.Duration{tenantID: recentMetadataSplitDuration}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: recentMetadataQueryWindow}, } cases := []struct { - name, tenantID string - start, end time.Time - expectedSplit time.Duration - iqo util.IngesterQueryOptions - values bool + name string + start, end time.Time + expectedSplit time.Duration + values bool + limits Limits }{ { - name: "outside ingester query window", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-5 * time.Hour), + name: "outside recent metadata query window", + start: time.Now().Add(-3 * time.Hour), + end: time.Now().Add(-2 * time.Hour), expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, - }, + limits: l, }, { - name: "within ingester query window", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), - expectedSplit: ingesterSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, - }, + name: "within recent metadata query window", + start: time.Now().Add(-30 * time.Minute), + end: time.Now(), + expectedSplit: recentMetadataSplitDuration, + limits: l, }, { - name: "within ingester query window, but query store only", - tenantID: defaultTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), + name: "within recent metadata query window, but recent split duration is not configured", + start: time.Now().Add(-30 * time.Minute), + end: time.Now(), expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: true, - }, - }, - { - name: "within ingester query window, but no ingester split duration configured", - tenantID: alternateTenant, - start: time.Now().Add(-6 * time.Hour), - end: time.Now().Add(-ingesterQueryWindow / 2), - expectedSplit: defaultSplit, - iqo: ingesterQueryOpts{ - queryIngestersWithin: ingesterQueryWindow, - queryStoreOnly: false, + limits: fakeLimits{ + metadataSplitDuration: map[string]time.Duration{tenantID: defaultSplit}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: recentMetadataQueryWindow}, }, }, } - for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { matchers := []string{`{namespace="prod"}`, `{service="foo"}`} - keyGen := cacheKeySeries{l, nil, tc.iqo} + keyGen := cacheKeySeries{tc.limits, nil} r := &LokiSeriesRequest{ StartTs: tc.start, @@ -508,9 +424,27 @@ func TestSeriesQueryCacheKey(t *testing.T) { // we use regex here because cache key always refers to the current time to get the ingester query window, // and therefore we can't know the current interval apriori without duplicating the logic - pattern := regexp.MustCompile(fmt.Sprintf(`series:%s:%s:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(keyGen.joinMatchers(matchers)), tc.expectedSplit)) + pattern := regexp.MustCompile(fmt.Sprintf(`series:%s:%s:(\d+):%d`, tenantID, regexp.QuoteMeta(keyGen.joinMatchers(matchers)), tc.expectedSplit)) - require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tenantID, r)) }) } } + +type mockDownstreamHandler struct { + called int + fn func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) +} + +func (m *mockDownstreamHandler) Called() int { + return m.called +} + +func (m *mockDownstreamHandler) ResetCount() { + m.called = 0 +} + +func (m *mockDownstreamHandler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + m.called++ + return m.fn(ctx, req) +} diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index acf8c495becce..c559bad17be1d 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -159,10 +159,12 @@ func Test_splitQuery(t *testing.T) { t.Run(requestType, func(t *testing.T) { for name, intervals := range map[string]struct { - input interval - expected []interval - splitInterval time.Duration - splitter splitter + input interval + expected []interval + expectedWithoutIngesterSplits []interval + splitInterval time.Duration + splitter splitter + recentMetadataQueryWindowEnabled bool }{ "no change": { input: interval{ @@ -255,6 +257,16 @@ func Test_splitQuery(t *testing.T) { end: refTime, }, }, + expectedWithoutIngesterSplits: []interval{ + { + start: refTime.Add(-time.Hour).Truncate(time.Second), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, splitInterval: time.Hour, splitter: newDefaultSplitter( fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}}, @@ -295,6 +307,32 @@ func Test_splitQuery(t *testing.T) { end: refTime, }, }, + expectedWithoutIngesterSplits: []interval{ + { + start: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute).Truncate(time.Second), + end: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, splitInterval: time.Hour, splitter: newDefaultSplitter( fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}}, @@ -394,11 +432,63 @@ func Test_splitQuery(t *testing.T) { ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour, queryStoreOnly: true}, ), }, + "metadata recent query window should not affect other query types": { + input: interval{ + start: refTime.Add(-4 * time.Hour).Truncate(time.Second), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-4 * time.Hour).Truncate(time.Second), + end: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 3 * time.Hour}, + }, nil, + ), + recentMetadataQueryWindowEnabled: true, + }, } { t.Run(name, func(t *testing.T) { req := tc.requestBuilderFunc(intervals.input.start, intervals.input.end) var want []queryrangebase.Request - for _, exp := range intervals.expected { + + // ingester splits do not apply for metadata queries + var expected []interval + switch req.(type) { + case *LabelRequest, *LokiSeriesRequest: + expected = intervals.expectedWithoutIngesterSplits + + if intervals.recentMetadataQueryWindowEnabled { + t.Skip("this flow is tested in Test_splitRecentMetadataQuery") + } + } + + if expected == nil { + expected = intervals.expected + } + + for _, exp := range expected { want = append(want, tc.requestBuilderFunc(exp.start, exp.end)) } @@ -412,22 +502,245 @@ func Test_splitQuery(t *testing.T) { splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval) require.NoError(t, err) - if !assert.Equal(t, want, splits) { - t.Logf("expected and actual do not match\n") - defer t.Fail() + assertSplits(t, want, splits) + }) + } + }) + } +} - if len(want) != len(splits) { - t.Logf("expected %d splits, got %d\n", len(want), len(splits)) - return - } +func Test_splitRecentMetadataQuery(t *testing.T) { + type interval struct { + start, end time.Time + } - for j := 0; j < len(want); j++ { - exp := want[j] - act := splits[j] - equal := assert.Equal(t, exp, act) - t.Logf("\t#%d [matches: %v]: expected %q/%q got %q/%q\n", j, equal, exp.GetStart(), exp.GetEnd(), act.GetStart(), act.GetEnd()) - } + expectedSplitGap := util.SplitGap + + for requestType, tc := range map[string]struct { + requestBuilderFunc func(start, end time.Time) queryrangebase.Request + }{ + "series request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &LokiSeriesRequest{ + Match: []string{"match1"}, + StartTs: start, + EndTs: end, + Path: "/series", + Shards: []string{"shard1"}, + } + }, + }, + "label names request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return NewLabelRequest(start, end, `{foo="bar"}`, "", "/labels") + }, + }, + "label values request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return NewLabelRequest(start, end, `{foo="bar"}`, "test", "/label/test/values") + }, + }, + } { + t.Run(requestType, func(t *testing.T) { + for name, intervals := range map[string]struct { + input interval + expected []interval + splitInterval time.Duration + splitter splitter + }{ + "wholly within recent metadata query window": { + input: interval{ + start: refTime.Add(-time.Hour), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-time.Hour), + end: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 2 * time.Hour}, + }, nil, + ), + }, + "start aligns with recent metadata query window": { + input: interval{ + start: refTime.Add(-1 * time.Hour), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-time.Hour), + end: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 1 * time.Hour}, + }, nil, + ), + }, + "partially within recent metadata query window": { + input: interval{ + start: refTime.Add(-3 * time.Hour), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-3 * time.Hour), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC), + end: refTime.Add(-time.Hour).Add(-expectedSplitGap), + }, + // apply split_recent_metadata_queries_by_interval for recent metadata queries + { + start: refTime.Add(-time.Hour), + end: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 1 * time.Hour}, + }, nil, + ), + }, + "outside recent metadata query window": { + input: interval{ + start: refTime.Add(-4 * time.Hour), + end: refTime.Add(-2 * time.Hour), + }, + expected: []interval{ + { + start: refTime.Add(-4 * time.Hour), + end: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: refTime.Add(-2 * time.Hour), + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 1 * time.Hour}, + }, nil, + ), + }, + "end aligns with recent metadata query window": { + input: interval{ + start: refTime.Add(-3 * time.Hour), + end: refTime.Add(-1 * time.Hour), + }, + expected: []interval{ + { + start: refTime.Add(-3 * time.Hour), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC), + end: refTime.Add(-1 * time.Hour), + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ + recentMetadataSplitDuration: map[string]time.Duration{tenantID: 30 * time.Minute}, + recentMetadataQueryWindow: map[string]time.Duration{tenantID: 1 * time.Hour}, + }, nil, + ), + }, + "recent metadata window not configured": { + input: interval{ + start: refTime.Add(-3 * time.Hour), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-3 * time.Hour), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + }, + } { + t.Run(name, func(t *testing.T) { + req := tc.requestBuilderFunc(intervals.input.start, intervals.input.end) + var want []queryrangebase.Request + + for _, exp := range intervals.expected { + want = append(want, tc.requestBuilderFunc(exp.start, exp.end)) + } + + if intervals.splitInterval == 0 { + intervals.splitInterval = time.Hour } + + if intervals.splitter == nil { + intervals.splitter = newDefaultSplitter(fakeLimits{}, nil) + } + + splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval) + require.NoError(t, err) + assertSplits(t, want, splits) }) } }) @@ -1610,3 +1923,24 @@ func Test_DoesntDeadlock(t *testing.T) { // Allow for 1% increase in goroutines require.LessOrEqual(t, endingGoroutines, startingGoroutines*101/100) } + +func assertSplits(t *testing.T, want, splits []queryrangebase.Request) { + t.Helper() + + if !assert.Equal(t, want, splits) { + t.Logf("expected and actual do not match\n") + defer t.Fail() + + if len(want) != len(splits) { + t.Logf("expected %d splits, got %d\n", len(want), len(splits)) + return + } + + for j := 0; j < len(want); j++ { + exp := want[j] + act := splits[j] + equal := assert.Equal(t, exp, act) + t.Logf("\t#%d [matches: %v]: expected %q/%q got %q/%q\n", j, equal, exp.GetStart(), exp.GetEnd(), act.GetStart(), act.GetEnd()) + } + } +} diff --git a/pkg/querier/queryrange/splitters.go b/pkg/querier/queryrange/splitters.go index 0aaecf35cb96d..eddcc10edf491 100644 --- a/pkg/querier/queryrange/splitters.go +++ b/pkg/querier/queryrange/splitters.go @@ -91,31 +91,56 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer } var ( - ingesterSplits []queryrangebase.Request - origStart = req.GetStart().UTC() - origEnd = req.GetEnd().UTC() + splitsBeforeRebound []queryrangebase.Request + origStart = req.GetStart().UTC() + origEnd = req.GetEnd().UTC() + start, end = origStart, origEnd + + reboundOrigQuery bool + splitIntervalBeforeRebound time.Duration ) - start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req) + switch req.(type) { + // not applying `split_ingester_queries_by_interval` for metadata queries since it solves a different problem of reducing the subqueries sent to the ingesters. + // we instead prefer `split_recent_metadata_queries_by_interval` for metadata queries which favours shorter subqueries to improve cache effectiveness. + // even though the number of subqueries increase, caching should deamplify it overtime. + case *LokiSeriesRequest, *LabelRequest: + var ( + recentMetadataQueryWindow = validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.RecentMetadataQueryWindow) + recentMetadataQuerySplitInterval = validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.RecentMetadataQuerySplitDuration) + ) + + // if either of them are not configured, we fallback to the default split interval for the entire query length. + if recentMetadataQueryWindow == 0 || recentMetadataQuerySplitInterval == 0 { + break + } - if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { - // perform splitting using special interval (`split_ingester_queries_by_interval`) - util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory) + start, end, reboundOrigQuery = recentMetadataQueryBounds(execTime, recentMetadataQueryWindow, req) + splitIntervalBeforeRebound = recentMetadataQuerySplitInterval + default: + if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 { + start, end, reboundOrigQuery = ingesterQueryBounds(execTime, s.iqo, req) + splitIntervalBeforeRebound = ingesterQueryInterval + } + } - // rebound after ingester queries have been split out + if reboundOrigQuery { + util.ForInterval(splitIntervalBeforeRebound, start, end, endTimeInclusive, factory) + + // rebound after query portion within ingester query window or recent metadata query window has been split out end = start - start = req.GetStart().UTC() + start = origStart if endTimeInclusive { end = end.Add(-util.SplitGap) } - // query only overlaps ingester query window, nothing more to do + // query only overlaps ingester query window or recent metadata query window, nothing more to do if start.After(end) || start.Equal(end) { return reqs, nil } // copy the splits, reset the results - ingesterSplits = reqs + splitsBeforeRebound = reqs reqs = nil } else { start = origStart @@ -123,10 +148,10 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer } // perform splitting over the rest of the time range - util.ForInterval(interval, origStart, end, endTimeInclusive, factory) + util.ForInterval(interval, start, end, endTimeInclusive, factory) - // move the ingester splits to the end to maintain correct order - reqs = append(reqs, ingesterSplits...) + // move the ingester or recent metadata splits to the end to maintain correct order + reqs = append(reqs, splitsBeforeRebound...) return reqs, nil } @@ -270,6 +295,22 @@ func (s *metricQuerySplitter) buildMetricSplits(step int64, interval time.Durati } } +func recentMetadataQueryBounds(execTime time.Time, recentMetadataQueryWindow time.Duration, req queryrangebase.Request) (time.Time, time.Time, bool) { + start, end := req.GetStart().UTC(), req.GetEnd().UTC() + windowStart := execTime.UTC().Add(-recentMetadataQueryWindow) + + // rebound only if the query end is strictly inside the window + if !windowStart.Before(end) { + return start, end, false + } + + if windowStart.Before(start) { + windowStart = start + } + + return windowStart, end, true +} + // ingesterQueryBounds determines if we need to split time ranges overlapping the ingester query window (`query_ingesters_within`) // and retrieve the bounds for those specific splits func ingesterQueryBounds(execTime time.Time, iqo util.IngesterQueryOptions, req queryrangebase.Request) (time.Time, time.Time, bool) { diff --git a/pkg/querier/queryrange/volume_cache.go b/pkg/querier/queryrange/volume_cache.go index 147d61912db90..5ae2af4111150 100644 --- a/pkg/querier/queryrange/volume_cache.go +++ b/pkg/querier/queryrange/volume_cache.go @@ -131,6 +131,7 @@ func NewVolumeCacheMiddleware( }, parallelismForReq, retentionEnabled, + false, metrics, ) } diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index d05d71837404c..3ea3e727b502d 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -58,6 +58,7 @@ type ResultsCache struct { merger ResponseMerger shouldCacheReq ShouldCacheReqFn shouldCacheRes ShouldCacheResFn + onlyUseEntireExtent bool parallelismForReq func(ctx context.Context, tenantIDs []string, r Request) int } @@ -79,7 +80,7 @@ func NewResultsCache( shouldCacheRes ShouldCacheResFn, parallelismForReq func(ctx context.Context, tenantIDs []string, r Request) int, cacheGenNumberLoader CacheGenNumberLoader, - retentionEnabled bool, + retentionEnabled, onlyUseEntireExtent bool, ) *ResultsCache { return &ResultsCache{ logger: logger, @@ -95,6 +96,7 @@ func NewResultsCache( shouldCacheReq: shouldCacheReq, shouldCacheRes: shouldCacheRes, parallelismForReq: parallelismForReq, + onlyUseEntireExtent: onlyUseEntireExtent, } } @@ -334,6 +336,25 @@ func (s ResultsCache) partition(req Request, extents []Extent) ([]Request, []Res continue } + if s.onlyUseEntireExtent && (start > extent.GetStart() || end < extent.GetEnd()) { + // It is not possible to extract the overlapping portion of an extent for all request types. + // Metadata results for one cannot be extracted as the data portion is just a list of strings with no associated timestamp. + // To avoid returning incorrect results, we only use extents that are entirely within the requested query range. + // + // Start End + // ┌────────────────────────┐ + // │ Req │ + // └────────────────────────┘ + // + // ◄──────────────► only this extent can be used. Remaining portion of the query will be added to requests. + // + // + // ◄──────X───────► cannot be partially extracted. will be discarded if onlyUseEntireExtent is set. + // ◄───────X──────► + // ◄───────────────X──────────────────► + continue + } + // If this extent is tiny and request is not tiny, discard it: more efficient to do a few larger queries. // Hopefully tiny request can make tiny extent into not-so-tiny extent. @@ -353,6 +374,7 @@ func (s ResultsCache) partition(req Request, extents []Extent) ([]Request, []Res if err != nil { return nil, nil, err } + // extract the overlap from the cached extent. cachedResponses = append(cachedResponses, s.extractor.Extract(start, end, res, extent.GetStart(), extent.GetEnd())) start = extent.End diff --git a/pkg/storage/chunk/cache/resultscache/cache_test.go b/pkg/storage/chunk/cache/resultscache/cache_test.go index bacedd2dda6ba..cff371097a681 100644 --- a/pkg/storage/chunk/cache/resultscache/cache_test.go +++ b/pkg/storage/chunk/cache/resultscache/cache_test.go @@ -61,7 +61,6 @@ func TestPartition(t *testing.T) { mkAPIResponse(0, 100, 10), }, }, - { name: "Test with a complete miss.", input: &MockRequest{ @@ -182,6 +181,123 @@ func TestPartition(t *testing.T) { } } +func TestPartition_onlyUseEntireExtent(t *testing.T) { + for _, tc := range []struct { + name string + input Request + prevCachedResponse []Extent + expectedRequests []Request + expectedCachedResponse []Response + }{ + { + name: "overlapping extent - right", + input: &MockRequest{ + Start: time.UnixMilli(0), + End: time.UnixMilli(100), + }, + prevCachedResponse: []Extent{ + mkExtent(60, 120), + }, + expectedRequests: []Request{ + &MockRequest{ + Start: time.UnixMilli(0), + End: time.UnixMilli(100), + }, + }, + }, + { + name: "overlapping extent - left", + input: &MockRequest{ + Start: time.UnixMilli(20), + End: time.UnixMilli(100), + }, + prevCachedResponse: []Extent{ + mkExtent(0, 50), + }, + expectedRequests: []Request{ + &MockRequest{ + Start: time.UnixMilli(20), + End: time.UnixMilli(100), + }, + }, + }, + { + name: "overlapping extent larger than the request", + input: &MockRequest{ + Start: time.UnixMilli(20), + End: time.UnixMilli(100), + }, + prevCachedResponse: []Extent{ + mkExtent(0, 120), + }, + expectedRequests: []Request{ + &MockRequest{ + Start: time.UnixMilli(20), + End: time.UnixMilli(100), + }, + }, + }, + { + name: "overlapping extent within the requested query range", + input: &MockRequest{ + Start: time.UnixMilli(0), + End: time.UnixMilli(120), + }, + prevCachedResponse: []Extent{ + mkExtent(0, 100), + }, + expectedRequests: []Request{ + &MockRequest{ + Start: time.UnixMilli(100), + End: time.UnixMilli(120), + }, + }, + expectedCachedResponse: []Response{ + mkAPIResponse(0, 100, 10), + }, + }, + { + name: "multiple overlapping extents", + input: &MockRequest{ + Start: time.UnixMilli(50), + End: time.UnixMilli(200), + }, + prevCachedResponse: []Extent{ + mkExtent(0, 80), + mkExtent(100, 150), + mkExtent(150, 180), + mkExtent(200, 250), + }, + expectedRequests: []Request{ + &MockRequest{ + Start: time.UnixMilli(50), + End: time.UnixMilli(100), + }, + &MockRequest{ + Start: time.UnixMilli(180), + End: time.UnixMilli(200), + }, + }, + expectedCachedResponse: []Response{ + mkAPIResponse(100, 150, 10), + mkAPIResponse(150, 180, 10), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + s := ResultsCache{ + extractor: MockExtractor{}, + minCacheExtent: 10, + onlyUseEntireExtent: true, + } + reqs, resps, err := s.partition(tc.input, tc.prevCachedResponse) + require.Nil(t, err) + require.Equal(t, tc.expectedRequests, reqs) + require.Equal(t, tc.expectedCachedResponse, resps) + }) + } +} + func TestHandleHit(t *testing.T) { for _, tc := range []struct { name string @@ -491,6 +607,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { }, nil, false, + false, ) require.NoError(t, err) @@ -534,6 +651,7 @@ func Test_resultsCache_MissingData(t *testing.T) { }, nil, false, + false, ) require.NoError(t, err) ctx := context.Background() diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 262631643c723..13885c0fcb52d 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -107,14 +107,16 @@ type Limits struct { QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. - QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` - MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"` - IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"` - MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"` - MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"` - MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"` - VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."` - VolumeMaxSeries int `yaml:"volume_max_series" json:"volume_max_series" doc:"description=The maximum number of aggregated series in a log-volume response"` + QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` + MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"` + RecentMetadataQuerySplitDuration model.Duration `yaml:"split_recent_metadata_queries_by_interval" json:"split_recent_metadata_queries_by_interval"` + RecentMetadataQueryWindow model.Duration `yaml:"recent_metadata_query_window" json:"recent_metadata_query_window"` + IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"` + MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"` + MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"` + MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"` + VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."` + VolumeMaxSeries int `yaml:"volume_max_series" json:"volume_max_series" doc:"description=The maximum number of aggregated series in a log-volume response"` // Ruler defaults and limits. RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"` @@ -306,13 +308,14 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.QuerySplitDuration.Set("1h") f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.") - // with metadata caching, it is not possible to extract a subset of labels/series from a cached extent because unlike samples they are not associated with a timestamp. - // as a result, we could return inaccurate results. example: returning results from an entire 1h extent for a 5m query - // Setting max_metadata_cache_freshness to 24h should help us avoid caching recent data and preseve the correctness. - // For the portion of the request beyond the freshness window, granularity of the cached metadata results is determined by split_metadata_queries_by_interval. _ = l.MetadataQuerySplitDuration.Set("24h") f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.") + _ = l.RecentMetadataQuerySplitDuration.Set("1h") + f.Var(&l.RecentMetadataQuerySplitDuration, "experimental.querier.split-recent-metadata-queries-by-interval", "Experimental. Split interval to use for the portion of metadata request that falls within `recent_metadata_query_window`. Rest of the request which is outside the window still uses `split_metadata_queries_by_interval`. If set to 0, the entire request defaults to using a split interval of `split_metadata_queries_by_interval.`.") + + f.Var(&l.RecentMetadataQueryWindow, "experimental.querier.recent-metadata-query-window", "Experimental. Metadata query window inside which `split_recent_metadata_queries_by_interval` gets applied, portion of the metadata request that falls in this window is split using `split_recent_metadata_queries_by_interval`. The value 0 disables using a different split interval for recent metadata queries.\n\nThis is added to improve cacheability of recent metadata queries. Query split interval also determines the interval used in cache key. The default split interval of 24h is useful for caching long queries, each cache key holding 1 day's results. But metadata queries are often shorter than 24h, to cache them effectively we need a smaller split interval. `recent_metadata_query_window` along with `split_recent_metadata_queries_by_interval` help configure a shorter split interval for recent metadata queries.") + _ = l.IngesterQuerySplitDuration.Set("0s") f.Var(&l.IngesterQuerySplitDuration, "querier.split-ingester-queries-by-interval", "Interval to use for time-based splitting when a request is within the `query_ingesters_within` window; defaults to `split-queries-by-interval` by setting to 0.") @@ -598,6 +601,16 @@ func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration) } +// RecentMetadataQuerySplitDuration returns the tenant specific splitby interval for recent metadata queries. +func (o *Overrides) RecentMetadataQuerySplitDuration(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).RecentMetadataQuerySplitDuration) +} + +// RecentMetadataQueryWindow returns the tenant specific time window used to determine recent metadata queries. +func (o *Overrides) RecentMetadataQueryWindow(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).RecentMetadataQueryWindow) +} + // IngesterQuerySplitDuration returns the tenant specific splitby interval applied in the query frontend when querying // during the `query_ingesters_within` window. func (o *Overrides) IngesterQuerySplitDuration(userID string) time.Duration { From 256f3971052848963fc4d4c9e24a346afbe1b32c Mon Sep 17 00:00:00 2001 From: Zirko <64951262+QuantumEnigmaa@users.noreply.github.com> Date: Wed, 14 Feb 2024 14:24:04 +0100 Subject: [PATCH 5/6] Helm: bump chart version (#11932) Signed-off-by: QuantumEnigmaa Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> --- production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index 183333ba652f4..d3b9186bfa77c 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.42.2 +version: 5.42.3 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index 5b6cbe5a2fa1b..a9766865305dd 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.42.2](https://img.shields.io/badge/Version-5.42.2-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.42.3](https://img.shields.io/badge/Version-5.42.3-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode From 687978826f065de2d9501b55a41104ad89b3e321 Mon Sep 17 00:00:00 2001 From: Roman Danko Date: Wed, 14 Feb 2024 15:41:37 +0100 Subject: [PATCH 6/6] Helm: Allow to define resources for GrafanaAgent pods (#11851) Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> --- CHANGELOG.md | 1 + docs/sources/setup/install/helm/reference.md | 9 +++++++++ production/helm/loki/CHANGELOG.md | 5 +++++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- .../helm/loki/templates/monitoring/grafana-agent.yaml | 4 ++++ production/helm/loki/values.yaml | 7 +++++++ 7 files changed, 28 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f091ed06f883..ca45604012430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods. * [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests. * [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation. * [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md index e687a560ef715..e7dbfdbdd3f65 100644 --- a/docs/sources/setup/install/helm/reference.md +++ b/docs/sources/setup/install/helm/reference.md @@ -2806,6 +2806,15 @@ true
 null
 
+ + + + monitoring.selfMonitoring.grafanaAgent.resources + object + Resource requests and limits for the grafanaAgent pods +
+{}
+
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 068d37a495530..47d8f6333e4e9 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,11 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) + +## 5.43.0 + +- [ENHANCEMENT] Allow the definition of resources for GrafanaAgent pods + ## 5.42.3 - [BUGFIX] Added condition for `egress-discovery` networkPolicies and ciliumNetworkPolicies. diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index d3b9186bfa77c..ffa62c88d5cd3 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.4 -version: 5.42.3 +version: 5.43.0 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index a9766865305dd..5db87e6d801e0 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.42.3](https://img.shields.io/badge/Version-5.42.3-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) +![Version: 5.43.0](https://img.shields.io/badge/Version-5.43.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/monitoring/grafana-agent.yaml b/production/helm/loki/templates/monitoring/grafana-agent.yaml index 010d9604aab70..a047e5f862517 100644 --- a/production/helm/loki/templates/monitoring/grafana-agent.yaml +++ b/production/helm/loki/templates/monitoring/grafana-agent.yaml @@ -30,6 +30,10 @@ spec: {{- include "loki.selectorLabels" $ | nindent 8 }} {{- end }} {{- end }} + {{- with .resources }} + resources: + {{- toYaml . | nindent 4 }} + {{- end }} {{- with .tolerations }} tolerations: {{- toYaml . | nindent 4 }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index e2937af382a7d..6f5a779bc8111 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -671,6 +671,13 @@ monitoring: enableConfigReadAPI: false # -- The name of the PriorityClass for GrafanaAgent pods priorityClassName: null + # -- Resource requests and limits for the grafanaAgent pods + resources: {} + # limits: + # memory: 200Mi + # requests: + # cpu: 50m + # memory: 100Mi # -- Tolerations for GrafanaAgent pods tolerations: [] # PodLogs configuration