diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 7d1d687853979..aa1ba80898619 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -29,9 +29,13 @@ type processor struct { } func (p *processor) run(ctx context.Context, tasks []Task) error { + return p.runWithBounds(ctx, tasks, MultiFingerprintBounds{{Min: 0, Max: math.MaxUint64}}) +} + +func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds MultiFingerprintBounds) error { for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) { tenant := tasks[0].Tenant - err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks) + err := p.processTasks(ctx, tenant, ts, bounds, tasks) if err != nil { for _, task := range tasks { task.CloseWithError(err) @@ -45,13 +49,13 @@ func (p *processor) run(ctx context.Context, tasks []Task) error { return nil } -func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error { +func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces MultiFingerprintBounds, tasks []Task) error { minFpRange, maxFpRange := getFirstLast(keyspaces) interval := bloomshipper.NewInterval(day.Bounds()) metaSearch := bloomshipper.MetaSearchParams{ TenantID: tenant, Interval: interval, - Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max}, + Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max), } metas, err := p.store.FetchMetas(ctx, metaSearch) if err != nil { diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 3ab234aaa8ae0..1aab96c1632a5 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -184,3 +184,29 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds { return result } + +type MultiFingerprintBounds []v1.FingerprintBounds + +func (mb MultiFingerprintBounds) Union(target v1.FingerprintBounds) MultiFingerprintBounds { + if len(mb) == 0 { + return MultiFingerprintBounds{target} + } + if len(mb) == 1 { + return mb[0].Union(target) + } + + var union MultiFingerprintBounds + last := target + for len(mb) > 0 { + res := last.Union(mb[0]) + if len(res) == 2 { + last = res[1] + union = append(union, res[0]) + } else { + last = res[0] + } + mb = mb[1:] + } + + return append(union, last) +} diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 6bc43cf794342..af1cbf39a95e9 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -385,3 +385,108 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []*bloomship } return res } + +func Test_MultiFingerprintBounds(t *testing.T) { + for _, tc := range []struct { + desc string + mb MultiFingerprintBounds + target v1.FingerprintBounds + exp MultiFingerprintBounds + }{ + { + desc: "no elements", + mb: MultiFingerprintBounds{}, + target: v1.NewBounds(0, 9), + exp: MultiFingerprintBounds{ + v1.NewBounds(0, 9), + }, + }, + { + desc: "single element before", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + }, + target: v1.NewBounds(15, 19), + exp: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + v1.NewBounds(15, 19), + }, + }, + { + desc: "single element after", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + }, + target: v1.NewBounds(0, 3), + exp: MultiFingerprintBounds{ + v1.NewBounds(0, 3), + v1.NewBounds(5, 9), + }, + }, + { + desc: "single element overlapping", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + }, + target: v1.NewBounds(0, 14), + exp: MultiFingerprintBounds{ + v1.NewBounds(0, 14), + }, + }, + { + desc: "multiple elements single overlapping", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + v1.NewBounds(15, 19), + }, + target: v1.NewBounds(0, 6), + exp: MultiFingerprintBounds{ + v1.NewBounds(0, 9), + v1.NewBounds(15, 19), + }, + }, + { + desc: "multiple elements single overlapping", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + v1.NewBounds(15, 19), + }, + target: v1.NewBounds(11, 25), + exp: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + v1.NewBounds(11, 25), + }, + }, + { + desc: "multiple elements combining overlapping", + mb: MultiFingerprintBounds{ + v1.NewBounds(5, 9), + v1.NewBounds(15, 19), + }, + target: v1.NewBounds(9, 15), + exp: MultiFingerprintBounds{ + v1.NewBounds(5, 19), + }, + }, + { + desc: "combination", + mb: MultiFingerprintBounds{ + v1.NewBounds(0, 2), + v1.NewBounds(5, 9), + v1.NewBounds(15, 19), + v1.NewBounds(25, 29), + }, + target: v1.NewBounds(9, 15), + exp: MultiFingerprintBounds{ + v1.NewBounds(0, 2), + v1.NewBounds(5, 19), + v1.NewBounds(25, 29), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + res := tc.mb.Union(tc.target) + require.Equal(t, tc.exp, res) + }) + } +} diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index ec44081c1b30c..d11e3e22cc01f 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -10,8 +10,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/queue" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) @@ -147,6 +149,7 @@ func (w *worker) running(_ context.Context) error { w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items))) tasks := make([]Task, 0, len(items)) + var mb MultiFingerprintBounds for _, item := range items { task, ok := item.(Task) if !ok { @@ -157,10 +160,13 @@ func (w *worker) running(_ context.Context) error { level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID) w.pending.Delete(task.ID) tasks = append(tasks, task) + + first, last := getFirstLast(task.series) + mb = mb.Union(v1.NewBounds(model.Fingerprint(first.Fingerprint), model.Fingerprint(last.Fingerprint))) } start = time.Now() - err = p.run(taskCtx, tasks) + err = p.runWithBounds(taskCtx, tasks, mb) if err != nil { w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())