diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index cf5fcbd6172b5..efad0e77a26bf 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -60,6 +60,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/queue" "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/util" @@ -300,8 +301,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk g.pendingTasks.Add(task.ID, task) }) - response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) - responseCount := 0 + requestCount := len(req.Refs) + // TODO(chaudum): Use pool + responses := make([]v1.Output, 0, requestCount) + for { select { case <-ctx.Done(): @@ -309,19 +312,47 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk case err := <-errCh: return nil, errors.Wrap(err, "waiting for results") case res := <-resCh: - responseCount++ + responses = append(responses, res) // log line is helpful for debugging tests - // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", res.Fp, "chunks", res.Chks.Len(), "progress", fmt.Sprintf("%d/%d", responseCount, len(req.Refs))) - if res.Chks.Len() > 0 { - response = append(response, &logproto.GroupedChunkRefs{ - Tenant: tenantID, - Fingerprint: uint64(res.Fp), - Refs: convertToShortRefs(res.Chks), - }) - } + // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response - if responseCount == len(req.Refs) { - return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil + if len(responses) == requestCount { + for _, 
o := range responses { + // we must not remove items from req.Refs as long as the worker may iterate over them + g.removeNotMatchingChunks(req, o) + } + return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + } + } + } +} + +func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) { + // binary search index of fingerprint + idx := sort.Search(len(req.Refs), func(i int) bool { + return req.Refs[i].Fingerprint >= uint64(res.Fp) + }) + + // fingerprint not found + if idx >= len(req.Refs) { + level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) + return + } + + // if all chunks of a fingerprint are removed + // then remove the whole group from the response + if len(req.Refs[idx].Refs) == res.Removals.Len() { + req.Refs[idx] = nil // avoid leaking pointer + req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...) + return + } + + for i := range res.Removals { + toRemove := res.Removals[i] + for j := range req.Refs[idx].Refs { + if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum { + req.Refs[idx].Refs[j] = nil // avoid leaking pointer + req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) 
} } } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 7de910de0ed44..0106c043f6540 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -257,9 +257,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }) chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) - inputChunkRefs := groupRefs(t, chunkRefs) t.Run("no match - return empty response", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) req := &logproto.FilterChunkRefRequest{ From: now.Add(-8 * time.Hour), Through: now, @@ -279,6 +279,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }) t.Run("match - return filtered", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) // hack to get indexed key for a specific series // the indexed key range for a series is defined as // i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1 diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 0e8d8e052958a..327f481404737 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -54,14 +54,12 @@ func (t Task) Bounds() (time.Time, time.Time) { return getDayTime(t.Request.From), getDayTime(t.Request.Through) } -// CopyWithRequest returns a copy of the original task, but with newly provided request -func (t Task) CopyWithRequest(req *logproto.FilterChunkRefRequest) Task { - return Task{ - ID: t.ID, - Tenant: t.Tenant, - Request: req, - ErrCh: t.ErrCh, - ResCh: t.ResCh, +func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] { + cf := filterGroupedChunkRefsByDay{day: day} + return &FilterIter[*logproto.GroupedChunkRefs]{ + iter: v1.NewSliceIter(t.Request.Refs), + matches: cf.contains, + transform: cf.filter, } } @@ -81,7 +79,7 @@ func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logp minTs, maxTs := getFromThrough(a.Refs) // in most cases, all chunks are within 
day range - if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(24*time.Hour)) { + if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) { return a } @@ -89,11 +87,13 @@ func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logp // using binary search to get min and max index of chunks that fall into the day range min := sort.Search(len(a.Refs), func(i int) bool { start := a.Refs[i].From.Time() - return start.Compare(cf.day) >= 0 && start.Compare(cf.day.Add(Day)) < 0 + end := a.Refs[i].Through.Time() + return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0 }) + max := sort.Search(len(a.Refs), func(i int) bool { start := a.Refs[i].From.Time() - return start.Compare(cf.day.Add(Day)) >= 0 + return start.Compare(cf.day.Add(Day)) > 0 }) return &logproto.GroupedChunkRefs{ @@ -103,22 +103,12 @@ func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logp } } -func (t Task) ChunkIterForDay(day time.Time) v1.PeekingIterator[*logproto.GroupedChunkRefs] { - cf := filterGroupedChunkRefsByDay{day: day} - iter := &FilterIter[*logproto.GroupedChunkRefs]{ - iter: v1.NewSliceIter(t.Request.Refs), - predicate: cf.contains, - transform: cf.filter, - } - return v1.NewPeekingIter[*logproto.GroupedChunkRefs](iter) -} - type Predicate[T any] func(a T) bool type Transform[T any] func(a T) T type FilterIter[T any] struct { iter v1.Iterator[T] - predicate Predicate[T] + matches Predicate[T] transform Transform[T] cache T zero T // zero value of the return type of Next() @@ -130,7 +120,7 @@ func (it *FilterIter[T]) Next() bool { it.cache = it.zero return false } - for next && !it.predicate(it.iter.At()) { + for next && !it.matches(it.iter.At()) { next = it.iter.Next() if !next { it.cache = it.zero @@ -178,7 +168,7 @@ func (it *taskMergeIterator) init() { sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) for i := range it.tasks { iter := 
NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) - sequences = append(sequences, iter) + sequences = append(sequences, v1.NewPeekingIter(iter)) } it.heap = v1.NewHeapIterator( func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool { diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index dc04522e13d2f..af740715c22fb 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -184,3 +184,73 @@ func TestTaskMergeIterator(t *testing.T) { } }) } + +func TestChunkIterForDay(t *testing.T) { + tenant := "fake" + + // Thu Nov 09 2023 10:56:50 UTC + ts := model.TimeFromUnix(1699523810) + + t.Run("filter chunk refs that fall into the day range", func(t *testing.T) { + input := &logproto.FilterChunkRefRequest{ + From: ts.Add(-168 * time.Hour), // 1w ago + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100}, + {From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101}, + }}, + {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200}, + {From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201}, + }}, + {Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300}, + {From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301}, + }}, + {Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400}, + {From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401}, + }}, + {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), 
Checksum: 500}, + {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, + }}, + {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, + {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601}, + }}, + {Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700}, + {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701}, + }}, + }, + Filters: []*logproto.LineFilterExpression{ + {Operator: 1, Match: "foo"}, + {Operator: 1, Match: "bar"}, + }, + } + + // day ranges from ts-48h to ts-24h + day := getDayTime(ts.Add(-36 * time.Hour)) + + expected := []*logproto.GroupedChunkRefs{ + {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, + }}, + {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ + {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, + }}, + } + + task, _, _, _ := NewTask(tenant, input) + it := task.ChunkIterForDay(day) + + output := make([]*logproto.GroupedChunkRefs, 0, len(input.Refs)) + for it.Next() { + output = append(output, it.At()) + } + + require.Equal(t, expected, output) + }) +} diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 39d4e370da7c7..9254067e1b52f 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -15,30 +15,20 @@ type IndexedValue[T any] struct { } type IterWithIndex[T any] struct { - v1.PeekingIterator[T] + v1.Iterator[T] zero T // zero value of T cache IndexedValue[T] } func (it *IterWithIndex[T]) At() IndexedValue[T] { - it.cache.val = it.PeekingIterator.At() + it.cache.val = it.Iterator.At() return it.cache } -func (it *IterWithIndex[T]) Peek() (IndexedValue[T], bool) { - peek, ok := it.PeekingIterator.Peek() - if !ok { - 
it.cache.val = it.zero - return it.cache, false - } - it.cache.val = peek - return it.cache, true -} - -func NewIterWithIndex[T any](iter v1.PeekingIterator[T], idx int) v1.PeekingIterator[IndexedValue[T]] { +func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] { return &IterWithIndex[T]{ - PeekingIterator: iter, - cache: IndexedValue[T]{idx: idx}, + Iterator: iter, + cache: IndexedValue[T]{idx: idx}, } } @@ -84,59 +74,6 @@ func getDayTime(ts model.Time) time.Time { return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC) } -func filterRequestForDay(r *logproto.FilterChunkRefRequest, day time.Time) *logproto.FilterChunkRefRequest { - through := model.TimeFromUnix(day.Unix()) - from := model.TimeFromUnix(day.Add(24 * time.Hour).Unix()) - - refs := make([]*logproto.GroupedChunkRefs, 0, len(r.Refs)) - for i := range r.Refs { - groupedChunkRefs := &logproto.GroupedChunkRefs{ - Fingerprint: r.Refs[i].Fingerprint, - Tenant: r.Refs[i].Tenant, - Refs: make([]*logproto.ShortRef, 0, len(r.Refs[i].Refs)), - } - for j := range r.Refs[i].Refs { - shortRef := r.Refs[i].Refs[j] - fromDay := getDayTime(shortRef.From) - if fromDay.After(day) { - break - } - throughDay := getDayTime(shortRef.Through) - if fromDay.Equal(day) || throughDay.Equal(day) { - groupedChunkRefs.Refs = append(groupedChunkRefs.Refs, shortRef) - } - } - - // do not add empty groups to request - if len(groupedChunkRefs.Refs) == 0 { - continue - } - - groupFrom, groupThrough := getFromThrough(groupedChunkRefs.Refs) - if groupFrom.Before(from) { - from = groupFrom - } - if groupThrough.After(through) { - through = groupThrough - } - refs = append(refs, groupedChunkRefs) - } - - // The initial value of `from` is the through time and vice versa. - // This is, in order to determine min From and max Through. - // In case no chunk refs match, we need to swap the initial value again. 
- if len(refs) == 0 { - from, through = through, from - } - - return &logproto.FilterChunkRefRequest{ - From: from, - Through: through, - Refs: refs, - Filters: r.Filters, - } -} - // TODO(chaudum): Fix Through time calculation // getFromThrough assumes a list of ShortRefs sorted by From time // However, it does also assume that the last item has the highest diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 8f200ebfbb296..e82a3c4ea460a 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -2,12 +2,8 @@ package bloomgateway import ( "testing" - "time" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/logproto" ) func TestSliceIterWithIndex(t *testing.T) { @@ -37,106 +33,3 @@ func TestSliceIterWithIndex(t *testing.T) { require.Equal(t, 123, p.idx) }) } - -func TestFilterRequestForDay(t *testing.T) { - tenant := "fake" - - // Thu Nov 09 2023 10:56:50 UTC - ts := model.TimeFromUnix(1699523810) - - t.Run("filter chunk refs that fall into the day range", func(t *testing.T) { - input := &logproto.FilterChunkRefRequest{ - From: ts.Add(-168 * time.Hour), // 1w ago - Through: ts, - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100}, - {From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101}, - }}, - {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200}, - {From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201}, - }}, - {Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300}, - {From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301}, - }}, - {Fingerprint: 400, Tenant: 
tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400}, - {From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401}, - }}, - {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500}, - {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, - }}, - {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, - {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601}, - }}, - {Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700}, - {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701}, - }}, - }, - Filters: []*logproto.LineFilterExpression{ - {Operator: 1, Match: "foo"}, - {Operator: 1, Match: "bar"}, - }, - } - - // day ranges from ts-48h to ts-24h - day := getDayTime(ts.Add(-36 * time.Hour)) - - expected := &logproto.FilterChunkRefRequest{ - From: ts.Add(-48 * time.Hour), - Through: ts.Add(-46 * time.Hour), - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, - }}, - {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, - }}, - }, - Filters: []*logproto.LineFilterExpression{ - {Operator: 1, Match: "foo"}, - {Operator: 1, Match: "bar"}, - }, - } - - output := filterRequestForDay(input, day) - require.Equal(t, expected, output) - }) - - t.Run("empty response returns time range from input day", func(t *testing.T) { - input := &logproto.FilterChunkRefRequest{ - From: ts.Add(-168 * time.Hour), // 1w ago - Through: ts, - Refs: 
[]*logproto.GroupedChunkRefs{ - {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100}, - }}, - }, - Filters: []*logproto.LineFilterExpression{ - {Operator: 1, Match: "foo"}, - {Operator: 1, Match: "bar"}, - }, - } - - // day ranges from ts-48h to ts-24h - day := getDayTime(ts.Add(-36 * time.Hour)) - - expected := &logproto.FilterChunkRefRequest{ - From: model.TimeFromUnix(day.Unix()), - Through: model.TimeFromUnix(day.Add(24 * time.Hour).Unix()), - Refs: []*logproto.GroupedChunkRefs{}, - Filters: []*logproto.LineFilterExpression{ - {Operator: 1, Match: "foo"}, - {Operator: 1, Match: "bar"}, - }, - } - - output := filterRequestForDay(input, day) - require.Equal(t, expected, output) - }) -} diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 4b7e6542e1dfa..82e1a195f1fe9 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -191,8 +191,8 @@ func (w *worker) running(ctx context.Context) error { for _, t := range tasks { for _, ref := range t.Request.Refs { t.ResCh <- v1.Output{ - Fp: model.Fingerprint(ref.Fingerprint), - Chks: convertToChunkRefs(ref.Refs), + Fp: model.Fingerprint(ref.Fingerprint), + Removals: nil, } } } @@ -227,8 +227,8 @@ func (w *worker) running(ctx context.Context) error { for hasNext { level.Warn(logger).Log("msg", "processing remaining fingerprint", "fp", it.At().Fp) it.At().Response <- v1.Output{ - Fp: it.At().Fp, - Chks: it.At().Chks, + Fp: it.At().Fp, + Removals: nil, } hasNext = it.Next() }