diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 614e3e048506c..5d5575cd03a60 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -1,6 +1,7 @@ package bloomgateway import ( + "sort" "time" "github.com/oklog/ulid" @@ -10,6 +11,10 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) +const ( + Day = 24 * time.Hour +) + // Task is the data structure that is enqueued to the internal queue and dequeued by query workers type Task struct { // ID is a lexcographically sortable unique identifier of the task @@ -60,6 +65,80 @@ func (t Task) CopyWithRequest(req *logproto.FilterChunkRefRequest) Task { } } +type filterGroupedChunkRefsByDay struct { + day time.Time +} + +func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool { + from, through := getFromThrough(a.Refs) + if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) { + return false + } + return true +} + +func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { + min := sort.Search(len(a.Refs), func(i int) bool { + start := a.Refs[i].From.Time() + return start.Compare(cf.day) >= 0 + }) + max := sort.Search(len(a.Refs), func(i int) bool { + start := a.Refs[i].From.Time() + return start.Compare(cf.day.Add(Day)) >= 0 + }) + return &logproto.GroupedChunkRefs{ + Tenant: a.Tenant, + Fingerprint: a.Fingerprint, + Refs: a.Refs[min:max], + } +} + +func (t Task) ChunkIterForDay(day time.Time) v1.PeekingIterator[*logproto.GroupedChunkRefs] { + cf := filterGroupedChunkRefsByDay{day: day} + iter := &FilterIter[*logproto.GroupedChunkRefs]{ + iter: v1.NewSliceIter(t.Request.Refs), + predicate: cf.contains, + transform: cf.filter, + } + return v1.NewPeekingIter[*logproto.GroupedChunkRefs](iter) +} + +type Predicate[T any] func(a T) bool +type Transform[T any] func(a T) T + +type FilterIter[T any] struct { + iter v1.Iterator[T]
+ predicate Predicate[T] + transform Transform[T] + cache T + zero T // zero value of the return type of At() +} + +func (it *FilterIter[T]) Next() bool { + next := it.iter.Next() + if !next { + it.cache = it.zero + return false + } + for next && !it.predicate(it.iter.At()) { + next = it.iter.Next() + if !next { + it.cache = it.zero + return false + } + } + it.cache = it.transform(it.iter.At()) + return true +} + +func (it *FilterIter[T]) At() T { + return it.cache +} + +func (it *FilterIter[T]) Err() error { + return it.iter.Err() +} + // FilterRequest extends v1.Request with an error channel type FilterRequest struct { v1.Request @@ -71,13 +150,15 @@ type taskMergeIterator struct { curr FilterRequest heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]] tasks []Task + day time.Time err error } -func newTaskMergeIterator(tasks ...Task) *taskMergeIterator { +func newTaskMergeIterator(day time.Time, tasks ...Task) *taskMergeIterator { it := &taskMergeIterator{ tasks: tasks, curr: FilterRequest{}, + day: day, } it.init() return it @@ -86,7 +167,8 @@ func newTaskMergeIterator(tasks ...Task) *taskMergeIterator { func (it *taskMergeIterator) init() { sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) for i := range it.tasks { - sequences = append(sequences, NewIterWithIndex(it.tasks[i].Request.Refs, i)) + iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) + sequences = append(sequences, iter) } it.heap = v1.NewHeapIterator( func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool { diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index 4e572c1280fa8..dc04522e13d2f 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -28,6 +28,7 @@ func TestTask(t *testing.T) { func TestTaskMergeIterator(t *testing.T) { // Thu Nov 09 2023 10:56:50 UTC ts := model.TimeFromUnix(1699523810) + day := getDayTime(ts) tenant := "fake"
t.Run("empty requests result in empty iterator", func(t *testing.T) { @@ -55,7 +56,7 @@ func TestTaskMergeIterator(t *testing.T) { t3, _, _, err := NewTask(tenant, r3) require.NoError(t, err) - it := newTaskMergeIterator(t1, t2, t3) + it := newTaskMergeIterator(day, t1, t2, t3) require.NotNil(t, it.heap) // nothing to iterate over require.False(t, it.Next()) @@ -101,7 +102,7 @@ func TestTaskMergeIterator(t *testing.T) { t3, _, _, err := NewTask(tenant, r3) require.NoError(t, err) - it := newTaskMergeIterator(t1, t2, t3) + it := newTaskMergeIterator(day, t1, t2, t3) require.NotNil(t, it.heap) // first item @@ -169,7 +170,7 @@ func TestTaskMergeIterator(t *testing.T) { t3, _, _, err := NewTask(tenant, r3) require.NoError(t, err) - it := newTaskMergeIterator(t1, t2, t3) + it := newTaskMergeIterator(day, t1, t2, t3) require.NotNil(t, it.heap) checksums := []uint32{100, 200, 300} diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index d55abf5b753fa..39d4e370da7c7 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -14,7 +14,34 @@ type IndexedValue[T any] struct { val T } -// SliceIterWithIndex implements v1.PeekingIterator +type IterWithIndex[T any] struct { + v1.PeekingIterator[T] + zero T // zero value of T + cache IndexedValue[T] +} + +func (it *IterWithIndex[T]) At() IndexedValue[T] { + it.cache.val = it.PeekingIterator.At() + return it.cache +} + +func (it *IterWithIndex[T]) Peek() (IndexedValue[T], bool) { + peek, ok := it.PeekingIterator.Peek() + if !ok { + it.cache.val = it.zero + return it.cache, false + } + it.cache.val = peek + return it.cache, true +} + +func NewIterWithIndex[T any](iter v1.PeekingIterator[T], idx int) v1.PeekingIterator[IndexedValue[T]] { + return &IterWithIndex[T]{ + PeekingIterator: iter, + cache: IndexedValue[T]{idx: idx}, + } +} + type SliceIterWithIndex[T any] struct { xs []T // source slice pos int // position within the slice @@ -45,7 +72,7 @@ func (it *SliceIterWithIndex[T]) Peek() 
(IndexedValue[T], bool) { return it.cache, true } -func NewIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] { +func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] { return &SliceIterWithIndex[T]{ xs: xs, pos: -1, @@ -110,6 +137,10 @@ func filterRequestForDay(r *logproto.FilterChunkRefRequest, day time.Time) *logp } } +// TODO(chaudum): Fix Through time calculation +// getFromThrough assumes a list of ShortRefs sorted by From time +// However, it does also assume that the last item has the highest +// Through time, which might not be the case! func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) { if len(refs) == 0 { return model.Earliest, model.Latest diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 30b31b461d637..8f200ebfbb296 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -13,7 +13,7 @@ import ( func TestSliceIterWithIndex(t *testing.T) { t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) { xs := []string{"a", "b", "c"} - it := NewIterWithIndex(xs, 123) + it := NewSliceIterWithIndex(xs, 123) // peek at first item p, ok := it.Peek() diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index c08b82ddf759b..4b7e6542e1dfa 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -143,11 +143,8 @@ func (w *worker) running(ctx context.Context) error { if fromDay.Equal(throughDay) { tasksPerDay[fromDay] = append(tasksPerDay[fromDay], task) } else { - // split task into separate tasks per day for i := fromDay; i.Before(throughDay); i = i.Add(24 * time.Hour) { - r := filterRequestForDay(task.Request, i) - t := task.CopyWithRequest(r) - tasksPerDay[i] = append(tasksPerDay[i], t) + tasksPerDay[i] = append(tasksPerDay[i], task) } } } @@ -156,7 +153,7 @@ func (w *worker) running(ctx context.Context) error { logger := log.With(w.logger, "day", day) 
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks)) - it := newTaskMergeIterator(tasks...) + it := newTaskMergeIterator(day, tasks...) fingerprints = fingerprints[:0] for it.Next() { @@ -204,13 +201,13 @@ func (w *worker) running(ctx context.Context) error { } hasNext := it.Next() - for _, blockQuerier := range blockQueriers { + for i, blockQuerier := range blockQueriers { requests = requests[:0] for hasNext && it.At().Fp <= blockQuerier.MaxFp { requests = append(requests, it.At().Request) hasNext = it.Next() } - // level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(bqs), "requests", len(requests)) + level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(blockQueriers), "requests", len(requests)) // no fingerprints in the fingerprint range of the current block if len(requests) == 0 { continue