diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index a7641bde0c960..687d60dedd13d 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -29,9 +29,13 @@ type processor struct { } func (p *processor) run(ctx context.Context, tasks []Task) error { + return p.runWithBounds(ctx, tasks, v1.MultiFingerprintBounds{{Min: 0, Max: math.MaxUint64}}) +} + +func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error { for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) { tenant := tasks[0].Tenant - err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks) + err := p.processTasks(ctx, tenant, ts, bounds, tasks) if err != nil { for _, task := range tasks { task.CloseWithError(err) @@ -45,13 +49,13 @@ func (p *processor) run(ctx context.Context, tasks []Task) error { return nil } -func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error { +func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces v1.MultiFingerprintBounds, tasks []Task) error { minFpRange, maxFpRange := getFirstLast(keyspaces) interval := bloomshipper.NewInterval(day.Bounds()) metaSearch := bloomshipper.MetaSearchParams{ TenantID: tenant, Interval: interval, - Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max}, + Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max), } metas, err := p.store.FetchMetas(ctx, metaSearch) if err != nil { diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index ec44081c1b30c..af61cdc1a0bd9 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -10,8 +10,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/queue" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) @@ -147,6 +149,7 @@ func (w *worker) running(_ context.Context) error { w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items))) tasks := make([]Task, 0, len(items)) + var mb v1.MultiFingerprintBounds for _, item := range items { task, ok := item.(Task) if !ok { @@ -157,10 +160,13 @@ func (w *worker) running(_ context.Context) error { level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID) w.pending.Delete(task.ID) tasks = append(tasks, task) + + first, last := getFirstLast(task.series) + mb = mb.Union(v1.NewBounds(model.Fingerprint(first.Fingerprint), model.Fingerprint(last.Fingerprint))) } start = time.Now() - err = p.run(taskCtx, tasks) + err = p.runWithBounds(taskCtx, tasks, mb) if err != nil { w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds()) diff --git a/pkg/storage/bloom/v1/bounds.go b/pkg/storage/bloom/v1/bounds.go index e7ff804d55cdb..d3bdc0ee2200c 100644 --- a/pkg/storage/bloom/v1/bounds.go +++ b/pkg/storage/bloom/v1/bounds.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "golang.org/x/exp/slices" "github.com/grafana/loki/pkg/util/encoding" ) @@ -169,6 +170,40 @@ func (b FingerprintBounds) Unless(target FingerprintBounds) (res []FingerprintBo return res } +type MultiFingerprintBounds []FingerprintBounds + +func (mb MultiFingerprintBounds) Union(target FingerprintBounds) MultiFingerprintBounds { + if len(mb) == 0 { + return MultiFingerprintBounds{target} + } + if len(mb) == 1 { + return mb[0].Union(target) + } + + mb = append(mb, target) + slices.SortFunc(mb, func(a, b FingerprintBounds) int { + if a.Less(b) { + return -1 + } else if a.Equal(b) { + return 0 + } + return 1 + }) + + var union MultiFingerprintBounds + for i := 0; i < len(mb); i++ { + j := len(union) - 1 // index of last item of union + if j >= 0 && union[j].Max >= mb[i].Min-1 { + union[j] = NewBounds(union[j].Min, max(mb[i].Max, union[j].Max)) + } else { + union = append(union, mb[i]) + } + } + + mb = union + return mb +} + // unused, but illustrative type BoundedIter[V any] struct { Iterator[V] diff --git a/pkg/storage/bloom/v1/bounds_test.go b/pkg/storage/bloom/v1/bounds_test.go index 1d687437fab6a..4dd01e60c1238 100644 --- a/pkg/storage/bloom/v1/bounds_test.go +++ b/pkg/storage/bloom/v1/bounds_test.go @@ -129,3 +129,132 @@ func Test_FingerprintBounds_Unless(t *testing.T) { }, NewBounds(5, 25).Unless(target)) assert.Nil(t, NewBounds(14, 15).Unless(target)) } + +func Test_MultiFingerprintBounds(t *testing.T) { + for _, tc := range []struct { + desc string + mb MultiFingerprintBounds + target FingerprintBounds + exp MultiFingerprintBounds + }{ + { + desc: "no elements", + mb: MultiFingerprintBounds{}, + target: NewBounds(0, 9), + exp: MultiFingerprintBounds{ + NewBounds(0, 9), + }, + }, + { + desc: "single element before", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + }, + target: NewBounds(15, 19), + exp: MultiFingerprintBounds{ + NewBounds(5, 9), + NewBounds(15, 19), + }, + }, + { + desc: "single element after", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + }, + target: NewBounds(0, 3), + exp: MultiFingerprintBounds{ + NewBounds(0, 3), + NewBounds(5, 9), + }, + }, + { + desc: "single element overlapping", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + }, + target: NewBounds(0, 14), + exp: MultiFingerprintBounds{ + NewBounds(0, 14), + }, + }, + { + desc: "multiple elements single overlapping", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + NewBounds(15, 19), + }, + target: NewBounds(0, 6), + exp: MultiFingerprintBounds{ + NewBounds(0, 9), + NewBounds(15, 19), + }, + }, + { + desc: "multiple elements single overlapping", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + NewBounds(15, 19), + }, + target: NewBounds(11, 25), + exp: MultiFingerprintBounds{ + NewBounds(5, 9), + NewBounds(11, 25), + }, + }, + { + desc: "multiple elements combining overlapping", + mb: MultiFingerprintBounds{ + NewBounds(5, 9), + NewBounds(15, 19), + }, + target: NewBounds(9, 15), + exp: MultiFingerprintBounds{ + NewBounds(5, 19), + }, + }, + { + desc: "combination", + mb: MultiFingerprintBounds{ + NewBounds(0, 2), + NewBounds(5, 9), + NewBounds(15, 19), + NewBounds(25, 29), + }, + target: NewBounds(9, 15), + exp: MultiFingerprintBounds{ + NewBounds(0, 2), + NewBounds(5, 19), + NewBounds(25, 29), + }, + }, + { + desc: "overlapping ranges", + mb: MultiFingerprintBounds{ + NewBounds(0, 6), + NewBounds(5, 15), + }, + target: NewBounds(8, 10), + exp: MultiFingerprintBounds{ + NewBounds(0, 15), + }, + }, + { + desc: "disjoint ranges and target is between", + mb: MultiFingerprintBounds{ + NewBounds(0, 9), + NewBounds(30, 39), + }, + target: NewBounds(15, 19), + exp: MultiFingerprintBounds{ + NewBounds(0, 9), + NewBounds(15, 19), + NewBounds(30, 39), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + res := tc.mb.Union(tc.target) + assert.Equal(t, tc.exp, res) + }) + } +}