From 2e0879452c10a503ca77021b3f782c6c81b3bfb6 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 16 Feb 2024 15:39:59 +0100 Subject: [PATCH] Cleanup duplicate function in bloomgateway package Signed-off-by: Christian Haudum --- pkg/bloomgateway/processor.go | 48 +++-------------------------------- pkg/bloomgateway/util.go | 14 +++++----- pkg/bloomgateway/util_test.go | 6 ++--- 3 files changed, 15 insertions(+), 53 deletions(-) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 5eab7a858c74b..7d1d687853979 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -3,7 +3,6 @@ package bloomgateway import ( "context" "math" - "sort" "time" "github.com/go-kit/log" @@ -13,11 +12,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -type tasksForBlock struct { - blockRef bloomshipper.BlockRef - tasks []Task -} - func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor { return &processor{ id: id, @@ -66,13 +60,13 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config. p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas))) blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces) - return p.processBlocks(ctx, partition(tasks, blocksRefs)) + return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs)) } -func (p *processor) processBlocks(ctx context.Context, data []tasksForBlock) error { +func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error { refs := make([]bloomshipper.BlockRef, len(data)) for _, block := range data { - refs = append(refs, block.blockRef) + refs = append(refs, block.ref) } bqs, err := p.store.FetchBlocks(ctx, refs) @@ -87,7 +81,7 @@ outer: for blockIter.Next() { bq := blockIter.At() for i, block := range data { - if block.blockRef.Bounds.Equal(bq.Bounds) { + if block.ref.Bounds.Equal(bq.Bounds) { err := p.processBlock(ctx, bq.BlockQuerier, block.tasks) bq.Close() if err != nil { @@ -146,37 +140,3 @@ func group[K comparable, V any, S ~[]V](s S, f func(v V) K) map[K]S { } return m } - -func partition(tasks []Task, blocks []bloomshipper.BlockRef) []tasksForBlock { - result := make([]tasksForBlock, 0, len(blocks)) - - for _, block := range blocks { - bounded := tasksForBlock{ - blockRef: block, - } - - for _, task := range tasks { - refs := task.series - min := sort.Search(len(refs), func(i int) bool { - return block.Cmp(refs[i].Fingerprint) > v1.Before - }) - - max := sort.Search(len(refs), func(i int) bool { - return block.Cmp(refs[i].Fingerprint) == v1.After - }) - - // All fingerprints fall outside of the consumer's range - if min == len(refs) || max == 0 { - continue - } - - bounded.tasks = append(bounded.tasks, task.Copy(refs[min:max])) - } - - if len(bounded.tasks) > 0 { - result = append(result, bounded) - } - - } - return result -} diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 3793076f7c385..3ab234aaa8ae0 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -83,15 +83,17 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs { return result } -type boundedTasks struct { - blockRef bloomshipper.BlockRef - tasks []Task +type blockWithTasks struct { + ref bloomshipper.BlockRef + tasks []Task } -func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (result []boundedTasks) { +func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks { + result := make([]blockWithTasks, 0, len(blocks)) + for _, block := range blocks { - bounded := boundedTasks{ - blockRef: block, + bounded := blockWithTasks{ + ref: block, } for _, task := range tasks { diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 9b5ce6e897bb9..2ed55561738fb 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef { } } -func TestPartitionFingerprintRange(t *testing.T) { +func TestPartitionTasks(t *testing.T) { t.Run("consecutive block ranges", func(t *testing.T) { bounds := []bloomshipper.BlockRef{ @@ -93,7 +93,7 @@ func TestPartitionFingerprintRange(t *testing.T) { tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) } - results := partitionFingerprintRange(tasks, bounds) + results := partitionTasks(tasks, bounds) require.Equal(t, 3, len(results)) // ensure we only return bounds in range actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries) @@ -128,7 +128,7 @@ func TestPartitionFingerprintRange(t *testing.T) { task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) } - results := partitionFingerprintRange([]Task{task}, bounds) + results := partitionTasks([]Task{task}, bounds) require.Equal(t, 3, len(results)) // ensure we only return bounds in range for _, res := range results { // ensure we have the right number of tasks per bound