Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(chore) Cleanup duplicate functions/strucs in bloomgateway package #11978

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 4 additions & 44 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bloomgateway
import (
"context"
"math"
"sort"
"time"

"github.com/go-kit/log"
Expand All @@ -13,11 +12,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

type tasksForBlock struct {
blockRef bloomshipper.BlockRef
tasks []Task
}

func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
Expand Down Expand Up @@ -66,13 +60,13 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs))
}

func (p *processor) processBlocks(ctx context.Context, data []tasksForBlock) error {
func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error {
refs := make([]bloomshipper.BlockRef, len(data))
for _, block := range data {
refs = append(refs, block.blockRef)
refs = append(refs, block.ref)
}

bqs, err := p.store.FetchBlocks(ctx, refs)
Expand All @@ -87,7 +81,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.Bounds.Equal(bq.Bounds) {
if block.ref.Bounds.Equal(bq.Bounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
bq.Close()
if err != nil {
Expand Down Expand Up @@ -146,37 +140,3 @@ func group[K comparable, V any, S ~[]V](s S, f func(v V) K) map[K]S {
}
return m
}

func partition(tasks []Task, blocks []bloomshipper.BlockRef) []tasksForBlock {
result := make([]tasksForBlock, 0, len(blocks))

for _, block := range blocks {
bounded := tasksForBlock{
blockRef: block,
}

for _, task := range tasks {
refs := task.series
min := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) > v1.Before
})

max := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) == v1.After
})

// All fingerprints fall outside of the consumer's range
if min == len(refs) || max == 0 {
continue
}

bounded.tasks = append(bounded.tasks, task.Copy(refs[min:max]))
}

if len(bounded.tasks) > 0 {
result = append(result, bounded)
}

}
return result
}
14 changes: 8 additions & 6 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
return result
}

type boundedTasks struct {
blockRef bloomshipper.BlockRef
tasks []Task
type blockWithTasks struct {
ref bloomshipper.BlockRef
tasks []Task
}

func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (result []boundedTasks) {
func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
result := make([]blockWithTasks, 0, len(blocks))

for _, block := range blocks {
bounded := boundedTasks{
blockRef: block,
bounded := blockWithTasks{
ref: block,
}

for _, task := range tasks {
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
}

func TestPartitionFingerprintRange(t *testing.T) {
func TestPartitionTasks(t *testing.T) {

t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
Expand All @@ -93,7 +93,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionFingerprintRange(tasks, bounds)
results := partitionTasks(tasks, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range

actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionFingerprintRange([]Task{task}, bounds)
results := partitionTasks([]Task{task}, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
for _, res := range results {
// ensure we have the right number of tasks per bound
Expand Down
Loading