Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(chore) Bloom shipper: Replace Keyspace struct with v1.FingerprintBounds #11839

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
Keyspace: bloomshipper.Keyspace{Min: job.minFp, Max: job.maxFp},
Keyspace: v1.NewBounds(job.minFp, job.maxFp),
Interval: bloomshipper.Interval{Start: job.from, End: job.through},
}
var metas []bloomshipper.Meta
Expand Down
8 changes: 4 additions & 4 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
End: ts.Add(Day),
}
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []bloomshipper.Keyspace{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -52,12 +52,12 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []bloomshipper.Keyspace, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Keyspace: bloomshipper.Keyspace{Min: minFpRange.Min, Max: maxFpRange.Max},
Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max},
}
metas, err := p.store.FetchMetas(ctx, metaSearch)
if err != nil {
Expand All @@ -82,7 +82,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.MinFingerprint == uint64(bq.MinFp) && block.blockRef.MaxFingerprint == uint64(bq.MaxFp) {
if block.blockRef.Bounds().Equal(bq.FingerprintBounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef)

for _, ref := range refs {
for _, bq := range s.querieres {
if ref.MinFingerprint == uint64(bq.MinFp) && ref.MaxFingerprint == uint64(bq.MaxFp) {
if ref.Bounds().Equal(bq.FingerprintBounds) {
result = append(result, bq)
}
}
Expand Down
20 changes: 9 additions & 11 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,8 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time,
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
bq := bloomshipper.BlockQuerierWithFingerprintRange{
BlockQuerier: blockQuerier,
MinFp: fromFp,
MaxFp: throughFp,
BlockQuerier: blockQuerier,
FingerprintBounds: v1.NewBounds(fromFp, throughFp),
}
bqs = append(bqs, bq)
series = append(series, data)
Expand Down Expand Up @@ -359,9 +358,8 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
querier := bloomshipper.BlockQuerierWithFingerprintRange{
BlockQuerier: blockQuerier,
MinFp: fromFp,
MaxFp: throughFp,
BlockQuerier: blockQuerier,
FingerprintBounds: v1.NewBounds(fromFp, throughFp),
}
queriers = append(queriers, querier)
metas = append(metas, meta)
Expand Down Expand Up @@ -392,8 +390,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms
for i := range s.bqs {
blocks = append(blocks, bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: uint64(s.bqs[i].MinFp),
MaxFingerprint: uint64(s.bqs[i].MaxFp),
MinFingerprint: uint64(s.bqs[i].Min),
MaxFingerprint: uint64(s.bqs[i].Max),
TenantID: tenant,
},
})
Expand Down Expand Up @@ -421,7 +419,7 @@ func (s *mockBloomStore) Fetch(_ context.Context, _ string, _ []bloomshipper.Blo
for _, bq := range shuffled {
// ignore errors in the mock
time.Sleep(s.delay)
err := callback(bq.BlockQuerier, uint64(bq.MinFp), uint64(bq.MaxFp))
err := callback(bq.BlockQuerier, bq.FingerprintBounds)
if err != nil {
return err
}
Expand Down Expand Up @@ -459,8 +457,8 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: "",
MinFingerprint: uint64(data[i].MinFp),
MaxFingerprint: uint64(data[i].MaxFp),
MinFingerprint: uint64(data[i].Min),
MaxFingerprint: uint64(data[i].Max),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func (w *worker) stopping(err error) error {
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
if b.blockRef.Bounds().Equal(bounds) {
return w.processBlock(bq, b.tasks)
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (b FingerprintBounds) Within(target FingerprintBounds) bool {
return b.Min >= target.Min && b.Max <= target.Max
}

// Returns whether the fingerprint bounds is equal to the target bounds
func (b FingerprintBounds) Equal(target FingerprintBounds) bool {
return b.Min == target.Min && b.Max == target.Max
}

// Intersection returns the intersection of the two bounds
func (b FingerprintBounds) Intersection(target FingerprintBounds) *FingerprintBounds {
if !b.Overlaps(target) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type BlockRef struct {
BlockPath string
}

func (b *BlockRef) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint))
}

type MetaRef struct {
Ref
FilePath string
Expand All @@ -67,7 +71,7 @@ type Meta struct {
type MetaSearchParams struct {
TenantID string
Interval Interval
Keyspace Keyspace
Keyspace v1.FingerprintBounds
}

type MetaClient interface {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
)
Expand Down Expand Up @@ -72,7 +73,7 @@ func Test_BloomClient_FetchMetas(t *testing.T) {

searchParams := MetaSearchParams{
TenantID: "tenantA",
Keyspace: Keyspace{Min: 50, Max: 150},
Keyspace: v1.NewBounds(50, 150),
Interval: Interval{Start: fixedDay.Add(-6 * day), End: fixedDay.Add(-1*day - 1*time.Hour)},
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
)

func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keyspaces []Keyspace) []Meta {
func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keyspaces []v1.FingerprintBounds) []Meta {
t.Helper()

metas := make([]Meta, len(keyspaces))
Expand Down Expand Up @@ -76,23 +77,23 @@ func TestMetasFetcher(t *testing.T) {
{
name: "all metas found in cache",
store: []Meta{},
start: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
start: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
},
{
name: "no metas found in cache",
store: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
store: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
start: []Meta{},
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
},
{
name: "some metas found in cache",
store: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
start: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}}),
end: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
fetch: makeMetas(t, schemaCfg, now, []Keyspace{{0x0000, 0xffff}, {0x10000, 0x1ffff}}),
store: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
start: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}),
end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}),
},
}

Expand Down
40 changes: 12 additions & 28 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,12 @@ func (i Interval) Cmp(other model.Time) v1.BoundsCheck {
return v1.Overlap
}

type Keyspace struct {
Min, Max model.Fingerprint
}

func (r Keyspace) Cmp(other model.Fingerprint) v1.BoundsCheck {
if other < r.Min {
return v1.Before
} else if other > r.Max {
return v1.After
}
return v1.Overlap
}

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
MinFp, MaxFp model.Fingerprint
v1.FingerprintBounds
}

type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error
type ForEachBlockCallback func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error

type Interface interface {
GetBlockRefs(ctx context.Context, tenant string, interval Interval) ([]BlockRef, error)
Expand Down Expand Up @@ -93,8 +80,8 @@ func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, interval In
level.Debug(s.logger).Log("msg", "GetBlockRefs", "tenant", tenantID, "[", interval.Start, "", interval.End)

// TODO(chaudum): The bloom gateway should not fetch blocks for the complete key space
keyspaces := []Keyspace{{0, math.MaxUint64}}
blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, interval, keyspaces)
bounds := []v1.FingerprintBounds{v1.NewBounds(0, math.MaxUint64)}
blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, interval, bounds)
if err != nil {
return nil, fmt.Errorf("error fetching active block references : %w", err)
}
Expand Down Expand Up @@ -136,7 +123,7 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error {
_ = b.Close()
}(block)

err := callback(block.closableBlockQuerier.BlockQuerier, block.MinFingerprint, block.MaxFingerprint)
err := callback(block.closableBlockQuerier.BlockQuerier, block.Bounds())
if err != nil {
return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err)
}
Expand All @@ -158,22 +145,22 @@ func getFirstLast[T any](s []T) (T, T) {
return s[0], s[len(s)-1]
}

func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, interval Interval, keyspaces []Keyspace) ([]BlockRef, error) {
minFpRange, maxFpRange := getFirstLast(keyspaces)
func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, interval Interval, bounds []v1.FingerprintBounds) ([]BlockRef, error) {
minFpRange, maxFpRange := getFirstLast(bounds)
metas, err := s.store.FetchMetas(ctx, MetaSearchParams{
TenantID: tenantID,
Keyspace: Keyspace{Min: minFpRange.Min, Max: maxFpRange.Max},
Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max),
Interval: interval,
})
if err != nil {
return []BlockRef{}, fmt.Errorf("error fetching meta.json files: %w", err)
}
level.Debug(s.logger).Log("msg", "dowloaded metas", "count", len(metas))

return BlocksForMetas(metas, interval, keyspaces), nil
return BlocksForMetas(metas, interval, bounds), nil
}

func BlocksForMetas(metas []Meta, interval Interval, keyspaces []Keyspace) []BlockRef {
func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) []BlockRef {
tombstones := make(map[string]interface{})
for _, meta := range metas {
for _, tombstone := range meta.Tombstones {
Expand Down Expand Up @@ -216,18 +203,15 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []Keyspace) []Blo
// isOutsideRange tests if a given BlockRef b is outside of search boundaries
// defined by min/max timestamp and min/max fingerprint.
// Fingerprint ranges must be sorted in ascending order.
func isOutsideRange(b BlockRef, interval Interval, keyspaces []Keyspace) bool {
func isOutsideRange(b BlockRef, interval Interval, keyspaces []v1.FingerprintBounds) bool {
// check time interval
if interval.Cmp(b.EndTimestamp) == v1.Before || interval.Cmp(b.StartTimestamp) == v1.After {
return true
}

// check fingerprint ranges
for _, keyspace := range keyspaces {
if keyspace.Cmp(model.Fingerprint(b.MinFingerprint)) == v1.Before && keyspace.Cmp(model.Fingerprint(b.MaxFingerprint)) == v1.After {
return false
}
if keyspace.Cmp(model.Fingerprint(b.MinFingerprint)) == v1.Overlap || keyspace.Cmp(model.Fingerprint(b.MaxFingerprint)) == v1.Overlap {
if keyspace.Within(b.Bounds()) || keyspace.Overlaps(b.Bounds()) {
return false
}
}
Expand Down
Loading
Loading