Skip to content

Commit

Permalink
Change task merge iterator to also filter chunkrefs by day
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 16, 2023
1 parent 455e8ee commit 65a732d
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 15 deletions.
86 changes: 84 additions & 2 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bloomgateway

import (
"sort"
"time"

"github.com/oklog/ulid"
Expand All @@ -10,6 +11,10 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

const (
Day = 24 * time.Hour
)

// Task is the data structure that is enqueued to the internal queue and dequeued by query workers
type Task struct {
// ID is a lexcographically sortable unique identifier of the task
Expand Down Expand Up @@ -60,6 +65,80 @@ func (t Task) CopyWithRequest(req *logproto.FilterChunkRefRequest) Task {
}
}

type filterGroupedChunkRefsByDay struct {
day time.Time
}

func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool {
from, through := getFromThrough(a.Refs)
if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) {
return false
}
return true
}

func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
min := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
return start.Compare(cf.day) >= 0 && start.Compare(cf.day.Add(Day)) < 0
})
max := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
return start.Compare(cf.day.Add(Day)) >= 0
})
return &logproto.GroupedChunkRefs{
Tenant: a.Tenant,
Fingerprint: a.Fingerprint,
Refs: a.Refs[min:max],
}
}

func (t Task) ChunkIterForDay(day time.Time) v1.PeekingIterator[*logproto.GroupedChunkRefs] {
cf := filterGroupedChunkRefsByDay{day: day}
iter := &FilterIter[*logproto.GroupedChunkRefs]{
iter: v1.NewSliceIter(t.Request.Refs),
predicate: cf.contains,
transform: cf.filter,
}
return v1.NewPeekingIter[*logproto.GroupedChunkRefs](iter)
}

type Predicate[T any] func(a T) bool
type Transform[T any] func(a T) T

type FilterIter[T any] struct {
iter v1.Iterator[T]
predicate Predicate[T]
transform Transform[T]
cache T
zero T // zero value of the return type of Next()
}

func (it *FilterIter[T]) Next() bool {
next := it.iter.Next()
if !next {
it.cache = it.zero
return false
}
for next && !it.predicate(it.iter.At()) {
next = it.iter.Next()
if !next {
it.cache = it.zero
return false
}
}
it.cache = it.transform(it.iter.At())
return true
}

func (it *FilterIter[T]) At() T {
return it.cache
}

func (it *FilterIter[T]) Err() error {
return nil
}

// FilterRequest extends v1.Request with an error channel
type FilterRequest struct {
v1.Request
Expand All @@ -71,13 +150,15 @@ type taskMergeIterator struct {
curr FilterRequest
heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day time.Time
err error
}

func newTaskMergeIterator(tasks ...Task) *taskMergeIterator {
func newTaskMergeIterator(day time.Time, tasks ...Task) *taskMergeIterator {
it := &taskMergeIterator{
tasks: tasks,
curr: FilterRequest{},
day: day,
}
it.init()
return it
Expand All @@ -86,7 +167,8 @@ func newTaskMergeIterator(tasks ...Task) *taskMergeIterator {
func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
sequences = append(sequences, NewIterWithIndex(it.tasks[i].Request.Refs, i))
iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
sequences = append(sequences, iter)
}
it.heap = v1.NewHeapIterator(
func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool {
Expand Down
7 changes: 4 additions & 3 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestTask(t *testing.T) {
func TestTaskMergeIterator(t *testing.T) {
// Thu Nov 09 2023 10:56:50 UTC
ts := model.TimeFromUnix(1699523810)
day := getDayTime(ts)
tenant := "fake"

t.Run("empty requests result in empty iterator", func(t *testing.T) {
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestTaskMergeIterator(t *testing.T) {
t3, _, _, err := NewTask(tenant, r3)
require.NoError(t, err)

it := newTaskMergeIterator(t1, t2, t3)
it := newTaskMergeIterator(day, t1, t2, t3)
require.NotNil(t, it.heap)
// nothing to iterate over
require.False(t, it.Next())
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestTaskMergeIterator(t *testing.T) {
t3, _, _, err := NewTask(tenant, r3)
require.NoError(t, err)

it := newTaskMergeIterator(t1, t2, t3)
it := newTaskMergeIterator(day, t1, t2, t3)
require.NotNil(t, it.heap)

// first item
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestTaskMergeIterator(t *testing.T) {
t3, _, _, err := NewTask(tenant, r3)
require.NoError(t, err)

it := newTaskMergeIterator(t1, t2, t3)
it := newTaskMergeIterator(day, t1, t2, t3)
require.NotNil(t, it.heap)

checksums := []uint32{100, 200, 300}
Expand Down
35 changes: 33 additions & 2 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,34 @@ type IndexedValue[T any] struct {
val T
}

// SliceIterWithIndex implements v1.PeekingIterator
type IterWithIndex[T any] struct {
v1.PeekingIterator[T]
zero T // zero value of T
cache IndexedValue[T]
}

func (it *IterWithIndex[T]) At() IndexedValue[T] {
it.cache.val = it.PeekingIterator.At()
return it.cache
}

func (it *IterWithIndex[T]) Peek() (IndexedValue[T], bool) {
peek, ok := it.PeekingIterator.Peek()
if !ok {
it.cache.val = it.zero
return it.cache, false
}
it.cache.val = peek
return it.cache, true
}

func NewIterWithIndex[T any](iter v1.PeekingIterator[T], idx int) v1.PeekingIterator[IndexedValue[T]] {
return &IterWithIndex[T]{
PeekingIterator: iter,
cache: IndexedValue[T]{idx: idx},
}
}

type SliceIterWithIndex[T any] struct {
xs []T // source slice
pos int // position within the slice
Expand Down Expand Up @@ -45,7 +72,7 @@ func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) {
return it.cache, true
}

func NewIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] {
func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] {
return &SliceIterWithIndex[T]{
xs: xs,
pos: -1,
Expand Down Expand Up @@ -110,6 +137,10 @@ func filterRequestForDay(r *logproto.FilterChunkRefRequest, day time.Time) *logp
}
}

// TODO(chaudum): Fix Through time calculation
// getFromThrough assumes a list of ShortRefs sorted by From time
// However, it does also assume that the last item has the highest
// Through time, which might not be the case!
func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) {
if len(refs) == 0 {
return model.Earliest, model.Latest
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestSliceIterWithIndex(t *testing.T) {
t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) {
xs := []string{"a", "b", "c"}
it := NewIterWithIndex(xs, 123)
it := NewSliceIterWithIndex(xs, 123)

// peek at first item
p, ok := it.Peek()
Expand Down
11 changes: 4 additions & 7 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,8 @@ func (w *worker) running(ctx context.Context) error {
if fromDay.Equal(throughDay) {
tasksPerDay[fromDay] = append(tasksPerDay[fromDay], task)
} else {
// split task into separate tasks per day
for i := fromDay; i.Before(throughDay); i = i.Add(24 * time.Hour) {
r := filterRequestForDay(task.Request, i)
t := task.CopyWithRequest(r)
tasksPerDay[i] = append(tasksPerDay[i], t)
tasksPerDay[i] = append(tasksPerDay[i], task)
}
}
}
Expand All @@ -156,7 +153,7 @@ func (w *worker) running(ctx context.Context) error {
logger := log.With(w.logger, "day", day)
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

it := newTaskMergeIterator(tasks...)
it := newTaskMergeIterator(day, tasks...)

fingerprints = fingerprints[:0]
for it.Next() {
Expand Down Expand Up @@ -204,13 +201,13 @@ func (w *worker) running(ctx context.Context) error {
}

hasNext := it.Next()
for _, blockQuerier := range blockQueriers {
for i, blockQuerier := range blockQueriers {
requests = requests[:0]
for hasNext && it.At().Fp <= blockQuerier.MaxFp {
requests = append(requests, it.At().Request)
hasNext = it.Next()
}
// level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(bqs), "requests", len(requests))
level.Debug(logger).Log("msg", "processing block", "block", i+1, "of", len(blockQueriers), "requests", len(requests))
// no fingerprints in the fingerprint range of the current block
if len(requests) == 0 {
continue
Expand Down

0 comments on commit 65a732d

Please sign in to comment.