Skip to content

Commit

Permalink
Blooms/bounds integration (grafana#11848)
Browse files Browse the repository at this point in the history
Replaces `Ref`'s min/max fingerprint fields with `v1.FingerprintBounds`
and improves a bunch of checks that use it. Builds on top of
grafana#11847
  • Loading branch information
owen-d authored Feb 1, 2024
1 parent c335cd2 commit 098eef7
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.Bounds().Equal(bq.FingerprintBounds) {
if block.blockRef.Bounds.Equal(bq.FingerprintBounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef)

for _, ref := range refs {
for _, bq := range s.querieres {
if ref.Bounds().Equal(bq.FingerprintBounds) {
if ref.Bounds.Equal(bq.FingerprintBounds) {
result = append(result, bq)
}
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func TestTruncateDay(t *testing.T) {
func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: minFp,
MaxFingerprint: maxFp,
Bounds: v1.NewBounds(model.Fingerprint(minFp), model.Fingerprint(maxFp)),
},
}
}
Expand Down Expand Up @@ -339,8 +338,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
ref := bloomshipper.Ref{
TenantID: tenant,
TableName: "table_0",
MinFingerprint: uint64(fromFp),
MaxFingerprint: uint64(throughFp),
Bounds: v1.NewBounds(fromFp, throughFp),
StartTimestamp: from,
EndTimestamp: through,
}
Expand Down Expand Up @@ -390,9 +388,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms
for i := range s.bqs {
blocks = append(blocks, bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: uint64(s.bqs[i].Min),
MaxFingerprint: uint64(s.bqs[i].Max),
TenantID: tenant,
Bounds: v1.NewBounds(s.bqs[i].Min, s.bqs[i].Max),
TenantID: tenant,
},
})
}
Expand Down Expand Up @@ -457,8 +454,7 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: "",
MinFingerprint: uint64(data[i].Min),
MaxFingerprint: uint64(data[i].Max),
Bounds: v1.NewBounds(data[i].Min, data[i].Max),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (w *worker) stopping(err error) error {
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
for _, b := range boundedRefs {
if b.blockRef.Bounds().Equal(bounds) {
if b.blockRef.Bounds.Equal(bounds) {
return w.processBlock(bq, b.tasks)
}
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package v1

import (
"fmt"
"hash"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -17,6 +19,26 @@ const (
After
)

// ParseBoundsFromAddr parses fingerprint bounds from their address form
// "<min>-<max>", where each half is a hexadecimal fingerprint.
//
// It returns an error when s contains no "-" separator or when either half
// cannot be parsed as a fingerprint. Only the first two "-"-separated segments
// are used; any further segments are ignored.
func ParseBoundsFromAddr(s string) (FingerprintBounds, error) {
	parts := strings.Split(s, "-")
	// Guard the index below: without a separator there is no parts[1], and
	// malformed input must surface as an error rather than a panic.
	if len(parts) < 2 {
		return FingerprintBounds{}, fmt.Errorf("error parsing bounds %q: expected format <min>-<max>", s)
	}
	return ParseBoundsFromParts(parts[0], parts[1])
}

// ParseBoundsFromParts builds fingerprint bounds from two strings that have
// already been split apart: a is parsed as the minimum fingerprint and b as
// the maximum. The first parse failure is returned wrapped, together with the
// offending input.
func ParseBoundsFromParts(a, b string) (FingerprintBounds, error) {
	var (
		lower, upper model.Fingerprint
		err          error
	)

	if lower, err = model.ParseFingerprint(a); err != nil {
		return FingerprintBounds{}, fmt.Errorf("error parsing minFingerprint %s : %w", a, err)
	}
	if upper, err = model.ParseFingerprint(b); err != nil {
		return FingerprintBounds{}, fmt.Errorf("error parsing maxFingerprint %s : %w", b, err)
	}

	return NewBounds(lower, upper), nil
}

// FingerprintBounds describes a range of series fingerprints by its two ends.
type FingerprintBounds struct {
	// Min is the lower end of the range.
	Min model.Fingerprint
	// Max is the upper end of the range.
	Max model.Fingerprint
}
Expand All @@ -33,8 +55,12 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error {
return errors.Wrap(err, "writing FingerprintBounds")
}

// String returns the string representation of the fingerprint bounds for use in
// content addressable storage.
// TODO(owen-d): incorporate this into the schema so we can change it,
// similar to `{,Parse}ExternalKey`
func (b FingerprintBounds) String() string {
return b.Min.String() + "-" + b.Max.String()
return fmt.Sprintf("%016x-%016x", uint64(b.Min), uint64(b.Max))
}

func (b FingerprintBounds) Less(other FingerprintBounds) bool {
Expand Down
23 changes: 21 additions & 2 deletions pkg/storage/bloom/v1/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,31 @@ package v1
import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

// Test_ParseFingerprint pins the behaviour of model.ParseFingerprint that the
// bounds parsers build on: an unpadded hex string decodes to its numeric value.
func Test_ParseFingerprint(t *testing.T) {
	const input = "7d0" // hex for 2000
	want := model.Fingerprint(2000)

	got, err := model.ParseFingerprint(input)
	assert.NoError(t, err)
	assert.Equal(t, want, got)
}

func Test_FingerprintBounds_String(t *testing.T) {
bounds := NewBounds(1, 2)
assert.Equal(t, "0000000000000001-0000000000000002", bounds.String())
bounds := NewBounds(10, 2000)
assert.Equal(t, "000000000000000a-00000000000007d0", bounds.String())
}

func Test_ParseBoundsFromAddr(t *testing.T) {
bounds, err := ParseBoundsFromAddr("a-7d0")
assert.NoError(t, err)
assert.Equal(t, NewBounds(10, 2000), bounds)
}

func Test_ParseBoundsFromParts(t *testing.T) {
bounds, err := ParseBoundsFromParts("a", "7d0")
assert.NoError(t, err)
assert.Equal(t, NewBounds(10, 2000), bounds)
}

func Test_FingerprintBounds_Cmp(t *testing.T) {
Expand Down
38 changes: 12 additions & 26 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,16 @@ const (
)

type Ref struct {
TenantID string
TableName string
MinFingerprint, MaxFingerprint uint64
StartTimestamp, EndTimestamp model.Time
Checksum uint32
TenantID string
TableName string
Bounds v1.FingerprintBounds
StartTimestamp, EndTimestamp model.Time
Checksum uint32
}

// Cmp returns the fingerprint's position relative to the bounds
func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
if fp < r.MinFingerprint {
return v1.Before
} else if fp > r.MaxFingerprint {
return v1.After
}
return v1.Overlap
}

func (r Ref) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(r.MinFingerprint), model.Fingerprint(r.MaxFingerprint))
return r.Bounds.Cmp(model.Fingerprint(fp))
}

func (r Ref) Interval() Interval {
Expand Down Expand Up @@ -135,13 +126,13 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error {
}

func externalBlockKey(ref BlockRef) string {
blockParentFolder := fmt.Sprintf("%x-%x", ref.MinFingerprint, ref.MaxFingerprint)
blockParentFolder := ref.Bounds.String()
filename := fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, bloomsFolder, blockParentFolder, filename)
}

func externalMetaKey(ref MetaRef) string {
filename := fmt.Sprintf("%x-%x-%d-%d-%x", ref.MinFingerprint, ref.MaxFingerprint, ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
filename := fmt.Sprintf("%s-%d-%d-%x", ref.Bounds.String(), ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, metasFolder, filename)
}

Expand Down Expand Up @@ -251,15 +242,11 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
if len(parts) != 5 {
return MetaRef{}, fmt.Errorf("%s filename parts count must be 5 but was %d: [%s]", objectKey, len(parts), strings.Join(parts, ", "))
}

minFingerprint, err := strconv.ParseUint(parts[0], 16, 64)
bounds, err := v1.ParseBoundsFromParts(parts[0], parts[1])
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing minFingerprint %s : %w", parts[0], err)
}
maxFingerprint, err := strconv.ParseUint(parts[1], 16, 64)
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing maxFingerprint %s : %w", parts[1], err)
return MetaRef{}, fmt.Errorf("error parsing bounds %s : %w", parts[0], err)
}

startTimestamp, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing startTimestamp %s : %w", parts[2], err)
Expand All @@ -276,8 +263,7 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
Ref: Ref{
TenantID: tenantID,
TableName: tableName,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
Bounds: bounds,
StartTimestamp: model.Time(startTimestamp),
EndTimestamp: model.Time(endTimestamp),
Checksum: uint32(checksum),
Expand Down
Loading

0 comments on commit 098eef7

Please sign in to comment.