From 470dfc02748eb3637ec0c0ef628e3595625d181e Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Fri, 10 Nov 2023 12:17:18 +0100
Subject: [PATCH] Fix filter test

Signed-off-by: Christian Haudum
---
 pkg/bloomgateway/bloomgateway.go      |  48 ++++-----
 pkg/bloomgateway/bloomgateway_test.go | 135 ++++++++++++++------------
 pkg/bloomgateway/util.go              |  23 ++---
 pkg/storage/bloom/v1/test_util.go     |  25 +++--
 4 files changed, 121 insertions(+), 110 deletions(-)

diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index a20aafee64be8..c76383a60af46 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -312,6 +312,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
     })
 
     response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
+    responseCount := 0
     for {
         select {
         case <-ctx.Done():
@@ -319,15 +320,18 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
         case err := <-errCh:
             return nil, errors.Wrap(err, "waiting for results")
         case res := <-resCh:
+            responseCount++
             // log line is helpful for debugging tests
-            // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "refs", res.Chks.Len())
-            response = append(response, &logproto.GroupedChunkRefs{
-                Tenant:      tenantID,
-                Fingerprint: uint64(res.Fp),
-                Refs:        convertToShortRefs(res.Chks),
-            })
+            // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "chunks", res.Chks.Len(), "progress", fmt.Sprintf("%d/%d", responseCount, len(req.Refs)))
+            if res.Chks.Len() > 0 {
+                response = append(response, &logproto.GroupedChunkRefs{
+                    Tenant:      tenantID,
+                    Fingerprint: uint64(res.Fp),
+                    Refs:        convertToShortRefs(res.Chks),
+                })
+            }
             // wait for all parts of the full response
-            if len(response) == len(req.Refs) {
+            if responseCount == len(req.Refs) {
                 return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
             }
         }
@@ -391,7 +395,6 @@ type worker struct {
     tasks   *pendingTasks
     logger  log.Logger
     metrics *workerMetrics
-    rp      RequestPool
 }
 
 func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
@@ -405,13 +408,6 @@
         metrics: metrics,
     }
     w.Service = services.NewBasicService(w.starting, w.running, w.stopping).WithName(id)
-    w.rp = RequestPool{
-        Pool: sync.Pool{
-            New: func() any {
-                return make([]v1.Request, 0, 1024)
-            },
-        },
-    }
     return w
 }
 
@@ -424,6 +420,9 @@ func (w *worker) starting(_ context.Context) error {
 func (w *worker) running(ctx context.Context) error {
     idx := queue.StartIndexWithLocalQueue
 
+    requests := make([]v1.Request, 0, 128)
+    fingerprints := make([]uint64, 0, 1024)
+
     for ctx.Err() == nil {
         taskCtx := context.Background()
         dequeueStart := time.Now()
@@ -475,7 +474,7 @@ func (w *worker) running(ctx context.Context) error {
 
             it := newTaskMergeIterator(tasks...)
 
-            fingerprints := make([]uint64, 0, 1024)
+            fingerprints = fingerprints[:0]
             for it.Next() {
                 // fingerprints are already sorted. we can skip duplicates by checking
                 // if the next is greater than the previous
@@ -493,7 +492,7 @@ func (w *worker) running(ctx context.Context) error {
             // TODO(chaudum): Add API that allows to process blocks as soon as they become available.
             // This will require to change the taskMergeIterator to a slice of requests so we can seek
             // to the appropriate fingerprint range within the slice that matches the block's fingerprint range.
-            bqs, err := w.store.GetBlockQueriers(taskCtx, tasks[0].Tenant, day, day.Add(24*time.Hour), fingerprints)
+            bqs, err := w.store.GetBlockQueriers(taskCtx, tasks[0].Tenant, day, day.Add(24*time.Hour).Add(-1*time.Nanosecond), fingerprints)
             w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockQueriers").Observe(time.Since(storeFetchStart).Seconds())
             if err != nil {
                 for _, t := range tasks {
@@ -521,13 +520,13 @@ func (w *worker) running(ctx context.Context) error {
             }
 
             hasNext := it.Next()
-            requests := w.rp.Get()
             for _, bq := range bqs {
                 requests = requests[:0]
                 for hasNext && it.At().Fp <= bq.MaxFp {
                     requests = append(requests, it.At().Request)
                     hasNext = it.Next()
                 }
+                // level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(bqs), "requests", len(requests))
                 // no fingerprints in the fingerprint range of the current block
                 if len(requests) == 0 {
                     continue
@@ -538,13 +537,18 @@ func (w *worker) running(ctx context.Context) error {
                     for _, t := range tasks {
                         t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
                     }
-                    // return slice back to pool
-                    w.rp.Put(requests)
                     continue
                 }
             }
-            // return slice back to pool
-            w.rp.Put(requests)
+
+            for hasNext {
+                level.Warn(logger).Log("msg", "processing remaining fingerprint", "fp", it.At().Fp)
+                it.At().Response <- v1.Output{
+                    Fp:   it.At().Fp,
+                    Chks: it.At().Chks,
+                }
+                hasNext = it.Next()
+            }
         }
     }
     return ctx.Err()
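
Note on the worker changes above: the sync.Pool-backed RequestPool is replaced by two slices (`requests`, `fingerprints`) that are allocated once per worker goroutine and truncated with `s[:0]` before each reuse. A minimal, self-contained sketch of that reuse pattern (the names `refill` and `buf` are illustrative, not Loki types):

    package main

    import "fmt"

    // refill truncates buf to length zero and refills it from batch; the
    // backing array is reused because the caller passes the same slice back in.
    func refill(buf []int, batch []int) []int {
        buf = buf[:0] // keep capacity, drop old contents
        for _, v := range batch {
            buf = append(buf, v)
        }
        return buf
    }

    func main() {
        // allocated once per goroutine, like `requests` in running()
        buf := make([]int, 0, 128)
        for _, batch := range [][]int{{1, 2, 3}, {4, 5}} {
            buf = refill(buf, batch)
            fmt.Println(len(buf), cap(buf)) // capacity survives across batches
        }
    }

This works because the buffers never escape the owning goroutine, which is also why the pool (and its staticcheck suppression in util.go) could be dropped.
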
diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go
index b6004ba930520..376f00e531b35 100644
--- a/pkg/bloomgateway/bloomgateway_test.go
+++ b/pkg/bloomgateway/bloomgateway_test.go
@@ -2,6 +2,7 @@ package bloomgateway
 
 import (
     "context"
+    "fmt"
     "os"
     "testing"
     "time"
@@ -261,8 +262,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
         gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
         require.NoError(t, err)
 
+        ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
+        now := model.TimeFromUnix(ts.Unix())
+
         // replace store implementation and re-initialize workers and sub-services
-        gw.bloomStore = newMockBloomStore(t)
+        bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024)
+        gw.bloomStore = newMockBloomStore(bqs)
         err = gw.initServices()
         require.NoError(t, err)
 
@@ -273,51 +278,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
             require.NoError(t, err)
         })
 
-        ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
-        now := model.TimeFromUnix(ts.Unix())
-
-        chunkRefs := []*logproto.ChunkRef{
-            {
-                Fingerprint: 100,
-                UserID:      tenantID,
-                From:        now.Add(-24 * time.Hour),
-                Through:     now.Add(-23 * time.Hour),
-                Checksum:    1,
-            },
-            {
-                Fingerprint: 100,
-                UserID:      tenantID,
-                From:        now.Add(-23 * time.Hour),
-                Through:     now.Add(-22 * time.Hour),
-                Checksum:    2,
-            },
-            {
-                Fingerprint: 500,
-                UserID:      tenantID,
-                From:        now.Add(-22 * time.Hour),
-                Through:     now.Add(-21 * time.Hour),
-                Checksum:    3,
-            },
-            {
-                Fingerprint: 1000,
-                UserID:      tenantID,
-                From:        now.Add(-20 * time.Hour),
-                Through:     now.Add(-19 * time.Hour),
-                Checksum:    4,
-            },
-            {
-                Fingerprint: 1001,
-                UserID:      tenantID,
-                From:        now.Add(-19 * time.Hour),
-                Through:     now.Add(-18 * time.Hour),
-                Checksum:    5,
-            },
-        }
+        chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)
         inputChunkRefs := groupRefs(t, chunkRefs)
 
-        t.Run("no match - return filtered", func(t *testing.T) {
+        t.Run("no match - return empty response", func(t *testing.T) {
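
Note on the "match - return filtered" case above: with keysPerSeries = 1000 (the value hard-coded as numKeysPerSeries in MakeBlockQuerier), series i indexes keys i*1000 through i*1000+999, so probing key i*1000+500 is guaranteed to hit. A quick check of that arithmetic (keyRange is an illustrative helper, not part of the patch):

    package main

    import "fmt"

    const keysPerSeries = 1000

    // keyRange returns the first and last key indexed for the i-th series.
    func keyRange(i uint32) (first, last uint32) {
        return i * keysPerSeries, i*keysPerSeries + keysPerSeries - 1
    }

    func main() {
        for _, i := range []uint32{0, 1, 42} {
            first, last := keyRange(i)
            probe := i*keysPerSeries + 500 // mirrors Checksum*1000 + 500 in the test
            fmt.Printf("series %d: keys %d..%d, probe %d inside: %v\n",
                i, first, last, probe, probe >= first && probe <= last)
        }
    }

The test leans on mkBasicSeriesWithBlooms using the series index both as the chunk Checksum and as the base of the key range, which is exactly what the "hack" comment in the test spells out.
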
             req := &logproto.FilterChunkRefRequest{
-                From:    now.Add(-24 * time.Hour),
+                From:    now.Add(-8 * time.Hour),
                 Through: now,
                 Refs:    inputChunkRefs,
                 Filters: []*logproto.LineFilterExpression{
@@ -329,20 +295,26 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
             require.NoError(t, err)
 
             expectedResponse := &logproto.FilterChunkRefResponse{
-                ChunkRefs: inputChunkRefs, // why does it return all chunks?
+                ChunkRefs: []*logproto.GroupedChunkRefs{},
             }
             require.Equal(t, expectedResponse, res)
         })
 
-        t.Run("match - return unfiltered", func(t *testing.T) {
+        t.Run("match - return filtered", func(t *testing.T) {
+            // hack to get indexed key for a specific series
+            // the indexed key range for a series is defined as
+            // i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1
+            // where i is the nth series in a block
+            // fortunately, i is also used as Checksum for the single chunk of a series
+            // see mkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
+            key := inputChunkRefs[0].Refs[0].Checksum*1000 + 500
+
             req := &logproto.FilterChunkRefRequest{
-                From:    now.Add(-24 * time.Hour),
+                From:    now.Add(-8 * time.Hour),
                 Through: now,
-                Refs:    groupRefs(t, chunkRefs),
+                Refs:    inputChunkRefs,
                 Filters: []*logproto.LineFilterExpression{
-                    // series with fingerprint 100 has 1000 keys
-                    // range is from 100_000 to 100_999
-                    {Operator: 1, Match: "100001"},
+                    {Operator: 1, Match: fmt.Sprint(key)},
                 },
             }
             ctx := user.InjectOrgID(context.Background(), tenantID)
@@ -350,7 +322,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
             require.NoError(t, err)
 
             expectedResponse := &logproto.FilterChunkRefResponse{
-                ChunkRefs: inputChunkRefs, // why does it return all chunks?
+                ChunkRefs: inputChunkRefs[:1],
             }
             require.Equal(t, expectedResponse, res)
         })
@@ -358,21 +330,62 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
     })
 }
 
-func newMockBloomStore(t *testing.T) *mockBloomStore {
-    return &mockBloomStore{t: t}
+func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockQuerierWithFingerprintRange, [][]v1.SeriesWithBloom) {
+    t.Helper()
+    step := (maxFp - minFp) / model.Fingerprint(numBlocks)
+    bqs := make([]bloomshipper.BlockQuerierWithFingerprintRange, 0, numBlocks)
+    series := make([][]v1.SeriesWithBloom, 0, numBlocks)
+    for i := 0; i < numBlocks; i++ {
+        fromFp := minFp + (step * model.Fingerprint(i))
+        throughFp := fromFp + step - 1
+        // last block needs to include maxFp
+        if i == numBlocks-1 {
+            throughFp = maxFp
+        }
+        blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
+        bq := bloomshipper.BlockQuerierWithFingerprintRange{
+            BlockQuerier: blockQuerier,
+            MinFp:        fromFp,
+            MaxFp:        throughFp,
+        }
+        bqs = append(bqs, bq)
+        series = append(series, data)
+    }
+    return bqs, series
+}
+
+func newMockBloomStore(bqs []bloomshipper.BlockQuerierWithFingerprintRange) *mockBloomStore {
+    return &mockBloomStore{bqs: bqs}
 }
 
 type mockBloomStore struct {
-    t *testing.T
+    bqs []bloomshipper.BlockQuerierWithFingerprintRange
 }
 
-func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, from, through time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
-    return []bloomshipper.BlockQuerierWithFingerprintRange{
-        {BlockQuerier: v1.MakeBlockQuerier(s.t, 0, 255, from.Unix(), through.Unix()), MinFp: 0, MaxFp: 255},
-        {BlockQuerier: v1.MakeBlockQuerier(s.t, 256, 511, from.Unix(), through.Unix()), MinFp: 256, MaxFp: 511},
-        {BlockQuerier: v1.MakeBlockQuerier(s.t, 512, 767, from.Unix(), through.Unix()), MinFp: 512, MaxFp: 767},
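
Note on the getFirstLast helper added above: because T is unconstrained, the function cannot verify sortedness itself; callers must uphold the documented precondition for the two return values to act as the min/max of the range. A small usage sketch (same signature as in the patch, illustrative main):

    package main

    import "fmt"

    func getFirstLast[T any](s []T) (T, T) {
        var zero T
        if len(s) == 0 {
            return zero, zero
        }
        return s[0], s[len(s)-1]
    }

    func main() {
        fps := []uint64{100, 500, 1000, 1001} // sorted ascending
        minFp, maxFp := getFirstLast(fps)
        fmt.Println(minFp, maxFp) // 100 1001

        var empty []uint64
        a, b := getFirstLast(empty) // zero values, no panic
        fmt.Println(a, b)           // 0 0
    }

The empty-slice guard returning zero values instead of panicking is what makes the helper safe to call on possibly-empty fingerprint lists.
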
-        {BlockQuerier: v1.MakeBlockQuerier(s.t, 768, 1023, from.Unix(), through.Unix()), MinFp: 768, MaxFp: 1023},
-    }, nil
+func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, _, _ time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
+    return s.bqs, nil
 }
 
 func (s *mockBloomStore) Stop() {}
+
+func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.SeriesWithBloom, nthSeries int) []*logproto.ChunkRef {
+    t.Helper()
+    n := 0
+    res := make([]*logproto.ChunkRef, 0)
+    for i := range data {
+        for j := range data[i] {
+            if n%nthSeries == 0 {
+                chk := data[i][j].Series.Chunks[0]
+                res = append(res, &logproto.ChunkRef{
+                    Fingerprint: uint64(data[i][j].Series.Fingerprint),
+                    UserID:      tenant,
+                    From:        chk.Start,
+                    Through:     chk.End,
+                    Checksum:    chk.Checksum,
+                })
+            }
+            n++
+        }
+    }
+    return res
+}
diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go
index aa2712b99af6c..4796cbe0e2530 100644
--- a/pkg/bloomgateway/util.go
+++ b/pkg/bloomgateway/util.go
@@ -1,7 +1,6 @@
 package bloomgateway
 
 import (
-    "sync"
     "time"
 
     "github.com/prometheus/common/model"
@@ -50,18 +49,6 @@ func NewIterWithIndex[T any](i int, xs []T) *SliceIterWithIndex[T] {
     }
 }
 
-type RequestPool struct {
-    sync.Pool
-}
-
-func (p *RequestPool) Get() []v1.Request {
-    return p.Pool.Get().([]v1.Request)
-}
-
-func (p *RequestPool) Put(r []v1.Request) {
-    p.Pool.Put(r[:0]) // nolint:staticcheck
-}
-
 func getDay(ts model.Time) int64 {
     return ts.Unix() / int64(24*time.Hour/time.Second)
 }
@@ -157,3 +144,13 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
     }
     return result
 }
+
+// getFirstLast returns the first and last item of a fingerprint slice
+// It assumes an ascending sorted list of fingerprints.
+func getFirstLast[T any](s []T) (T, T) {
+    var zero T
+    if len(s) == 0 {
+        return zero, zero
+    }
+    return s[0], s[len(s)-1]
+}
diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go
index 19af66299f8de..215ecaffe177e 100644
--- a/pkg/storage/bloom/v1/test_util.go
+++ b/pkg/storage/bloom/v1/test_util.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "fmt"
     "testing"
+    "time"
 
     "github.com/prometheus/common/model"
     "github.com/stretchr/testify/require"
@@ -12,7 +13,7 @@ import (
     "github.com/grafana/loki/pkg/storage/bloom/v1/filter"
 )
 
-func MakeBlockQuerier(t testing.TB, fromFp, throughFp uint64, fromTs, throughTs int64) *BlockQuerier {
+func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (*BlockQuerier, []SeriesWithBloom) {
     // references for linking in memory reader+writer
     indexBuf := bytes.NewBuffer(nil)
     bloomsBuf := bytes.NewBuffer(nil)
@@ -20,14 +21,7 @@
     reader := NewByteReader(indexBuf, bloomsBuf)
     numSeries := int(throughFp - fromFp)
     numKeysPerSeries := 1000
-    data, _ := mkBasicSeriesWithBlooms(
-        numSeries,
-        numKeysPerSeries,
-        model.Fingerprint(fromFp),
-        model.Fingerprint(throughFp),
-        model.TimeFromUnix(fromTs),
-        model.TimeFromUnix(throughTs),
-    )
+    data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs)
 
     builder, err := NewBlockBuilder(
         BlockOptions{
@@ -45,21 +39,24 @@
     _, err = builder.BuildFrom(itr)
     require.Nil(t, err)
     block := NewBlock(reader)
-    return NewBlockQuerier(block)
+    return NewBlockQuerier(block), data
 }
 
 func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) {
     seriesList = make([]SeriesWithBloom, 0, nSeries)
     keysList = make([][][]byte, 0, nSeries)
+
+    step := (throughFp - fromFp) / model.Fingerprint(nSeries)
+    timeDelta := time.Duration(throughTs.Sub(fromTs).Nanoseconds() / int64(nSeries))
+
     for i := 0; i < nSeries; i++ {
         var series Series
-        step := (throughFp - fromFp) / (model.Fingerprint(nSeries))
         series.Fingerprint = fromFp + model.Fingerprint(i)*step
-        timeDelta := fromTs + (throughTs-fromTs)/model.Time(nSeries)*model.Time(i)
+        from := fromTs.Add(timeDelta * time.Duration(i))
         series.Chunks = []ChunkRef{
             {
-                Start: fromTs + timeDelta*model.Time(i),
-                End:   fromTs + timeDelta*model.Time(i),
+                Start:    from,
+                End:      from.Add(timeDelta),
                 Checksum: uint32(i),
             },
         }
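
Note on the mkBasicSeriesWithBlooms change that closes the patch: the fingerprint step and per-series time delta are now computed once before the loop, and each series gets a chunk of width timeDelta instead of the previous zero-width chunk (Start == End). A worked example of that spacing, using assumed sample values (8 series over an 8-hour window; the real test spreads fingerprints 0..1024 across 5 blocks):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        const nSeries = 8
        fromFp, throughFp := uint64(0), uint64(1024)
        fromTs := time.Date(2023, 10, 3, 2, 0, 0, 0, time.UTC)
        throughTs := fromTs.Add(8 * time.Hour)

        step := (throughFp - fromFp) / nSeries       // fingerprint spacing
        timeDelta := throughTs.Sub(fromTs) / nSeries // one chunk per series

        for i := 0; i < nSeries; i++ {
            fp := fromFp + uint64(i)*step
            start := fromTs.Add(time.Duration(i) * timeDelta)
            end := start.Add(timeDelta)
            fmt.Printf("series %d: fp=%-4d chunk=[%s, %s]\n",
                i, fp, start.Format("15:04"), end.Format("15:04"))
        }
    }

Giving each chunk a real time extent means the From/Through range of a FilterChunkRefRequest has something to overlap, where the earlier zero-width chunks sat at single instants.
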