Skip to content

Commit

Permalink
(chore) Bloom shipper: Replace Keyspace struct with `v1.Fingerprint…
Browse files Browse the repository at this point in the history
…Bounds` (#11839)

The latter struct has more utility functions to compare and operate on
bounds.

---------

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jan 31, 2024
1 parent 509f6f6 commit 4411649
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
Keyspace: bloomshipper.Keyspace{Min: job.minFp, Max: job.maxFp},
Keyspace: v1.NewBounds(job.minFp, job.maxFp),
Interval: bloomshipper.Interval{Start: job.from, End: job.through},
}
var metas []bloomshipper.Meta
Expand Down
8 changes: 4 additions & 4 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
End: ts.Add(Day),
}
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []bloomshipper.Keyspace{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -52,12 +52,12 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []bloomshipper.Keyspace, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Keyspace: bloomshipper.Keyspace{Min: minFpRange.Min, Max: maxFpRange.Max},
Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max},
}
metas, err := p.store.FetchMetas(ctx, metaSearch)
if err != nil {
Expand All @@ -82,7 +82,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.MinFingerprint == uint64(bq.MinFp) && block.blockRef.MaxFingerprint == uint64(bq.MaxFp) {
if block.blockRef.Bounds().Equal(bq.FingerprintBounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef)

for _, ref := range refs {
for _, bq := range s.querieres {
if ref.MinFingerprint == uint64(bq.MinFp) && ref.MaxFingerprint == uint64(bq.MaxFp) {
if ref.Bounds().Equal(bq.FingerprintBounds) {
result = append(result, bq)
}
}
Expand Down
20 changes: 9 additions & 11 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,8 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time,
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
bq := bloomshipper.BlockQuerierWithFingerprintRange{
BlockQuerier: blockQuerier,
MinFp: fromFp,
MaxFp: throughFp,
BlockQuerier: blockQuerier,
FingerprintBounds: v1.NewBounds(fromFp, throughFp),
}
bqs = append(bqs, bq)
series = append(series, data)
Expand Down Expand Up @@ -359,9 +358,8 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
querier := bloomshipper.BlockQuerierWithFingerprintRange{
BlockQuerier: blockQuerier,
MinFp: fromFp,
MaxFp: throughFp,
BlockQuerier: blockQuerier,
FingerprintBounds: v1.NewBounds(fromFp, throughFp),
}
queriers = append(queriers, querier)
metas = append(metas, meta)
Expand Down Expand Up @@ -392,8 +390,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms
for i := range s.bqs {
blocks = append(blocks, bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: uint64(s.bqs[i].MinFp),
MaxFingerprint: uint64(s.bqs[i].MaxFp),
MinFingerprint: uint64(s.bqs[i].Min),
MaxFingerprint: uint64(s.bqs[i].Max),
TenantID: tenant,
},
})
Expand Down Expand Up @@ -421,7 +419,7 @@ func (s *mockBloomStore) Fetch(_ context.Context, _ string, _ []bloomshipper.Blo
for _, bq := range shuffled {
// ignore errors in the mock
time.Sleep(s.delay)
err := callback(bq.BlockQuerier, uint64(bq.MinFp), uint64(bq.MaxFp))
err := callback(bq.BlockQuerier, bq.FingerprintBounds)
if err != nil {
return err
}
Expand Down Expand Up @@ -459,8 +457,8 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: "",
MinFingerprint: uint64(data[i].MinFp),
MaxFingerprint: uint64(data[i].MaxFp),
MinFingerprint: uint64(data[i].Min),
MaxFingerprint: uint64(data[i].Max),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func (w *worker) stopping(err error) error {
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
if b.blockRef.Bounds().Equal(bounds) {
return w.processBlock(bq, b.tasks)
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (b FingerprintBounds) Within(target FingerprintBounds) bool {
return b.Min >= target.Min && b.Max <= target.Max
}

// Returns whether the fingerprint bounds is equal to the target bounds
func (b FingerprintBounds) Equal(target FingerprintBounds) bool {
return b.Min == target.Min && b.Max == target.Max
}

// Intersection returns the intersection of the two bounds
func (b FingerprintBounds) Intersection(target FingerprintBounds) *FingerprintBounds {
if !b.Overlaps(target) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type BlockRef struct {
BlockPath string
}

func (b *BlockRef) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint))
}

type MetaRef struct {
Ref
FilePath string
Expand All @@ -67,7 +71,7 @@ type Meta struct {
type MetaSearchParams struct {
TenantID string
Interval Interval
Keyspace Keyspace
Keyspace v1.FingerprintBounds
}

type MetaClient interface {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
)
Expand Down Expand Up @@ -72,7 +73,7 @@ func Test_BloomClient_FetchMetas(t *testing.T) {

searchParams := MetaSearchParams{
TenantID: "tenantA",
Keyspace: Keyspace{Min: 50, Max: 150},
Keyspace: v1.NewBounds(50, 150),
Interval: Interval{Start: fixedDay.Add(-6 * day), End: fixedDay.Add(-1*day - 1*time.Hour)},
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
)

func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keyspaces []Keyspace) []Meta {
func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keyspaces []v1.FingerprintBounds) []Meta {
t.Helper()

metas := make([]Meta, len(keyspaces))
Expand Down Expand Up @@ -76,23 +77,23 @@ func TestMetasFetcher(t *testing.T) {
{
name: "all metas found in cache",
store: []Meta{},
start: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
start: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
},
{
name: "no metas found in cache",
store: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
store: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
start: []Meta{},
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
},
{
name: "some metas found in cache",
store: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
start: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
store: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
start: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
},
}

Expand Down
40 changes: 12 additions & 28 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,12 @@ func (i Interval) Cmp(other model.Time) v1.BoundsCheck {
return v1.Overlap
}

type Keyspace struct {
Min, Max model.Fingerprint
}

func (r Keyspace) Cmp(other model.Fingerprint) v1.BoundsCheck {
if other < r.Min {
return v1.Before
} else if other > r.Max {
return v1.After
}
return v1.Overlap
}

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
MinFp, MaxFp model.Fingerprint
v1.FingerprintBounds
}

type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error
type ForEachBlockCallback func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error

type Interface interface {
GetBlockRefs(ctx context.Context, tenant string, interval Interval) ([]BlockRef, error)
Expand Down Expand Up @@ -93,8 +80,8 @@ func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, interval In
level.Debug(s.logger).Log("msg", "GetBlockRefs", "tenant", tenantID, "[", interval.Start, "", interval.End)

// TODO(chaudum): The bloom gateway should not fetch blocks for the complete key space
keyspaces := []Keyspace{{0, math.MaxUint64}}
blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, interval, keyspaces)
bounds := []v1.FingerprintBounds{v1.NewBounds(0, math.MaxUint64)}
blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, interval, bounds)
if err != nil {
return nil, fmt.Errorf("error fetching active block references : %w", err)
}
Expand Down Expand Up @@ -136,7 +123,7 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error {
_ = b.Close()
}(block)

err := callback(block.closableBlockQuerier.BlockQuerier, block.MinFingerprint, block.MaxFingerprint)
err := callback(block.closableBlockQuerier.BlockQuerier, block.Bounds())
if err != nil {
return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err)
}
Expand All @@ -158,22 +145,22 @@ func getFirstLast[T any](s []T) (T, T) {
return s[0], s[len(s)-1]
}

func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, interval Interval, keyspaces []Keyspace) ([]BlockRef, error) {
minFpRange, maxFpRange := getFirstLast(keyspaces)
func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, interval Interval, bounds []v1.FingerprintBounds) ([]BlockRef, error) {
minFpRange, maxFpRange := getFirstLast(bounds)
metas, err := s.store.FetchMetas(ctx, MetaSearchParams{
TenantID: tenantID,
Keyspace: Keyspace{Min: minFpRange.Min, Max: maxFpRange.Max},
Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max),
Interval: interval,
})
if err != nil {
return []BlockRef{}, fmt.Errorf("error fetching meta.json files: %w", err)
}
level.Debug(s.logger).Log("msg", "dowloaded metas", "count", len(metas))

return BlocksForMetas(metas, interval, keyspaces), nil
return BlocksForMetas(metas, interval, bounds), nil
}

func BlocksForMetas(metas []Meta, interval Interval, keyspaces []Keyspace) []BlockRef {
func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) []BlockRef {
tombstones := make(map[string]interface{})
for _, meta := range metas {
for _, tombstone := range meta.Tombstones {
Expand Down Expand Up @@ -216,18 +203,15 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []Keyspace) []Blo
// isOutsideRange tests if a given BlockRef b is outside of search boundaries
// defined by min/max timestamp and min/max fingerprint.
// Fingerprint ranges must be sorted in ascending order.
func isOutsideRange(b BlockRef, interval Interval, keyspaces []Keyspace) bool {
func isOutsideRange(b BlockRef, interval Interval, keyspaces []v1.FingerprintBounds) bool {
// check time interval
if interval.Cmp(b.EndTimestamp) == v1.Before || interval.Cmp(b.StartTimestamp) == v1.After {
return true
}

// check fingerprint ranges
for _, keyspace := range keyspaces {
if keyspace.Cmp(model.Fingerprint(b.MinFingerprint)) == v1.Before && keyspace.Cmp(model.Fingerprint(b.MaxFingerprint)) == v1.After {
return false
}
if keyspace.Cmp(model.Fingerprint(b.MinFingerprint)) == v1.Overlap || keyspace.Cmp(model.Fingerprint(b.MaxFingerprint)) == v1.Overlap {
if keyspace.Within(b.Bounds()) || keyspace.Overlaps(b.Bounds()) {
return false
}
}
Expand Down
Loading

0 comments on commit 4411649

Please sign in to comment.