Skip to content

Commit

Permalink
Cleanup duplicate function in bloomgateway package
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 16, 2024
1 parent 5d1798f commit 2e08794
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 53 deletions.
48 changes: 4 additions & 44 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bloomgateway
import (
"context"
"math"
"sort"
"time"

"github.com/go-kit/log"
Expand All @@ -13,11 +12,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

type tasksForBlock struct {
blockRef bloomshipper.BlockRef
tasks []Task
}

func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
Expand Down Expand Up @@ -66,13 +60,13 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs))
}

func (p *processor) processBlocks(ctx context.Context, data []tasksForBlock) error {
func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error {
refs := make([]bloomshipper.BlockRef, len(data))
for _, block := range data {
refs = append(refs, block.blockRef)
refs = append(refs, block.ref)
}

bqs, err := p.store.FetchBlocks(ctx, refs)
Expand All @@ -87,7 +81,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.Bounds.Equal(bq.Bounds) {
if block.ref.Bounds.Equal(bq.Bounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
bq.Close()
if err != nil {
Expand Down Expand Up @@ -146,37 +140,3 @@ func group[K comparable, V any, S ~[]V](s S, f func(v V) K) map[K]S {
}
return m
}

func partition(tasks []Task, blocks []bloomshipper.BlockRef) []tasksForBlock {
result := make([]tasksForBlock, 0, len(blocks))

for _, block := range blocks {
bounded := tasksForBlock{
blockRef: block,
}

for _, task := range tasks {
refs := task.series
min := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) > v1.Before
})

max := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) == v1.After
})

// All fingerprints fall outside of the consumer's range
if min == len(refs) || max == 0 {
continue
}

bounded.tasks = append(bounded.tasks, task.Copy(refs[min:max]))
}

if len(bounded.tasks) > 0 {
result = append(result, bounded)
}

}
return result
}
14 changes: 8 additions & 6 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
return result
}

type boundedTasks struct {
blockRef bloomshipper.BlockRef
tasks []Task
type blockWithTasks struct {
ref bloomshipper.BlockRef
tasks []Task
}

func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (result []boundedTasks) {
func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
result := make([]blockWithTasks, 0, len(blocks))

for _, block := range blocks {
bounded := boundedTasks{
blockRef: block,
bounded := blockWithTasks{
ref: block,
}

for _, task := range tasks {
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
}

func TestPartitionFingerprintRange(t *testing.T) {
func TestPartitionTasks(t *testing.T) {

t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
Expand All @@ -93,7 +93,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionFingerprintRange(tasks, bounds)
results := partitionTasks(tasks, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range

actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionFingerprintRange([]Task{task}, bounds)
results := partitionTasks([]Task{task}, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
for _, res := range results {
// ensure we have the right number of tasks per bound
Expand Down

0 comments on commit 2e08794

Please sign in to comment.