Skip to content

Commit

Permalink
Fix filter test
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 13, 2023
1 parent 58e3bf0 commit 470dfc0
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 110 deletions.
48 changes: 26 additions & 22 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,26 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
})

response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
responseCount := 0
for {
select {
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "waiting for results")
case err := <-errCh:
return nil, errors.Wrap(err, "waiting for results")
case res := <-resCh:
responseCount++
// log line is helpful for debugging tests
// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "refs", res.Chks.Len())
response = append(response, &logproto.GroupedChunkRefs{
Tenant: tenantID,
Fingerprint: uint64(res.Fp),
Refs: convertToShortRefs(res.Chks),
})
// level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "chunks", res.Chks.Len(), "progress", fmt.Sprintf("%d/%d", responseCount, len(req.Refs)))
if res.Chks.Len() > 0 {
response = append(response, &logproto.GroupedChunkRefs{
Tenant: tenantID,
Fingerprint: uint64(res.Fp),
Refs: convertToShortRefs(res.Chks),
})
}
// wait for all parts of the full response
if len(response) == len(req.Refs) {
if responseCount == len(req.Refs) {
return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
}
}
Expand Down Expand Up @@ -391,7 +395,6 @@ type worker struct {
tasks *pendingTasks
logger log.Logger
metrics *workerMetrics
rp RequestPool
}

func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
Expand All @@ -405,13 +408,6 @@ func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store blo
metrics: metrics,
}
w.Service = services.NewBasicService(w.starting, w.running, w.stopping).WithName(id)
w.rp = RequestPool{
Pool: sync.Pool{
New: func() any {
return make([]v1.Request, 0, 1024)
},
},
}
return w
}

Expand All @@ -424,6 +420,9 @@ func (w *worker) starting(_ context.Context) error {
func (w *worker) running(ctx context.Context) error {
idx := queue.StartIndexWithLocalQueue

requests := make([]v1.Request, 0, 128)
fingerprints := make([]uint64, 0, 1024)

for ctx.Err() == nil {
taskCtx := context.Background()
dequeueStart := time.Now()
Expand Down Expand Up @@ -475,7 +474,7 @@ func (w *worker) running(ctx context.Context) error {

it := newTaskMergeIterator(tasks...)

fingerprints := make([]uint64, 0, 1024)
fingerprints = fingerprints[:0]
for it.Next() {
// fingerprints are already sorted. we can skip duplicates by checking
// if the next is greater than the previous
Expand All @@ -493,7 +492,7 @@ func (w *worker) running(ctx context.Context) error {
// TODO(chaudum): Add API that allows to process blocks as soon as they become available.
// This will require to change the taskMergeIterator to a slice of requests so we can seek
// to the appropriate fingerprint range within the slice that matches the block's fingerprint range.
bqs, err := w.store.GetBlockQueriers(taskCtx, tasks[0].Tenant, day, day.Add(24*time.Hour), fingerprints)
bqs, err := w.store.GetBlockQueriers(taskCtx, tasks[0].Tenant, day, day.Add(24*time.Hour).Add(-1*time.Nanosecond), fingerprints)
w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockQueriers").Observe(time.Since(storeFetchStart).Seconds())
if err != nil {
for _, t := range tasks {
Expand Down Expand Up @@ -521,13 +520,13 @@ func (w *worker) running(ctx context.Context) error {
}

hasNext := it.Next()
requests := w.rp.Get()
for _, bq := range bqs {
requests = requests[:0]
for hasNext && it.At().Fp <= bq.MaxFp {
requests = append(requests, it.At().Request)
hasNext = it.Next()
}
// level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(bqs), "requests", len(requests))
// no fingerprints in the fingerprint range of the current block
if len(requests) == 0 {
continue
Expand All @@ -538,13 +537,18 @@ func (w *worker) running(ctx context.Context) error {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
}
// return slice back to pool
w.rp.Put(requests)
continue
}
}
// return slice back to pool
w.rp.Put(requests)

for hasNext {
level.Warn(logger).Log("msg", "processing remaining fingerprint", "fp", it.At().Fp)
it.At().Response <- v1.Output{
Fp: it.At().Fp,
Chks: it.At().Chks,
}
hasNext = it.Next()
}
}
}
return ctx.Err()
Expand Down
135 changes: 74 additions & 61 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -261,8 +262,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
require.NoError(t, err)

ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
now := model.TimeFromUnix(ts.Unix())

// replace store implementation and re-initialize workers and sub-services
gw.bloomStore = newMockBloomStore(t)
bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024)
gw.bloomStore = newMockBloomStore(bqs)
err = gw.initServices()
require.NoError(t, err)

Expand All @@ -273,51 +278,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)
})

ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
now := model.TimeFromUnix(ts.Unix())

chunkRefs := []*logproto.ChunkRef{
{
Fingerprint: 100,
UserID: tenantID,
From: now.Add(-24 * time.Hour),
Through: now.Add(-23 * time.Hour),
Checksum: 1,
},
{
Fingerprint: 100,
UserID: tenantID,
From: now.Add(-23 * time.Hour),
Through: now.Add(-22 * time.Hour),
Checksum: 2,
},
{
Fingerprint: 500,
UserID: tenantID,
From: now.Add(-22 * time.Hour),
Through: now.Add(-21 * time.Hour),
Checksum: 3,
},
{
Fingerprint: 1000,
UserID: tenantID,
From: now.Add(-20 * time.Hour),
Through: now.Add(-19 * time.Hour),
Checksum: 4,
},
{
Fingerprint: 1001,
UserID: tenantID,
From: now.Add(-19 * time.Hour),
Through: now.Add(-18 * time.Hour),
Checksum: 5,
},
}
chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)
inputChunkRefs := groupRefs(t, chunkRefs)

t.Run("no match - return filtered", func(t *testing.T) {
t.Run("no match - return empty response", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
From: now.Add(-8 * time.Hour),
Through: now,
Refs: inputChunkRefs,
Filters: []*logproto.LineFilterExpression{
Expand All @@ -329,50 +295,97 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)

expectedResponse := &logproto.FilterChunkRefResponse{
ChunkRefs: inputChunkRefs, // why does it return all chunks?
ChunkRefs: []*logproto.GroupedChunkRefs{},
}
require.Equal(t, expectedResponse, res)
})

t.Run("match - return unfiltered", func(t *testing.T) {
t.Run("match - return filtered", func(t *testing.T) {
// hack to get indexed key for a specific series
// the indexed key range for a series is defined as
// i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1
// where i is the nth series in a block
// fortunately, i is also used as Checksum for the single chunk of a series
// see mkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
key := inputChunkRefs[0].Refs[0].Checksum*1000 + 500

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
From: now.Add(-8 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Refs: inputChunkRefs,
Filters: []*logproto.LineFilterExpression{
// series with fingerprint 100 has 1000 keys
// range is from 100_000 to 100_999
{Operator: 1, Match: "100001"},
{Operator: 1, Match: fmt.Sprint(key)},
},
}
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
require.NoError(t, err)

expectedResponse := &logproto.FilterChunkRefResponse{
ChunkRefs: inputChunkRefs, // why does it return all chunks?
ChunkRefs: inputChunkRefs[:1],
}
require.Equal(t, expectedResponse, res)
})

})
}

func newMockBloomStore(t *testing.T) *mockBloomStore {
return &mockBloomStore{t: t}
func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockQuerierWithFingerprintRange, [][]v1.SeriesWithBloom) {
t.Helper()
step := (maxFp - minFp) / model.Fingerprint(numBlocks)
bqs := make([]bloomshipper.BlockQuerierWithFingerprintRange, 0, numBlocks)
series := make([][]v1.SeriesWithBloom, 0, numBlocks)
for i := 0; i < numBlocks; i++ {
fromFp := minFp + (step * model.Fingerprint(i))
throughFp := fromFp + step - 1
// last block needs to include maxFp
if i == numBlocks-1 {
throughFp = maxFp
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
bq := bloomshipper.BlockQuerierWithFingerprintRange{
BlockQuerier: blockQuerier,
MinFp: fromFp,
MaxFp: throughFp,
}
bqs = append(bqs, bq)
series = append(series, data)
}
return bqs, series
}

func newMockBloomStore(bqs []bloomshipper.BlockQuerierWithFingerprintRange) *mockBloomStore {
return &mockBloomStore{bqs: bqs}
}

type mockBloomStore struct {
t *testing.T
bqs []bloomshipper.BlockQuerierWithFingerprintRange
}

func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, from, through time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
return []bloomshipper.BlockQuerierWithFingerprintRange{
{BlockQuerier: v1.MakeBlockQuerier(s.t, 0, 255, from.Unix(), through.Unix()), MinFp: 0, MaxFp: 255},
{BlockQuerier: v1.MakeBlockQuerier(s.t, 256, 511, from.Unix(), through.Unix()), MinFp: 256, MaxFp: 511},
{BlockQuerier: v1.MakeBlockQuerier(s.t, 512, 767, from.Unix(), through.Unix()), MinFp: 512, MaxFp: 767},
{BlockQuerier: v1.MakeBlockQuerier(s.t, 768, 1023, from.Unix(), through.Unix()), MinFp: 768, MaxFp: 1023},
}, nil
func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, _, _ time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
return s.bqs, nil
}

func (s *mockBloomStore) Stop() {}

func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.SeriesWithBloom, nthSeries int) []*logproto.ChunkRef {
t.Helper()
n := 0
res := make([]*logproto.ChunkRef, 0)
for i := range data {
for j := range data[i] {
if n%nthSeries == 0 {
chk := data[i][j].Series.Chunks[0]
res = append(res, &logproto.ChunkRef{
Fingerprint: uint64(data[i][j].Series.Fingerprint),
UserID: tenant,
From: chk.Start,
Through: chk.End,
Checksum: chk.Checksum,
})
}
n++
}
}
return res
}
23 changes: 10 additions & 13 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bloomgateway

import (
"sync"
"time"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -50,18 +49,6 @@ func NewIterWithIndex[T any](i int, xs []T) *SliceIterWithIndex[T] {
}
}

type RequestPool struct {
sync.Pool
}

func (p *RequestPool) Get() []v1.Request {
return p.Pool.Get().([]v1.Request)
}

func (p *RequestPool) Put(r []v1.Request) {
p.Pool.Put(r[:0]) // nolint:staticcheck
}

func getDay(ts model.Time) int64 {
return ts.Unix() / int64(24*time.Hour/time.Second)
}
Expand Down Expand Up @@ -157,3 +144,13 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
}
return result
}

// getFirstLast returns the first and last item of a fingerprint slice
// It assumes an ascending sorted list of fingerprints.
func getFirstLast[T any](s []T) (T, T) {
var zero T
if len(s) == 0 {
return zero, zero
}
return s[0], s[len(s)-1]
}
Loading

0 comments on commit 470dfc0

Please sign in to comment.