diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 425d6713e92f9..6448e45324f32 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -23,6 +23,10 @@ of line filter expressions. | bloomgateway.Gateway | + queue.RequestQueue + | + bloomgateway.Worker + | bloomshipper.Store | bloomshipper.Shipper @@ -56,6 +60,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/queue" "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/util" @@ -63,7 +68,6 @@ import ( ) var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring") -var errInvalidTenant = errors.New("invalid tenant in chunk refs") // TODO(chaudum): Make these configurable const ( @@ -72,22 +76,26 @@ const ( pendingTasksInitialCap = 1024 ) +const ( + metricsSubsystem = "bloom_gateway" +) + type metrics struct { queueDuration prometheus.Histogram inflightRequests prometheus.Summary } -func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics { +func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { return &metrics{ queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Namespace: constants.Loki, + Namespace: namespace, Subsystem: subsystem, Name: "queue_duration_seconds", Help: "Time spent by tasks in queue before getting picked up by a worker.", Buckets: prometheus.DefBuckets, }), inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{ - Namespace: constants.Loki, + Namespace: namespace, Subsystem: subsystem, Name: "inflight_tasks", Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.", @@ -98,40 +106,6 @@ func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics { } } -// Task is the data structure that is enqueued to the internal queue and queued by query workers -type Task struct { - // ID is a lexcographically sortable unique identifier of the task - ID ulid.ULID - // Tenant is the tenant ID - Tenant string - // Request is the original request - Request *logproto.FilterChunkRefRequest - // ErrCh is a send-only channel to write an error to - ErrCh chan<- error - // ResCh is a send-only channel to write partial responses to - ResCh chan<- *logproto.GroupedChunkRefs -} - -// newTask returns a new Task that can be enqueued to the task queue. -// As additional arguments, it returns a result and an error channel, as well -// as an error if the instantiation fails. 
-func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) { - key, err := ulid.New(ulid.Now(), nil) - if err != nil { - return Task{}, nil, nil, err - } - errCh := make(chan error, 1) - resCh := make(chan *logproto.GroupedChunkRefs, 1) - task := Task{ - ID: key, - Tenant: tenantID, - Request: req, - ErrCh: errCh, - ResCh: resCh, - } - return task, resCh, errCh, nil -} - // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -169,14 +143,16 @@ func makePendingTasks(n int) *pendingTasks { type Gateway struct { services.Service - cfg Config - logger log.Logger - metrics *metrics + cfg Config + logger log.Logger - queue *queue.RequestQueue - queueMetrics *queue.Metrics - activeUsers *util.ActiveUsersCleanupService - bloomStore bloomshipper.Store + metrics *metrics + workerMetrics *workerMetrics + queueMetrics *queue.Metrics + + queue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + bloomStore bloomshipper.Store sharding ShardingStrategy @@ -184,6 +160,8 @@ type Gateway struct { serviceMngr *services.Manager serviceWatcher *services.FailureWatcher + + workerConfig workerConfig } // New returns a new instance of the Bloom Gateway. @@ -191,12 +169,17 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o g := &Gateway{ cfg: cfg, logger: logger, - metrics: newMetrics("bloom_gateway", reg), + metrics: newMetrics(reg, constants.Loki, metricsSubsystem), sharding: shardingStrategy, pendingTasks: makePendingTasks(pendingTasksInitialCap), + workerConfig: workerConfig{ + maxWaitTime: 200 * time.Millisecond, + maxItems: 100, + }, + workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem), + queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem), } - g.queueMetrics = queue.NewMetrics(reg, constants.Loki, "bloom_gateway") g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics) g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup) @@ -215,19 +198,32 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o return nil, err } + // We need to keep a reference to be able to call Stop() on shutdown of the gateway. g.bloomStore = bloomStore + if err := g.initServices(); err != nil { + return nil, err + } + g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway") + + return g, nil +} + +func (g *Gateway) initServices() error { + var err error svcs := []services.Service{g.queue, g.activeUsers} + for i := 0; i < numWorkers; i++ { + id := fmt.Sprintf("bloom-query-worker-%d", i) + w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics) + svcs = append(svcs, w) + } g.serviceMngr, err = services.NewManager(svcs...) 
if err != nil { - return nil, err + return err } g.serviceWatcher = services.NewFailureWatcher() g.serviceWatcher.WatchManager(g.serviceMngr) - - g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway") - - return g, nil + return nil } func (g *Gateway) starting(ctx context.Context) error { @@ -245,10 +241,6 @@ func (g *Gateway) starting(ctx context.Context) error { return errors.Wrap(err, "unable to start bloom gateway subservices") } - for i := 0; i < numWorkers; i++ { - go g.startWorker(ctx, fmt.Sprintf("worker-%d", i)) - } - return nil } @@ -278,52 +270,6 @@ func (g *Gateway) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } -// This is just a dummy implementation of the worker! -// TODO(chaudum): Implement worker that dequeues multiple pending tasks and -// multiplexes them prior to execution. -func (g *Gateway) startWorker(_ context.Context, id string) error { - level.Info(g.logger).Log("msg", "starting worker", "worker", id) - - g.queue.RegisterConsumerConnection(id) - defer g.queue.UnregisterConsumerConnection(id) - - idx := queue.StartIndexWithLocalQueue - - for { - ctx := context.Background() - item, newIdx, err := g.queue.Dequeue(ctx, idx, id) - if err != nil { - if err != queue.ErrStopped { - level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err) - continue - } - level.Info(g.logger).Log("msg", "stopping worker", "worker", id) - return err - } - task, ok := item.(Task) - if !ok { - level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item) - continue - } - - idx = newIdx - level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID) - g.pendingTasks.Delete(task.ID) - - r := task.Request - if len(r.Filters) > 0 { - r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...) 
- } - if err != nil { - task.ErrCh <- err - } else { - for _, ref := range r.Refs { - task.ResCh <- ref - } - } - } -} - // FilterChunkRefs implements BloomGatewayServer func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) { tenantID, err := tenant.TenantID(ctx) @@ -331,10 +277,11 @@ func (g *Gateway) FilterChunk return nil, err } - for _, ref := range req.Refs { - if ref.Tenant != tenantID { - return nil, errors.Wrapf(errInvalidTenant, "expected chunk refs from tenant %s, got tenant %s", tenantID, ref.Tenant) - } + // Shortcut if request does not contain filters + if len(req.Filters) == 0 { + return &logproto.FilterChunkRefResponse{ + ChunkRefs: req.Refs, + }, nil } // Sort ChunkRefs by fingerprint in ascending order @@ -342,7 +289,7 @@ func (g *Gateway) FilterChunk return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint }) - task, resCh, errCh, err := newTask(tenantID, req) + task, resCh, errCh, err := NewTask(tenantID, req) if err != nil { return nil, err } @@ -354,19 +301,61 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk g.pendingTasks.Add(task.ID, task) }) - response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) + requestCount := len(req.Refs) + // TODO(chaudum): Use pool + responses := make([]v1.Output, 0, requestCount) + for { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "waiting for results") case err := <-errCh: - return nil, err + return nil, errors.Wrap(err, "waiting for results") case res := <-resCh: - level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res) + responses = append(responses, res) + // log line is helpful for debugging tests + // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response - response = append(response, res) - if len(response) == len(req.Refs) { - return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil + if len(responses) == requestCount { + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + // we must not remove items from req.Refs as long as the worker may iterate over them + g.removeNotMatchingChunks(req, o) + } + return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + } + } + } +} + +func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) { + // binary search index of fingerprint + idx := sort.Search(len(req.Refs), func(i int) bool { + return req.Refs[i].Fingerprint >= uint64(res.Fp) + }) + + // fingerprint not found + if idx >= len(req.Refs) { + level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) + return + } + + // if all chunks of a fingerprint are removed + // then remove the whole group from the response + if len(req.Refs[idx].Refs) == res.Removals.Len() { + req.Refs[idx] = nil // avoid leaking pointer + req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
+ return + } + + for i := range res.Removals { + toRemove := res.Removals[i] + for j := range req.Refs[idx].Refs { + if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum { + req.Refs[idx].Refs[j] = nil // avoid leaking pointer + req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) } } } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 0b6a207362ac6..a294500ce27bd 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -2,11 +2,13 @@ package bloomgateway import ( "context" + "fmt" "os" "testing" "time" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" @@ -18,9 +20,12 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" lokiring "github.com/grafana/loki/pkg/util/ring" + "github.com/grafana/loki/pkg/validation" ) func parseDayTime(s string) config.DayTime { @@ -33,17 +38,35 @@ func parseDayTime(s string) config.DayTime { } } +func mktime(s string) model.Time { + ts, err := time.Parse("2006-01-02 15:04", s) + if err != nil { + panic(err) + } + return model.TimeFromUnix(ts.Unix()) +} + func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs { t.Helper() grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs)) return groupChunkRefs(chunkRefs, grouped) } +func newLimits() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.BloomGatewayEnabled = true + + overrides, _ := validation.NewOverrides(limits, nil) + return overrides +} + func TestBloomGateway_StartStopService(t *testing.T) { ss := NewNoopStrategy() logger := log.NewNopLogger() reg := prometheus.NewRegistry() + limits := newLimits() cm := storage.NewClientMetrics() t.Cleanup(cm.Unregister) @@ -82,7 +105,7 @@ func TestBloomGateway_StartStopService(t *testing.T) { }, } - gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -103,6 +126,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { ss := NewNoopStrategy() logger := log.NewLogfmtLogger(os.Stderr) reg := prometheus.NewRegistry() + limits := newLimits() cm := storage.NewClientMetrics() t.Cleanup(cm.Unregister) @@ -142,7 +166,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -152,8 +176,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { require.NoError(t, err) }) - ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") - now := model.TimeFromUnix(ts.Unix()) + now := mktime("2023-10-03 10:00") chunkRefs := []*logproto.ChunkRef{ {Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1}, @@ -186,33 +209,9 
@@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }, res) }) - t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) { - reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) - require.NoError(t, err) - - ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") - now := model.TimeFromUnix(ts.Unix()) - - chunkRefs := []*logproto.ChunkRef{ - {Fingerprint: 1000, UserID: tenantID, From: now.Add(-22 * time.Hour), Through: now.Add(-21 * time.Hour), Checksum: 1}, - {Fingerprint: 2000, UserID: "other", From: now.Add(-20 * time.Hour), Through: now.Add(-19 * time.Hour), Checksum: 2}, - } - req := &logproto.FilterChunkRefRequest{ - From: now.Add(-24 * time.Hour), - Through: now, - Refs: groupRefs(t, chunkRefs), - } - - ctx := user.InjectOrgID(context.Background(), tenantID) - _, err = gw.FilterChunkRefs(ctx, req) - require.Error(t, err) - require.Equal(t, "expected chunk refs from tenant test, got tenant other: invalid tenant in chunk refs", err.Error()) - }) - t.Run("gateway tracks active users", func(t *testing.T) { reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -222,8 +221,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { require.NoError(t, err) }) - ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") - now := model.TimeFromUnix(ts.Unix()) + now := mktime("2023-10-03 10:00") tenants := []string{"tenant-a", "tenant-b", "tenant-c"} for idx, tenantID := range tenants { @@ -240,6 +238,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { From: now.Add(-24 * time.Hour), Through: now, Refs: groupRefs(t, chunkRefs), + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: "foo"}, + }, } ctx := user.InjectOrgID(context.Background(), tenantID) _, err = gw.FilterChunkRefs(ctx, req) @@ -247,22 +248,157 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } require.ElementsMatch(t, tenants, gw.activeUsers.ActiveUsers()) }) + + t.Run("use fuse queriers to filter chunks", func(t *testing.T) { + reg := prometheus.NewRegistry() + gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) + require.NoError(t, err) + + now := mktime("2023-10-03 10:00") + + // replace store implementation and re-initialize workers and sub-services + bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024) + gw.bloomStore = newMockBloomStore(bqs) + err = gw.initServices() + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), gw) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), gw) + require.NoError(t, err) + }) + + chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) + + t.Run("no match - return empty response", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-8 * time.Hour), + Through: now, + Refs: inputChunkRefs, + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: "does not match"}, + }, + } + ctx := user.InjectOrgID(context.Background(), tenantID) + res, err := gw.FilterChunkRefs(ctx, req) + require.NoError(t, err) + + expectedResponse := &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + } + require.Equal(t, 
expectedResponse, res) + }) + + t.Run("match - return filtered", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) + // hack to get indexed key for a specific series + // the indexed key range for a series is defined as + // i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1 + // where i is the nth series in a block + // fortunately, i is also used as Checksum for the single chunk of a series + // see mkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go + key := inputChunkRefs[0].Refs[0].Checksum*1000 + 500 + + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-8 * time.Hour), + Through: now, + Refs: inputChunkRefs, + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: fmt.Sprint(key)}, + }, + } + ctx := user.InjectOrgID(context.Background(), tenantID) + res, err := gw.FilterChunkRefs(ctx, req) + require.NoError(t, err) + + expectedResponse := &logproto.FilterChunkRefResponse{ + ChunkRefs: inputChunkRefs[:1], + } + require.Equal(t, expectedResponse, res) + }) + + }) +} + +func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockQuerierWithFingerprintRange, [][]v1.SeriesWithBloom) { + t.Helper() + step := (maxFp - minFp) / model.Fingerprint(numBlocks) + bqs := make([]bloomshipper.BlockQuerierWithFingerprintRange, 0, numBlocks) + series := make([][]v1.SeriesWithBloom, 0, numBlocks) + for i := 0; i < numBlocks; i++ { + fromFp := minFp + (step * model.Fingerprint(i)) + throughFp := fromFp + step - 1 + // last block needs to include maxFp + if i == numBlocks-1 { + throughFp = maxFp + } + blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through) + bq := bloomshipper.BlockQuerierWithFingerprintRange{ + BlockQuerier: blockQuerier, + MinFp: fromFp, + MaxFp: throughFp, + } + bqs = append(bqs, bq) + series = append(series, data) + } + return bqs, series } -type fakeLimits struct { +func newMockBloomStore(bqs []bloomshipper.BlockQuerierWithFingerprintRange) *mockBloomStore { + return &mockBloomStore{bqs: bqs} } -func (f fakeLimits) BloomGatewayShardSize(_ string) int { - //TODO implement me - panic("implement me") +type mockBloomStore struct { + bqs []bloomshipper.BlockQuerierWithFingerprintRange } -func (f fakeLimits) BloomGatewayEnabled(_ string) bool { - //TODO implement me - panic("implement me") +// GetBlockQueriersForBlockRefs implements bloomshipper.Store. +func (s *mockBloomStore) GetBlockQueriersForBlockRefs(_ context.Context, _ string, _ []bloomshipper.BlockRef) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) { + return s.bqs, nil } -func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int { - //TODO implement me - panic("implement me") +// GetBlockRefs implements bloomshipper.Store. +func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ time.Time) ([]bloomshipper.BlockRef, error) { + blocks := make([]bloomshipper.BlockRef, 0, len(s.bqs)) + for i := range s.bqs { + blocks = append(blocks, bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + MinFingerprint: uint64(s.bqs[i].MinFp), + MaxFingerprint: uint64(s.bqs[i].MaxFp), + TenantID: tenant, + }, + }) + } + return blocks, nil +} + +// GetBlockQueriers implements bloomshipper.Store. 
+func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, _, _ time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) { + return s.bqs, nil +} + +func (s *mockBloomStore) Stop() {} + +func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.SeriesWithBloom, nthSeries int) []*logproto.ChunkRef { + t.Helper() + n := 0 + res := make([]*logproto.ChunkRef, 0) + for i := range data { + for j := range data[i] { + if n%nthSeries == 0 { + chk := data[i][j].Series.Chunks[0] + res = append(res, &logproto.ChunkRef{ + Fingerprint: uint64(data[i][j].Series.Fingerprint), + UserID: tenant, + From: chk.Start, + Through: chk.End, + Checksum: chk.Checksum, + }) + } + n++ + } + } + return res } diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go new file mode 100644 index 0000000000000..17063a4903d23 --- /dev/null +++ b/pkg/bloomgateway/multiplexing.go @@ -0,0 +1,221 @@ +package bloomgateway + +import ( + "sort" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" +) + +const ( + Day = 24 * time.Hour +) + +// Task is the data structure that is enqueued to the internal queue and dequeued by query workers +type Task struct { + // ID is a lexicographically sortable unique identifier of the task + ID ulid.ULID + // Tenant is the tenant ID + Tenant string + // Request is the original request + Request *logproto.FilterChunkRefRequest + // ErrCh is a send-only channel to write an error to + ErrCh chan<- error + // ResCh is a send-only channel to write partial responses to + ResCh chan<- v1.Output +} + +// NewTask returns a new Task that can be enqueued to the task queue. +// In addition, it returns a result and an error channel, as well +// as an error if the instantiation fails.
+func NewTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan v1.Output, chan error, error) { + key, err := ulid.New(ulid.Now(), nil) + if err != nil { + return Task{}, nil, nil, err + } + errCh := make(chan error, 1) + resCh := make(chan v1.Output, 1) + task := Task{ + ID: key, + Tenant: tenantID, + Request: req, + ErrCh: errCh, + ResCh: resCh, + } + return task, resCh, errCh, nil +} + +// Copy returns a copy of the existing task but with a new slice of chunks +func (t Task) Copy(refs []*logproto.GroupedChunkRefs) Task { + return Task{ + ID: t.ID, + Tenant: t.Tenant, + Request: &logproto.FilterChunkRefRequest{ + From: t.Request.From, + Through: t.Request.Through, + Filters: t.Request.Filters, + Refs: refs, + }, + ErrCh: t.ErrCh, + ResCh: t.ResCh, + } +} + +// Bounds returns the day boundaries of the task +func (t Task) Bounds() (time.Time, time.Time) { + return getDayTime(t.Request.From), getDayTime(t.Request.Through) +} + +func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] { + cf := filterGroupedChunkRefsByDay{day: day} + return &FilterIter[*logproto.GroupedChunkRefs]{ + iter: v1.NewSliceIter(t.Request.Refs), + matches: cf.contains, + transform: cf.filter, + } +} + +type filterGroupedChunkRefsByDay struct { + day time.Time +} + +func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool { + from, through := getFromThrough(a.Refs) + if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) { + return false + } + return true +} + +func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { + minTs, maxTs := getFromThrough(a.Refs) + + // in most cases, all chunks are within day range + if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) { + return a + } + + // case where certain chunks are outside of day range + // using binary search to get min and max index of chunks that fall into the day range + min := sort.Search(len(a.Refs), func(i int) bool { + start := a.Refs[i].From.Time() + end := a.Refs[i].Through.Time() + return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0 + }) + + max := sort.Search(len(a.Refs), func(i int) bool { + start := a.Refs[i].From.Time() + return start.Compare(cf.day.Add(Day)) > 0 + }) + + return &logproto.GroupedChunkRefs{ + Tenant: a.Tenant, + Fingerprint: a.Fingerprint, + Refs: a.Refs[min:max], + } +} + +type Predicate[T any] func(a T) bool +type Transform[T any] func(a T) T + +type FilterIter[T any] struct { + iter v1.Iterator[T] + matches Predicate[T] + transform Transform[T] + cache T + zero T // zero value of the return type of Next() +} + +func (it *FilterIter[T]) Next() bool { + next := it.iter.Next() + if !next { + it.cache = it.zero + return false + } + for next && !it.matches(it.iter.At()) { + next = it.iter.Next() + if !next { + it.cache = it.zero + return false + } + } + it.cache = it.transform(it.iter.At()) + return true +} + +func (it *FilterIter[T]) At() T { + return it.cache +} + +func (it *FilterIter[T]) Err() error { + return nil +} + +// FilterRequest extends v1.Request with an error channel +type FilterRequest struct { + v1.Request + Error chan<- error +} + +// taskMergeIterator implements v1.Iterator +type taskMergeIterator struct { + curr FilterRequest + heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]] + tasks []Task + day time.Time + err error +} + +func newTaskMergeIterator(day time.Time, tasks ...Task) v1.PeekingIterator[v1.Request] { + it := &taskMergeIterator{ + 
tasks: tasks, + curr: FilterRequest{}, + day: day, + } + it.init() + return v1.NewPeekingIter[v1.Request](it) +} + +func (it *taskMergeIterator) init() { + sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) + for i := range it.tasks { + iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) + sequences = append(sequences, v1.NewPeekingIter(iter)) + } + it.heap = v1.NewHeapIterator( + func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool { + return i.val.Fingerprint < j.val.Fingerprint + }, + sequences..., + ) + it.err = nil +} + +func (it *taskMergeIterator) Next() bool { + ok := it.heap.Next() + if !ok { + return false + } + + group := it.heap.At() + task := it.tasks[group.idx] + + it.curr.Fp = model.Fingerprint(group.val.Fingerprint) + it.curr.Chks = convertToChunkRefs(group.val.Refs) + it.curr.Searches = convertToSearches(task.Request.Filters) + it.curr.Response = task.ResCh + it.curr.Error = task.ErrCh + return true +} + +func (it *taskMergeIterator) At() v1.Request { + return it.curr.Request +} + +func (it *taskMergeIterator) Err() error { + return it.err +} diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go new file mode 100644 index 0000000000000..93e5e5686fdaf --- /dev/null +++ b/pkg/bloomgateway/multiplexing_test.go @@ -0,0 +1,203 @@ +package bloomgateway + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" +) + +func TestTask(t *testing.T) { + t.Run("bounds returns request boundaries", func(t *testing.T) { + ts := model.Now() + req := &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * time.Hour), + Through: ts, + } + task, _, _, err := NewTask("tenant", req) + require.NoError(t, err) + from, through := task.Bounds() + require.Equal(t, getDayTime(req.From), from) + require.Equal(t, getDayTime(req.Through), through) + }) +} + +func TestTaskMergeIterator(t *testing.T) { + // Thu Nov 09 2023 10:56:50 UTC + ts := model.TimeFromUnix(1699523810) + day := getDayTime(ts) + tenant := "fake" + + t.Run("empty requests result in empty iterator", func(t *testing.T) { + r1 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-3 * time.Hour), + Through: ts.Add(-2 * time.Hour), + Refs: []*logproto.GroupedChunkRefs{}, + } + t1, _, _, err := NewTask(tenant, r1) + require.NoError(t, err) + + r2 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{}, + } + t2, _, _, err := NewTask(tenant, r2) + require.NoError(t, err) + + r3 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{}, + } + t3, _, _, err := NewTask(tenant, r3) + require.NoError(t, err) + + it := newTaskMergeIterator(day, t1, t2, t3) + // nothing to iterate over + require.False(t, it.Next()) + }) + + t.Run("merge multiple tasks in ascending fingerprint order", func(t *testing.T) { + r1 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-3 * time.Hour), + Through: ts.Add(-2 * time.Hour), + Refs: []*logproto.GroupedChunkRefs{ + {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100}, + }}, + }, + } + t1, _, _, err := NewTask(tenant, r1) + require.NoError(t, err) + + r2 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + {Fingerprint: 100, Tenant: 
tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200}, + }}, + {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300}, + }}, + }, + } + t2, _, _, err := NewTask(tenant, r2) + require.NoError(t, err) + + r3 := &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400}, + }}, + }, + } + t3, _, _, err := NewTask(tenant, r3) + require.NoError(t, err) + + it := newTaskMergeIterator(day, t1, t2, t3) + + // first item + require.True(t, it.Next()) + r := it.At() + require.Equal(t, model.Fingerprint(100), r.Fp) + require.Equal(t, uint32(100), r.Chks[0].Checksum) + + // second item + require.True(t, it.Next()) + r = it.At() + require.Equal(t, model.Fingerprint(100), r.Fp) + require.Equal(t, uint32(200), r.Chks[0].Checksum) + + // third item + require.True(t, it.Next()) + r = it.At() + require.Equal(t, model.Fingerprint(200), r.Fp) + require.Equal(t, uint32(300), r.Chks[0].Checksum) + + // fourth item + require.True(t, it.Next()) + r = it.At() + require.Equal(t, model.Fingerprint(200), r.Fp) + require.Equal(t, uint32(400), r.Chks[0].Checksum) + + // no more items + require.False(t, it.Next()) + }) +} + +func TestChunkIterForDay(t *testing.T) { + tenant := "fake" + + // Thu Nov 09 2023 10:56:50 UTC + ts := model.TimeFromUnix(1699523810) + + t.Run("filter chunk refs that fall into the day range", func(t *testing.T) { + input := &logproto.FilterChunkRefRequest{ + From: ts.Add(-168 * time.Hour), // 1w ago + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100}, + {From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101}, + }}, + {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200}, + {From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201}, + }}, + {Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300}, + {From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301}, + }}, + {Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400}, + {From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401}, + }}, + {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500}, + {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, + }}, + {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, + {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601}, + }}, + {Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700}, + {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701}, + }}, + }, + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: "foo"}, + {Operator: 1, Match: "bar"}, + }, + } + + // 
day ranges from ts-48h to ts-24h + day := getDayTime(ts.Add(-36 * time.Hour)) + + expected := []*logproto.GroupedChunkRefs{ + {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, + }}, + {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, + }}, + } + + task, _, _, _ := NewTask(tenant, input) + it := task.ChunkIterForDay(day) + + output := make([]*logproto.GroupedChunkRefs, 0, len(input.Refs)) + for it.Next() { + output = append(output, it.At()) + } + + require.Equal(t, expected, output) + }) +} diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go new file mode 100644 index 0000000000000..87187e071b82d --- /dev/null +++ b/pkg/bloomgateway/util.go @@ -0,0 +1,167 @@ +package bloomgateway + +import ( + "sort" + "time" + + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" +) + +type IndexedValue[T any] struct { + idx int + val T +} + +type IterWithIndex[T any] struct { + v1.Iterator[T] + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *IterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.Iterator.At() + return it.cache +} + +func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] { + return &IterWithIndex[T]{ + Iterator: iter, + cache: IndexedValue[T]{idx: idx}, + } +} + +type SliceIterWithIndex[T any] struct { + xs []T // source slice + pos int // position within the slice + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *SliceIterWithIndex[T]) Next() bool { + it.pos++ + return it.pos < len(it.xs) +} + +func (it *SliceIterWithIndex[T]) Err() error { + return nil +} + +func (it *SliceIterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.xs[it.pos] + return it.cache +} + +func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) { + if it.pos+1 >= len(it.xs) { + it.cache.val = it.zero + return it.cache, false + } + it.cache.val = it.xs[it.pos+1] + return it.cache, true +} + +func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] { + return &SliceIterWithIndex[T]{ + xs: xs, + pos: -1, + cache: IndexedValue[T]{idx: idx}, + } +} + +func getDayTime(ts model.Time) time.Time { + return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC) +} + +// TODO(chaudum): Fix Through time calculation +// getFromThrough assumes a list of ShortRefs sorted by From time +// However, it does also assume that the last item has the highest +// Through time, which might not be the case! +func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) { + if len(refs) == 0 { + return model.Earliest, model.Latest + } + return refs[0].From, refs[len(refs)-1].Through +} + +// convertToSearches converts a list of line filter expressions to a list of +// byte slices that can be used with the bloom filters. +// TODO(chaudum): Currently this function only supports equality matchers, +// but we eventually also want to support regex matchers. 
+func convertToSearches(filters []*logproto.LineFilterExpression) [][]byte { + searches := make([][]byte, 0, len(filters)) + for _, f := range filters { + searches = append(searches, []byte(f.Match)) + } + return searches +} + +// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef +// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. +func convertToShortRefs(refs v1.ChunkRefs) []*logproto.ShortRef { + result := make([]*logproto.ShortRef, 0, len(refs)) + for _, ref := range refs { + result = append(result, &logproto.ShortRef{From: ref.Start, Through: ref.End, Checksum: ref.Checksum}) + } + return result +} + +// convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs +// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. +func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs { + result := make(v1.ChunkRefs, 0, len(refs)) + for _, ref := range refs { + result = append(result, v1.ChunkRef{Start: ref.From, End: ref.Through, Checksum: ref.Checksum}) + } + return result +} + +// getFirstLast returns the first and last item of a fingerprint slice +// It assumes an ascending sorted list of fingerprints. +func getFirstLast[T any](s []T) (T, T) { + var zero T + if len(s) == 0 { + return zero, zero + } + return s[0], s[len(s)-1] +} + +type boundedTasks struct { + blockRef bloomshipper.BlockRef + tasks []Task +} + +func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (result []boundedTasks) { + for _, block := range blocks { + bounded := boundedTasks{ + blockRef: block, + } + + for _, task := range tasks { + refs := task.Request.Refs + min := sort.Search(len(refs), func(i int) bool { + return block.Cmp(refs[i].Fingerprint) > v1.Before + }) + + max := sort.Search(len(refs), func(i int) bool { + return block.Cmp(refs[i].Fingerprint) == v1.After + }) + + // All fingerprints fall outside of the consumer's range + if min == len(refs) || max == 0 { + continue + } + + bounded.tasks = append(bounded.tasks, task.Copy(refs[min:max])) + } + + if len(bounded.tasks) > 0 { + result = append(result, bounded) + } + + } + return result +} diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go new file mode 100644 index 0000000000000..1424c56a19153 --- /dev/null +++ b/pkg/bloomgateway/util_test.go @@ -0,0 +1,84 @@ +package bloomgateway + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" +) + +func TestSliceIterWithIndex(t *testing.T) { + t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) { + xs := []string{"a", "b", "c"} + it := NewSliceIterWithIndex(xs, 123) + + // peek at first item + p, ok := it.Peek() + require.True(t, ok) + require.Equal(t, "a", p.val) + require.Equal(t, 123, p.idx) + + // proceed to first item + require.True(t, it.Next()) + require.Equal(t, "a", it.At().val) + require.Equal(t, 123, it.At().idx) + + // proceed to second and third item + require.True(t, it.Next()) + require.True(t, it.Next()) + + // peek at non-existing fourth item + p, ok = it.Peek() + require.False(t, ok) + require.Equal(t, "", p.val) // "" is zero value for type string + require.Equal(t, 123, p.idx) + }) +} + +func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef { + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + MinFingerprint: minFp, + MaxFingerprint: maxFp, + }, + } +} + +func TestPartitionFingerprintRange(t 
*testing.T) { + seriesPerBound := 100 + bounds := []bloomshipper.BlockRef{ + mkBlockRef(0, 99), + mkBlockRef(100, 199), + mkBlockRef(200, 299), + mkBlockRef(300, 399), // one out of bounds block + } + + nTasks := 4 + nSeries := 300 + tasks := make([]Task, nTasks) + for i := 0; i < nSeries; i++ { + if tasks[i%4].Request == nil { + tasks[i%4].Request = &logproto.FilterChunkRefRequest{} + } + tasks[i%4].Request.Refs = append(tasks[i%nTasks].Request.Refs, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) + } + + results := partitionFingerprintRange(tasks, bounds) + require.Equal(t, 3, len(results)) // ensure we only return bounds in range + for _, res := range results { + // ensure we have the right number of tasks per bound + for i := 0; i < nTasks; i++ { + require.Equal(t, seriesPerBound/nTasks, len(res.tasks[i].Request.Refs)) + } + } + + // ensure bound membership + for i := 0; i < nSeries; i++ { + require.Equal(t, + &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}, + results[i/seriesPerBound].tasks[i%nTasks].Request.Refs[i%seriesPerBound/nTasks], + ) + } +} diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go new file mode 100644 index 0000000000000..f39632b1219ff --- /dev/null +++ b/pkg/bloomgateway/worker.go @@ -0,0 +1,227 @@ +package bloomgateway + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/queue" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" +) + +type workerConfig struct { + maxWaitTime time.Duration + maxItems int +} + +type workerMetrics struct { + dequeuedTasks *prometheus.CounterVec + dequeueErrors *prometheus.CounterVec + dequeueWaitTime *prometheus.SummaryVec + storeAccessLatency *prometheus.HistogramVec +} + +func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics { + labels := []string{"worker"} + return &workerMetrics{ + dequeuedTasks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dequeued_tasks_total", + Help: "Total amount of tasks that the worker dequeued from the bloom query queue", + }, labels), + dequeueErrors: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dequeue_errors_total", + Help: "Total amount of failed dequeue operations", + }, labels), + dequeueWaitTime: promauto.With(registerer).NewSummaryVec(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dequeue_wait_time", + Help: "Time spent waiting for dequeuing tasks from queue", + }, labels), + storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "store_latency", + Help: "Latency in seconds of accessing the bloom store component", + }, append(labels, "operation")), + } +} + +// worker is a data structure that consumes tasks from the request queue, +// processes them and returns the result/error back to the response channels of +// the tasks. +// It is responsible for multiplexing tasks so they can be processed in a more +// efficient way.
+type worker struct { + services.Service + + id string + cfg workerConfig + queue *queue.RequestQueue + store bloomshipper.Store + tasks *pendingTasks + logger log.Logger + metrics *workerMetrics +} + +func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker { + w := &worker{ + id: id, + cfg: cfg, + queue: queue, + store: store, + tasks: tasks, + logger: log.With(logger, "worker", id), + metrics: metrics, + } + w.Service = services.NewBasicService(w.starting, w.running, w.stopping).WithName(id) + return w +} + +func (w *worker) starting(_ context.Context) error { + level.Debug(w.logger).Log("msg", "starting worker") + w.queue.RegisterConsumerConnection(w.id) + return nil +} + +func (w *worker) running(ctx context.Context) error { + idx := queue.StartIndexWithLocalQueue + + for { + select { + + case <-ctx.Done(): + return ctx.Err() + + default: + taskCtx := context.Background() + dequeueStart := time.Now() + items, newIdx, err := w.queue.DequeueMany(taskCtx, idx, w.id, w.cfg.maxItems, w.cfg.maxWaitTime) + w.metrics.dequeueWaitTime.WithLabelValues(w.id).Observe(time.Since(dequeueStart).Seconds()) + if err != nil { + // We only return an error if the queue is stopped and dequeuing did not yield any items + if err == queue.ErrStopped && len(items) == 0 { + return err + } + w.metrics.dequeueErrors.WithLabelValues(w.id).Inc() + level.Error(w.logger).Log("msg", "failed to dequeue tasks", "err", err, "items", len(items)) + } + idx = newIdx + + if len(items) == 0 { + w.queue.ReleaseRequests(items) + continue + } + w.metrics.dequeuedTasks.WithLabelValues(w.id).Add(float64(len(items))) + + tasksPerDay := make(map[time.Time][]Task) + + for _, item := range items { + task, ok := item.(Task) + if !ok { + // This really should never happen, because only the bloom gateway itself can enqueue tasks. + w.queue.ReleaseRequests(items) + return errors.Errorf("failed to cast dequeued item to Task: %v", item) + } + level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID) + w.tasks.Delete(task.ID) + + fromDay, throughDay := task.Bounds() + + if fromDay.Equal(throughDay) { + tasksPerDay[fromDay] = append(tasksPerDay[fromDay], task) + } else { + for i := fromDay; i.Before(throughDay); i = i.Add(24 * time.Hour) { + tasksPerDay[i] = append(tasksPerDay[i], task) + } + } + } + + for day, tasks := range tasksPerDay { + logger := log.With(w.logger, "day", day) + level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks)) + + storeFetchStart := time.Now() + blockRefs, err := w.store.GetBlockRefs(taskCtx, tasks[0].Tenant, day, day.Add(Day).Add(-1*time.Nanosecond)) + w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) + if err != nil { + for _, t := range tasks { + t.ErrCh <- err + } + // continue with tasks of next day + continue + } + // No blocks found. + // Since there are no blocks for the given tasks, we need to return the + // unfiltered list of chunk refs. 
+ if len(blockRefs) == 0 { + level.Warn(logger).Log("msg", "no blocks found") + for _, t := range tasks { + for _, ref := range t.Request.Refs { + t.ResCh <- v1.Output{ + Fp: model.Fingerprint(ref.Fingerprint), + Removals: nil, + } + } + } + // continue with tasks of next day + continue + } + + boundedRefs := partitionFingerprintRange(tasks, blockRefs) + blockRefs = blockRefs[:0] + for _, b := range boundedRefs { + blockRefs = append(blockRefs, b.blockRef) + } + + // GetBlockQueriersForBlockRefs() waits until all blocks are downloaded and available for querying. + // TODO(chaudum): Add API that allows to process blocks as soon as they become available. + // This will require to change the taskMergeIterator to a slice of requests so we can seek + // to the appropriate fingerprint range within the slice that matches the block's fingerprint range. + storeFetchStart = time.Now() + blockQueriers, err := w.store.GetBlockQueriersForBlockRefs(taskCtx, tasks[0].Tenant, blockRefs) + w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockQueriersForBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) + if err != nil { + for _, t := range tasks { + t.ErrCh <- err + } + // continue with tasks of next day + continue + } + + for i, blockQuerier := range blockQueriers { + it := newTaskMergeIterator(day, boundedRefs[i].tasks...) + fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it}) + err := fq.Run() + if err != nil { + for _, t := range boundedRefs[i].tasks { + t.ErrCh <- errors.Wrap(err, "failed to run chunk check") + } + } + } + } + + // return dequeued items back to the pool + w.queue.ReleaseRequests(items) + + } + } +} + +func (w *worker) stopping(err error) error { + level.Debug(w.logger).Log("msg", "stopping worker", "err", err) + w.queue.UnregisterConsumerConnection(w.id) + return nil +} diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 055b7b5c92717..a7bebfbfccf14 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/util" - lokiutil "github.com/grafana/loki/pkg/util" ) type Config struct { @@ -151,7 +150,7 @@ func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logg } if ring != nil { - w, err := lokiutil.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f) + w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f) if err != nil { return nil, err } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index fa1860e4e88d3..f0475164bd4d1 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -59,6 +59,7 @@ type RequestQueue struct { stopped bool metrics *Metrics + pool *SlicePool[Request] } func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue { @@ -66,6 +67,7 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, met queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), connectedConsumers: atomic.NewInt32(0), metrics: metrics, + pool: NewSlicePool[Request](1<<6, 1<<10, 2), // Buckets are [64, 128, 256, 512, 1024]. } q.cond = contextCond{Cond: sync.NewCond(&q.mtx)} @@ -125,6 +127,41 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue } } +// ReleaseRequests returns items back to the slice pool. 
+// Must only be called in combination with DequeueMany(). +func (q *RequestQueue) ReleaseRequests(items []Request) { + q.pool.Put(items) +} + +// DequeueMany consumes multiple items for a single tenant from the queue. +// It returns at most maxItems items and waits at most maxWait if not enough requests for this tenant are enqueued. +// The caller is responsible for returning the dequeued requests back to the +// pool by calling ReleaseRequests(items). +func (q *RequestQueue) DequeueMany(ctx context.Context, last QueueIndex, consumerID string, maxItems int, maxWait time.Duration) ([]Request, QueueIndex, error) { + // create a context for dequeuing with a max time we want to wait to fulfill the desired maxItems + + dequeueCtx, cancel := context.WithTimeout(ctx, maxWait) + defer cancel() + + var idx QueueIndex + + items := q.pool.Get(maxItems) + for { + item, newIdx, err := q.Dequeue(dequeueCtx, last, consumerID) + if err != nil { + if err == context.DeadlineExceeded { + err = nil + } + return items, idx, err + } + items = append(items, item) + idx = newIdx + if len(items) == maxItems { + return items, idx, nil + } + } +} + // Dequeue find next tenant queue and takes the next request off of it. Will block if there are no requests. // By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly. // If consumer finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser. diff --git a/pkg/queue/util.go b/pkg/queue/util.go new file mode 100644 index 0000000000000..9b7fced6dfbf7 --- /dev/null +++ b/pkg/queue/util.go @@ -0,0 +1,25 @@ +package queue + +import "github.com/prometheus/prometheus/util/pool" + +// SlicePool uses a bucket pool and wraps the Get() and Put() functions for +// simpler access.
+type SlicePool[T any] struct { + p *pool.Pool +} + +func NewSlicePool[T any](minSize, maxSize int, factor float64) *SlicePool[T] { + return &SlicePool[T]{ + p: pool.New(minSize, maxSize, factor, func(i int) interface{} { + return make([]T, 0, i) + }), + } +} + +func (sp *SlicePool[T]) Get(n int) []T { + return sp.p.Get(n).([]T) +} + +func (sp *SlicePool[T]) Put(buf []T) { + sp.p.Put(buf[0:0]) +} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 0ea6f6451ebac..e7278b971f6c8 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -3,51 +3,13 @@ package v1 import ( "bytes" "errors" - "fmt" "testing" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/storage/bloom/v1/filter" ) -func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) { - seriesList = make([]SeriesWithBloom, 0, nSeries) - keysList = make([][][]byte, 0, nSeries) - for i := 0; i < nSeries; i++ { - var series Series - step := (throughFp - fromFp) / (model.Fingerprint(nSeries)) - series.Fingerprint = fromFp + model.Fingerprint(i)*step - timeDelta := fromTs + (throughTs-fromTs)/model.Time(nSeries)*model.Time(i) - series.Chunks = []ChunkRef{ - { - Start: fromTs + timeDelta*model.Time(i), - End: fromTs + timeDelta*model.Time(i), - Checksum: uint32(i), - }, - } - - var bloom Bloom - bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8) - - keys := make([][]byte, 0, keysPerSeries) - for j := 0; j < keysPerSeries; j++ { - key := []byte(fmt.Sprint(j)) - bloom.Add(key) - keys = append(keys, key) - } - - seriesList = append(seriesList, SeriesWithBloom{ - Series: &series, - Bloom: &bloom, - }) - keysList = append(keysList, keys) - } - return -} - func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) { for expected.Next() { require.True(t, actual.Next()) diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index 021fba1e81856..c397a7a55fd57 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -1,56 +1,53 @@ package v1 import ( - "sort" - "github.com/efficientgo/core/errors" "github.com/prometheus/common/model" ) -type request struct { - fp model.Fingerprint - chks ChunkRefs - searches [][]byte - response chan output +type Request struct { + Fp model.Fingerprint + Chks ChunkRefs + Searches [][]byte + Response chan<- Output } -// output represents a chunk that was present in the bloom -// but failed to pass the search filters and can be removed from -// the list of chunks to download -type output struct { - fp model.Fingerprint - removals ChunkRefs +// Output represents a chunk that failed to pass all searches +// and must be downloaded +type Output struct { + Fp model.Fingerprint + Removals ChunkRefs } // Fuse combines multiple requests into a single loop iteration // over the data set and returns the corresponding outputs // TODO(owen-d): better async control -func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[request]) *FusedQuerier { +func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[Request]) *FusedQuerier { return NewFusedQuerier(bq, inputs) } type FusedQuerier struct { bq *BlockQuerier - inputs Iterator[[]request] + inputs Iterator[[]Request] } -func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[request]) *FusedQuerier 
{ - heap := NewHeapIterator[request]( - func(a, b request) bool { - return a.fp < b.fp +func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request]) *FusedQuerier { + heap := NewHeapIterator[Request]( + func(a, b Request) bool { + return a.Fp < b.Fp }, inputs..., ) - merging := NewDedupingIter[request, []request]( - func(a request, b []request) bool { - return a.fp == b[0].fp + merging := NewDedupingIter[Request, []Request]( + func(a Request, b []Request) bool { + return a.Fp == b[0].Fp }, - func(a request) []request { return []request{a} }, - func(a request, b []request) []request { + func(a Request) []Request { return []Request{a} }, + func(a Request, b []Request) []Request { return append(b, a) }, - NewPeekingIter[request](heap), + NewPeekingIter[Request](heap), ) return &FusedQuerier{ bq: bq, @@ -63,7 +60,7 @@ func (fq *FusedQuerier) Run() error { // find all queries for the next relevant fingerprint nextBatch := fq.inputs.At() - fp := nextBatch[0].fp + fp := nextBatch[0].Fp // advance the series iterator to the next fingerprint if err := fq.bq.Seek(fp); err != nil { @@ -79,9 +76,9 @@ func (fq *FusedQuerier) Run() error { if series.Fingerprint != fp { // fingerprint not found, can't remove chunks for _, input := range nextBatch { - input.response <- output{ - fp: fp, - removals: nil, + input.Response <- Output{ + Fp: fp, + Removals: nil, } } } @@ -91,9 +88,9 @@ func (fq *FusedQuerier) Run() error { if !fq.bq.blooms.Next() { // fingerprint not found, can't remove chunks for _, input := range nextBatch { - input.response <- output{ - fp: fp, - removals: nil, + input.Response <- Output{ + Fp: fp, + Removals: nil, } } continue @@ -103,19 +100,17 @@ func (fq *FusedQuerier) Run() error { // test every input against this chunk inputLoop: for _, input := range nextBatch { - _, inBlooms := input.chks.Compare(series.Chunks, true) + _, inBlooms := input.Chks.Compare(series.Chunks, true) // First, see if the search passes the series level bloom before checking for chunks individually - for _, search := range input.searches { + for _, search := range input.Searches { if !bloom.Test(search) { - // the entire series bloom didn't pass one of the searches, - // so we can skip checking chunks individually. // We return all the chunks that were the intersection of the query // because they for sure do not match the search and don't // need to be downloaded - input.response <- output{ - fp: fp, - removals: inBlooms, + input.Response <- Output{ + Fp: fp, + Removals: inBlooms, } continue inputLoop } @@ -126,7 +121,7 @@ func (fq *FusedQuerier) Run() error { chunkLoop: for _, chk := range inBlooms { - for _, search := range input.searches { + for _, search := range input.Searches { // TODO(owen-d): meld chunk + search into a single byte slice from the block schema var combined = search @@ -138,9 +133,9 @@ func (fq *FusedQuerier) Run() error { // Otherwise, the chunk passed all the searches } - input.response <- output{ - fp: fp, - removals: removals, + input.Response <- Output{ + Fp: fp, + Removals: removals, } } @@ -148,43 +143,3 @@ func (fq *FusedQuerier) Run() error { return nil } - -// boundedRequests is a set of requests that are clamped to a specific range -type boundedRequests struct { - bounds FingerprintBounds - reqs [][]model.Fingerprint -} - -// reqs models a set of requests covering many fingerprints. 
-// consumers models a set of blocks covering different fingerprint ranges -func partitionFingerprintRange(reqs [][]model.Fingerprint, blocks []FingerprintBounds) (res []boundedRequests) { - for _, block := range blocks { - bounded := boundedRequests{ - bounds: block, - } - - for _, req := range reqs { - min := sort.Search(len(req), func(i int) bool { - return block.Cmp(req[i]) > Before - }) - - max := sort.Search(len(req), func(i int) bool { - return block.Cmp(req[i]) == After - }) - - // All fingerprints fall outside of the consumer's range - if min == len(req) || max == 0 { - continue - } - - bounded.reqs = append(bounded.reqs, req[min:max]) - } - - if len(bounded.reqs) > 0 { - res = append(res, bounded) - } - - } - - return res -} diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index b990d69f4b7bd..e784ac0168201 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -7,44 +7,11 @@ import ( "testing" "github.com/grafana/dskit/concurrency" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc" ) -func TestPartitionFingerprintRange(t *testing.T) { - seriesPerBound := 100 - bounds := []FingerprintBounds{ - {0, 99}, - {100, 199}, - {200, 299}, - {300, 399}, // one out of bounds block - } - - nReqs := 4 - nSeries := 300 - reqs := make([][]model.Fingerprint, nReqs) - for i := 0; i < nSeries; i++ { - reqs[i%4] = append(reqs[i%nReqs], model.Fingerprint(i)) - } - - results := partitionFingerprintRange(reqs, bounds) - require.Equal(t, 3, len(results)) // ensure we only return bounds in range - for _, res := range results { - // ensure we have the right number of requests per bound - for i := 0; i < nReqs; i++ { - require.Equal(t, seriesPerBound/nReqs, len(res.reqs[i])) - } - } - - // ensure bound membership - for i := 0; i < nSeries; i++ { - require.Equal(t, model.Fingerprint(i), results[i/seriesPerBound].reqs[i%nReqs][i%seriesPerBound/nReqs]) - } - -} - func TestFusedQuerier(t *testing.T) { // references for linking in memory reader+writer indexBuf := bytes.NewBuffer(nil) @@ -74,37 +41,39 @@ func TestFusedQuerier(t *testing.T) { querier := NewBlockQuerier(block) nReqs := 10 - var inputs [][]request + var inputs [][]Request + var resChans []chan Output for i := 0; i < nReqs; i++ { - ch := make(chan output) - var reqs []request + ch := make(chan Output) + var reqs []Request // find 2 series for each for j := 0; j < 2; j++ { idx := numSeries/nReqs*i + j - reqs = append(reqs, request{ - fp: data[idx].Series.Fingerprint, - chks: data[idx].Series.Chunks, - response: ch, + reqs = append(reqs, Request{ + Fp: data[idx].Series.Fingerprint, + Chks: data[idx].Series.Chunks, + Response: ch, }) } inputs = append(inputs, reqs) + resChans = append(resChans, ch) } - var itrs []PeekingIterator[request] + var itrs []PeekingIterator[Request] for _, reqs := range inputs { - itrs = append(itrs, NewPeekingIter[request](NewSliceIter[request](reqs))) + itrs = append(itrs, NewPeekingIter[Request](NewSliceIter[Request](reqs))) } - resps := make([][]output, nReqs) + resps := make([][]Output, nReqs) var g sync.WaitGroup g.Add(1) go func() { require.Nil(t, concurrency.ForEachJob( context.Background(), - len(resps), - len(resps), + len(resChans), + len(resChans), func(_ context.Context, i int) error { - for v := range inputs[i][0].response { + for v := range resChans[i] { resps[i] = append(resps[i], v) } return nil @@ -117,7 +86,7 @@ func TestFusedQuerier(t *testing.T) { require.Nil(t, 
fused.Run()) for _, input := range inputs { - close(input[0].response) + close(input[0].Response) } g.Wait() @@ -126,9 +95,9 @@ func TestFusedQuerier(t *testing.T) { resp := resps[i][j] require.Equal( t, - output{ - fp: req.fp, - removals: nil, + Output{ + Fp: req.Fp, + Removals: nil, }, resp, ) @@ -136,7 +105,7 @@ func TestFusedQuerier(t *testing.T) { } } -func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]request) { +func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) @@ -165,11 +134,12 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]request) { numRequestChains := 100 seriesPerRequest := 100 - var requestChains [][]request + var requestChains [][]Request + var responseChans []chan Output for i := 0; i < numRequestChains; i++ { - var reqs []request + var reqs []Request // ensure they use the same channel - ch := make(chan output) + ch := make(chan Output) // evenly spread out the series queried within a single request chain // to mimic series distribution across keyspace for j := 0; j < seriesPerRequest; j++ { @@ -178,21 +148,22 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]request) { if idx >= numSeries { idx = numSeries - 1 } - reqs = append(reqs, request{ - fp: data[idx].Series.Fingerprint, - chks: data[idx].Series.Chunks, - response: ch, + reqs = append(reqs, Request{ + Fp: data[idx].Series.Fingerprint, + Chks: data[idx].Series.Chunks, + Response: ch, }) } requestChains = append(requestChains, reqs) + responseChans = append(responseChans, ch) } - return querier, requestChains + return querier, requestChains, responseChans } func BenchmarkBlockQuerying(b *testing.B) { b.StopTimer() - querier, requestChains := setupBlockForBenchmark(b) + querier, requestChains, responseChans := setupBlockForBenchmark(b) // benchmark b.StartTimer() @@ -200,7 +171,7 @@ func BenchmarkBlockQuerying(b *testing.B) { for i := 0; i < b.N; i++ { for _, chain := range requestChains { for _, req := range chain { - _, _ = querier.CheckChunksForSeries(req.fp, req.chks, nil) + _, _ = querier.CheckChunksForSeries(req.Fp, req.Chks, nil) } } } @@ -211,22 +182,22 @@ func BenchmarkBlockQuerying(b *testing.B) { go func() { require.Nil(b, concurrency.ForEachJob( context.Background(), - len(requestChains), len(requestChains), + len(responseChans), len(responseChans), func(_ context.Context, idx int) error { // nolint:revive - for range requestChains[idx][0].response { + for range responseChans[idx] { } return nil }, )) }() - var itrs []PeekingIterator[request] + var itrs []PeekingIterator[Request] for i := 0; i < b.N; i++ { itrs = itrs[:0] for _, reqs := range requestChains { - itrs = append(itrs, NewPeekingIter[request](NewSliceIter[request](reqs))) + itrs = append(itrs, NewPeekingIter[Request](NewSliceIter[Request](reqs))) } fused := querier.Fuse(itrs) _ = fused.Run() diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go new file mode 100644 index 0000000000000..215ecaffe177e --- /dev/null +++ b/pkg/storage/bloom/v1/test_util.go @@ -0,0 +1,81 @@ +package v1 + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/storage/bloom/v1/filter" +) + +func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) 
(*BlockQuerier, []SeriesWithBloom) { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + numSeries := int(throughFp - fromFp) + numKeysPerSeries := 1000 + data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs) + + builder, err := NewBlockBuilder( + BlockOptions{ + schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + writer, + ) + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data) + _, err = builder.BuildFrom(itr) + require.Nil(t, err) + block := NewBlock(reader) + return NewBlockQuerier(block), data +} + +func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) { + seriesList = make([]SeriesWithBloom, 0, nSeries) + keysList = make([][][]byte, 0, nSeries) + + step := (throughFp - fromFp) / model.Fingerprint(nSeries) + timeDelta := time.Duration(throughTs.Sub(fromTs).Nanoseconds() / int64(nSeries)) + + for i := 0; i < nSeries; i++ { + var series Series + series.Fingerprint = fromFp + model.Fingerprint(i)*step + from := fromTs.Add(timeDelta * time.Duration(i)) + series.Chunks = []ChunkRef{ + { + Start: from, + End: from.Add(timeDelta), + Checksum: uint32(i), + }, + } + + var bloom Bloom + bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8) + + keys := make([][]byte, 0, keysPerSeries) + for j := 0; j < keysPerSeries; j++ { + key := []byte(fmt.Sprint(i*keysPerSeries + j)) + bloom.Add(key) + keys = append(keys, key) + } + + seriesList = append(seriesList, SeriesWithBloom{ + Series: &series, + Bloom: &bloom, + }) + keysList = append(keysList, keys) + } + return +} diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 5709bf8866f21..d1e9f24ef866b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -11,11 +11,11 @@ import ( "strings" "time" - "github.com/prometheus/common/model" - "github.com/grafana/dskit/concurrency" + "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/util/math" @@ -37,6 +37,16 @@ type Ref struct { Checksum uint32 } +// Cmp returns the fingerprint's position relative to the bounds +func (b Ref) Cmp(fp uint64) v1.BoundsCheck { + if fp < b.MinFingerprint { + return v1.Before + } else if fp > b.MaxFingerprint { + return v1.After + } + return v1.Overlap +} + type BlockRef struct { Ref IndexPath string diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index 98dbbb20a476a..c04cad433308a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -39,30 +39,30 @@ func NewShipper(client Client, config config.Config, limits Limits, logger log.L }, nil } -func (s *Shipper) ForEachBlock( - ctx context.Context, - tenantID string, - from, through time.Time, - fingerprints []uint64, - callback ForEachBlockCallback) error { +func (s *Shipper) GetBlockRefs(ctx 
context.Context, tenantID string, from, through time.Time) ([]BlockRef, error) { + level.Debug(s.logger).Log("msg", "GetBlockRefs", "tenant", tenantID, "from", from, "through", through) - level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints)) - - blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), fingerprints) + blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), nil) if err != nil { - return fmt.Errorf("error fetching active block references : %w", err) + return nil, fmt.Errorf("error fetching active block references : %w", err) } + return blockRefs, nil +} +func (s *Shipper) Fetch(ctx context.Context, tenantID string, blocks []BlockRef, callback ForEachBlockCallback) error { cancelContext, cancelFunc := context.WithCancel(ctx) defer cancelFunc() - blocksChannel, errorsChannel := s.blockDownloader.downloadBlocks(cancelContext, tenantID, blockRefs) + blocksChannel, errorsChannel := s.blockDownloader.downloadBlocks(cancelContext, tenantID, blocks) + for { select { + case <-ctx.Done(): + return fmt.Errorf("failed to fetch blocks: %w", ctx.Err()) case result, ok := <-blocksChannel: if !ok { return nil } - err = callback(result.BlockQuerier) + err := callback(result.BlockQuerier, result.MinFingerprint, result.MaxFingerprint) if err != nil { return fmt.Errorf("error running callback function for block %s err: %w", result.BlockPath, err) } @@ -74,27 +74,34 @@ func (s *Shipper) ForEachBlock( } } +func (s *Shipper) ForEachBlock(ctx context.Context, tenantID string, from, through time.Time, fingerprints []uint64, callback ForEachBlockCallback) error { + level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints)) + + blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from.UnixNano(), through.UnixNano(), fingerprints) + if err != nil { + return fmt.Errorf("error fetching active block references : %w", err) + } + + return s.Fetch(ctx, tenantID, blockRefs, callback) +} + func (s *Shipper) Stop() { s.client.Stop() s.blockDownloader.stop() } -// getFromThrough returns the first and list item of a fingerprint slice +// getFirstLast returns the first and last item of a fingerprint slice // It assumes an ascending sorted list of fingerprints. 
-func getFromThrough(fingerprints []uint64) (uint64, uint64) { - if len(fingerprints) == 0 { - return 0, 0 +func getFirstLast[T any](s []T) (T, T) { + var zero T + if len(s) == 0 { + return zero, zero } - return fingerprints[0], fingerprints[len(fingerprints)-1] + return s[0], s[len(s)-1] } -func (s *Shipper) getActiveBlockRefs( - ctx context.Context, - tenantID string, - from, through int64, - fingerprints []uint64) ([]BlockRef, error) { - - minFingerprint, maxFingerprint := getFromThrough(fingerprints) +func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, through int64, fingerprints []uint64) ([]BlockRef, error) { + minFingerprint, maxFingerprint := getFirstLast(fingerprints) metas, err := s.client.GetMetas(ctx, MetaSearchParams{ TenantID: tenantID, MinFingerprint: minFingerprint, @@ -164,7 +171,7 @@ func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp int64, fingerprint } // Then, check if outside of min/max of fingerprint slice - minFp, maxFp := getFromThrough(fingerprints) + minFp, maxFp := getFirstLast(fingerprints) if b.MaxFingerprint < minFp || b.MinFingerprint > maxFp { return true } diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 80f2c352d5326..70c61ba0add8e 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -2,18 +2,20 @@ package bloomshipper import ( "context" + "sort" "time" "github.com/prometheus/common/model" - "github.com/grafana/loki/pkg/logproto" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) -type ForEachBlockCallback func(bq *v1.BlockQuerier) error +type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error type ReadShipper interface { + GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) ForEachBlock(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64, callback ForEachBlockCallback) error + Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error } type Interface interface { @@ -21,8 +23,15 @@ type Interface interface { Stop() } +type BlockQuerierWithFingerprintRange struct { + *v1.BlockQuerier + MinFp, MaxFp model.Fingerprint +} + type Store interface { - FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) + GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) + GetBlockQueriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) ([]BlockQuerierWithFingerprintRange, error) + GetBlockQueriersForBlockRefs(ctx context.Context, tenant string, blocks []BlockRef) ([]BlockQuerierWithFingerprintRange, error) Stop() } @@ -40,84 +49,41 @@ func (bs *BloomStore) Stop() { bs.shipper.Stop() } -func (bs *BloomStore) FilterChunkRefs(ctx context.Context, tenant string, from, through time.Time, chunkRefs []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) { - fingerprints := make([]uint64, 0, len(chunkRefs)) - for _, ref := range chunkRefs { - fingerprints = append(fingerprints, ref.Fingerprint) - } - - blooms, err := bs.queriers(ctx, tenant, from, through, fingerprints) - if err != nil { - return nil, err - } - - searches := convertLineFilterExpressions(filters) - - for _, ref := range chunkRefs { - 
refs, err := blooms.Filter(ctx, model.Fingerprint(ref.Fingerprint), convertToChunkRefs(ref.Refs), searches) - if err != nil { - return nil, err - } - ref.Refs = convertToShortRefs(refs) - } - return chunkRefs, nil +// GetBlockRefs implements Store +func (bs *BloomStore) GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) { + return bs.shipper.GetBlockRefs(ctx, tenant, from, through) } -func (bs *BloomStore) queriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) (*bloomQueriers, error) { - bf := newBloomFilters(1024) - err := bs.shipper.ForEachBlock(ctx, tenant, from, through, fingerprints, func(bq *v1.BlockQuerier) error { - bf.queriers = append(bf.queriers, bq) +// GetQueriersForBlocks implements Store +func (bs *BloomStore) GetBlockQueriersForBlockRefs(ctx context.Context, tenant string, blocks []BlockRef) ([]BlockQuerierWithFingerprintRange, error) { + bqs := make([]BlockQuerierWithFingerprintRange, 0, 32) + err := bs.shipper.Fetch(ctx, tenant, blocks, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error { + bqs = append(bqs, BlockQuerierWithFingerprintRange{ + BlockQuerier: bq, + MinFp: model.Fingerprint(minFp), + MaxFp: model.Fingerprint(maxFp), + }) return nil }) - return bf, err -} - -func convertLineFilterExpressions(filters []*logproto.LineFilterExpression) [][]byte { - searches := make([][]byte, len(filters)) - for _, f := range filters { - searches = append(searches, []byte(f.Match)) - } - return searches -} - -// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef -// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. -func convertToShortRefs(refs v1.ChunkRefs) []*logproto.ShortRef { - result := make([]*logproto.ShortRef, len(refs)) - for _, ref := range refs { - result = append(result, &logproto.ShortRef{From: ref.Start, Through: ref.End, Checksum: ref.Checksum}) - } - return result -} - -// convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs -// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. -func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs { - result := make(v1.ChunkRefs, len(refs)) - for _, ref := range refs { - result = append(result, v1.ChunkRef{Start: ref.From, End: ref.Through, Checksum: ref.Checksum}) - } - return result -} - -type bloomQueriers struct { - queriers []*v1.BlockQuerier -} - -func newBloomFilters(size int) *bloomQueriers { - return &bloomQueriers{ - queriers: make([]*v1.BlockQuerier, size), - } + sort.Slice(bqs, func(i, j int) bool { + return bqs[i].MinFp < bqs[j].MinFp + }) + return bqs, err } -func (bf *bloomQueriers) Filter(_ context.Context, fp model.Fingerprint, chunkRefs v1.ChunkRefs, filters [][]byte) (v1.ChunkRefs, error) { - result := make(v1.ChunkRefs, len(chunkRefs)) - for _, bq := range bf.queriers { - refs, err := bq.CheckChunksForSeries(fp, chunkRefs, filters) - if err != nil { - return nil, err - } - result = append(result, refs...) 
-	}
-	return result, nil
+// GetBlockQueriers implements Store
+func (bs *BloomStore) GetBlockQueriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) ([]BlockQuerierWithFingerprintRange, error) {
+	bqs := make([]BlockQuerierWithFingerprintRange, 0, 32)
+	err := bs.shipper.ForEachBlock(ctx, tenant, from, through, fingerprints, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error {
+		bqs = append(bqs, BlockQuerierWithFingerprintRange{
+			BlockQuerier: bq,
+			MinFp:        model.Fingerprint(minFp),
+			MaxFp:        model.Fingerprint(maxFp),
+		})
+		return nil
+	})
+	sort.Slice(bqs, func(i, j int) bool {
+		return bqs[i].MinFp < bqs[j].MinFp
+	})
+	return bqs, err
 }
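
Taken together, the new Store and FusedQuerier APIs replace the removed FilterChunkRefs entry point: callers first resolve block refs for a time range, then open queriers for those refs, and finally fuse per-series requests into a single pass over each block. The sketch below is not part of the change; the package name, wrapper function, example fingerprint, search string, and the sequential per-block loop are assumptions, while the types and method signatures are the ones introduced in this diff.

package sketch

import (
	"context"
	"time"

	"github.com/prometheus/common/model"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// filterChunksSketch is a hypothetical consumer of the new two-step API.
func filterChunksSketch(ctx context.Context, store bloomshipper.Store, tenant string, from, through time.Time) error {
	// 1. Resolve which bloom blocks exist for the tenant and time range.
	refs, err := store.GetBlockRefs(ctx, tenant, from, through)
	if err != nil {
		return err
	}

	// 2. Download the blocks and obtain one querier per block, annotated
	//    with the fingerprint range it covers (sorted by MinFp).
	queriers, err := store.GetBlockQueriersForBlockRefs(ctx, tenant, refs)
	if err != nil {
		return err
	}

	// 3. Build per-series requests. Requests from the same caller share a
	//    single response channel; fingerprint and searches are placeholders.
	resCh := make(chan v1.Output)
	reqs := []v1.Request{{
		Fp:       model.Fingerprint(42),
		Chks:     nil, // chunk refs already known for this series
		Searches: [][]byte{[]byte("error")},
		Response: resCh,
	}}

	// 4. Run the fused querier block by block. A real worker would first
	//    partition the requests by each querier's [MinFp, MaxFp] range.
	go func() {
		defer close(resCh)
		for _, bq := range queriers {
			itr := v1.NewPeekingIter[v1.Request](v1.NewSliceIter[v1.Request](reqs))
			_ = bq.Fuse([]v1.PeekingIterator[v1.Request]{itr}).Run() // error handling elided
		}
	}()

	// 5. Collect outputs: Removals are the chunks that did not pass the
	//    searches for Fp and therefore don't need to be downloaded.
	for out := range resCh {
		_ = out.Removals
	}
	return nil
}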
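
The new Ref.Cmp helper mirrors the v1.BoundsCheck semantics on the shipper side. A minimal illustration; the wrapper function and package name are made up:

package sketch

import (
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// coveredBy reports whether a block's [MinFingerprint, MaxFingerprint]
// bounds contain the given fingerprint; Before/After mean it falls outside.
func coveredBy(ref bloomshipper.Ref, fp uint64) bool {
	return ref.Cmp(fp) == v1.Overlap
}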
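
A rough usage sketch for the generic SlicePool added above, assumed to live in the same package as that type; the element type and the pool bounds (128, 4096, factor 2) are made up for the example:

// keyBufPool reuses [][]byte buffers between tasks. Per the code above, Get
// returns a zero-length slice whose capacity comes from the bucketed pool it
// wraps, and Put resets the slice to length 0 before returning it.
var keyBufPool = NewSlicePool[[]byte](128, 4096, 2)

func collectKeys(n int) [][]byte {
	buf := keyBufPool.Get(n)
	for i := 0; i < n; i++ {
		buf = append(buf, []byte("key"))
	}
	return buf // callers hand the buffer back with keyBufPool.Put(buf)
}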
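
The exported MakeBlockQuerier helper replaces the block setup that was previously inlined in builder_test.go and fuse_test.go. A sketch of how a test elsewhere might use it; the test name, fingerprint range, and timestamps are made up:

package sketch

import (
	"testing"

	"github.com/stretchr/testify/require"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// TestSketch builds an in-memory block with 100 series (fingerprints 0..99)
// over a small time range and queries one of the generated series.
func TestSketch(t *testing.T) {
	querier, data := v1.MakeBlockQuerier(t, 0, 100, 0, 10_000)

	_, err := querier.CheckChunksForSeries(data[0].Series.Fingerprint, data[0].Series.Chunks, nil)
	require.NoError(t, err)
}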