Skip to content

Commit

Permalink
integrates FingerprintBounds into bloomshipper Ref
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Feb 1, 2024
1 parent a78eae0 commit 5d6afa0
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 83 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.Bounds().Equal(bq.FingerprintBounds) {
if block.blockRef.Bounds.Equal(bq.FingerprintBounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef)

for _, ref := range refs {
for _, bq := range s.querieres {
if ref.Bounds().Equal(bq.FingerprintBounds) {
if ref.Bounds.Equal(bq.FingerprintBounds) {
result = append(result, bq)
}
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func TestTruncateDay(t *testing.T) {
func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: minFp,
MaxFingerprint: maxFp,
Bounds: v1.NewBounds(model.Fingerprint(minFp), model.Fingerprint(maxFp)),
},
}
}
Expand Down Expand Up @@ -339,8 +338,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
ref := bloomshipper.Ref{
TenantID: tenant,
TableName: "table_0",
MinFingerprint: uint64(fromFp),
MaxFingerprint: uint64(throughFp),
Bounds: v1.NewBounds(fromFp, throughFp),
StartTimestamp: from,
EndTimestamp: through,
}
Expand Down Expand Up @@ -390,9 +388,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms
for i := range s.bqs {
blocks = append(blocks, bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
MinFingerprint: uint64(s.bqs[i].Min),
MaxFingerprint: uint64(s.bqs[i].Max),
TenantID: tenant,
Bounds: v1.NewBounds(s.bqs[i].Min, s.bqs[i].Max),
TenantID: tenant,
},
})
}
Expand Down Expand Up @@ -457,8 +454,7 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: "",
MinFingerprint: uint64(data[i].Min),
MaxFingerprint: uint64(data[i].Max),
Bounds: v1.NewBounds(data[i].Min, data[i].Max),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (w *worker) stopping(err error) error {
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
for _, b := range boundedRefs {
if b.blockRef.Bounds().Equal(bounds) {
if b.blockRef.Bounds.Equal(bounds) {
return w.processBlock(bq, b.tasks)
}
}
Expand Down
41 changes: 40 additions & 1 deletion pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package v1

import (
"fmt"
"hash"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -17,6 +20,34 @@ const (
After
)

// ParseFingerprint decodes s as a base-16 unsigned 64-bit integer and returns
// it as a model.Fingerprint. On failure it returns a zero fingerprint and an
// error that wraps the underlying strconv error.
func ParseFingerprint(s string) (model.Fingerprint, error) {
	raw, parseErr := strconv.ParseUint(s, 16, 64)
	if parseErr == nil {
		return model.Fingerprint(raw), nil
	}
	return 0, fmt.Errorf("error parsing fingerprint %s : %w", s, parseErr)
}

// ParseBoundsFromAddr parses fingerprint bounds from a string of the form
// "<min>-<max>", where both halves are hexadecimal fingerprints as accepted by
// ParseFingerprint (the layout produced by FingerprintBounds.Addr).
//
// It returns an error, rather than panicking, when s contains no "-"
// separator. Any segments after the second one are ignored.
func ParseBoundsFromAddr(s string) (FingerprintBounds, error) {
	parts := strings.Split(s, "-")
	// Split yields a single element when the separator is absent, so indexing
	// parts[1] without this guard would panic on malformed input.
	if len(parts) < 2 {
		return FingerprintBounds{}, fmt.Errorf("error parsing bounds %s : expected format <min>-<max>", s)
	}
	return ParseBoundsFromParts(parts[0], parts[1])
}

// ParseBoundsFromParts builds a FingerprintBounds from two strings that have
// already been separated: a is parsed as the minimum fingerprint and b as the
// maximum, both via ParseFingerprint. The first parse failure is returned,
// wrapped with the name of the offending side.
func ParseBoundsFromParts(a, b string) (FingerprintBounds, error) {
	var (
		lo, hi model.Fingerprint
		err    error
	)
	if lo, err = ParseFingerprint(a); err != nil {
		return FingerprintBounds{}, fmt.Errorf("error parsing minFingerprint %s : %w", a, err)
	}
	if hi, err = ParseFingerprint(b); err != nil {
		return FingerprintBounds{}, fmt.Errorf("error parsing maxFingerprint %s : %w", b, err)
	}
	return NewBounds(lo, hi), nil
}

// FingerprintBounds is a pair of fingerprints, Min and Max, describing a range
// of the fingerprint keyspace.
// NOTE(review): both ends appear to be inclusive — confirm against Cmp.
type FingerprintBounds struct {
Min, Max model.Fingerprint
}
Expand All @@ -33,8 +64,16 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error {
return errors.Wrap(err, "writing OwnershipRange")
}

// Addr renders the bounds as "<min>-<max>", with each fingerprint written in
// lowercase hexadecimal without padding, for use in content addressable
// storage.
// TODO(owen-d): incorporate this into the schema so we can change it,
// similar to `{,Parse}ExternalKey`
func (b FingerprintBounds) Addr() string {
	lo, hi := uint64(b.Min), uint64(b.Max)
	return fmt.Sprintf("%x-%x", lo, hi)
}

func (b FingerprintBounds) String() string {
return b.Min.String() + "-" + b.Max.String()
return b.Addr()
}

func (b FingerprintBounds) Less(other FingerprintBounds) bool {
Expand Down
25 changes: 22 additions & 3 deletions pkg/storage/bloom/v1/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,31 @@ package v1
import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

func Test_FingerprintBounds_String(t *testing.T) {
bounds := NewBounds(1, 2)
assert.Equal(t, "0000000000000001-0000000000000002", bounds.String())
func Test_ParseFingerprint(t *testing.T) {
fp, err := ParseFingerprint("7d0")
assert.NoError(t, err)
assert.Equal(t, model.Fingerprint(2000), fp)
}

// Test_FingerprintBounds_Addr checks that the bounds [10, 2000] are rendered
// as the unpadded hex address "a-7d0".
func Test_FingerprintBounds_Addr(t *testing.T) {
	got := NewBounds(10, 2000).Addr()
	assert.Equal(t, "a-7d0", got)
}

func Test_ParseBoundsFromAddr(t *testing.T) {
bounds, err := ParseBoundsFromAddr("a-7d0")
assert.NoError(t, err)
assert.Equal(t, NewBounds(10, 2000), bounds)
}

func Test_ParseBoundsFromParts(t *testing.T) {
bounds, err := ParseBoundsFromParts("a", "7d0")
assert.NoError(t, err)
assert.Equal(t, NewBounds(10, 2000), bounds)
}

func Test_FingerprintBounds_Cmp(t *testing.T) {
Expand Down
38 changes: 12 additions & 26 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,16 @@ const (
)

type Ref struct {
TenantID string
TableName string
MinFingerprint, MaxFingerprint uint64
StartTimestamp, EndTimestamp model.Time
Checksum uint32
TenantID string
TableName string
Bounds v1.FingerprintBounds
StartTimestamp, EndTimestamp model.Time
Checksum uint32
}

// Cmp returns the fingerprint's position relative to the bounds
func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
if fp < r.MinFingerprint {
return v1.Before
} else if fp > r.MaxFingerprint {
return v1.After
}
return v1.Overlap
return r.Bounds.Cmp(model.Fingerprint(fp))
}

type BlockRef struct {
Expand All @@ -51,10 +46,6 @@ type BlockRef struct {
BlockPath string
}

func (b *BlockRef) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint))
}

type MetaRef struct {
Ref
FilePath string
Expand Down Expand Up @@ -131,13 +122,13 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error {
}

func externalBlockKey(ref BlockRef) string {
blockParentFolder := fmt.Sprintf("%x-%x", ref.MinFingerprint, ref.MaxFingerprint)
blockParentFolder := ref.Bounds.Addr()
filename := fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, bloomsFolder, blockParentFolder, filename)
}

func externalMetaKey(ref MetaRef) string {
filename := fmt.Sprintf("%x-%x-%d-%d-%x", ref.MinFingerprint, ref.MaxFingerprint, ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
filename := fmt.Sprintf("%s-%d-%d-%x", ref.Bounds.Addr(), ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, metasFolder, filename)
}

Expand Down Expand Up @@ -247,15 +238,11 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
if len(parts) != 5 {
return MetaRef{}, fmt.Errorf("%s filename parts count must be 5 but was %d: [%s]", objectKey, len(parts), strings.Join(parts, ", "))
}

minFingerprint, err := strconv.ParseUint(parts[0], 16, 64)
bounds, err := v1.ParseBoundsFromParts(parts[0], parts[1])
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing minFingerprint %s : %w", parts[0], err)
}
maxFingerprint, err := strconv.ParseUint(parts[1], 16, 64)
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing maxFingerprint %s : %w", parts[1], err)
return MetaRef{}, fmt.Errorf("error parsing bounds %s : %w", parts[0], err)
}

startTimestamp, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return MetaRef{}, fmt.Errorf("error parsing startTimestamp %s : %w", parts[2], err)
Expand All @@ -272,8 +259,7 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
Ref: Ref{
TenantID: tenantID,
TableName: tableName,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
Bounds: bounds,
StartTimestamp: model.Time(startTimestamp),
EndTimestamp: model.Time(endTimestamp),
Checksum: uint32(checksum),
Expand Down
38 changes: 14 additions & 24 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "first-period-19621",
MinFingerprint: 0xeeee,
MaxFingerprint: 0xffff,
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
Expand All @@ -231,8 +230,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "second-period-19624",
MinFingerprint: 0xaaaa,
MaxFingerprint: 0xbbbb,
Bounds: v1.NewBounds(0xaaaa, 0xbbbb),
StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 24, 6, 0, 0),
Checksum: 2,
Expand Down Expand Up @@ -261,8 +259,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "first-period-19621",
MinFingerprint: 0xeeee,
MaxFingerprint: 0xffff,
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
Expand All @@ -278,8 +275,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "second-period-19624",
MinFingerprint: 0xaaaa,
MaxFingerprint: 0xbbbb,
Bounds: v1.NewBounds(0xaaaa, 0xbbbb),
StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 24, 6, 0, 0),
Checksum: 2,
Expand All @@ -297,8 +293,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.Equal(t, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400000-1695276000000-1", path)
require.Equal(t, blockForFirstFolder.TenantID, firstResultBlock.TenantID)
require.Equal(t, blockForFirstFolder.TableName, firstResultBlock.TableName)
require.Equal(t, blockForFirstFolder.MinFingerprint, firstResultBlock.MinFingerprint)
require.Equal(t, blockForFirstFolder.MaxFingerprint, firstResultBlock.MaxFingerprint)
require.Equal(t, blockForFirstFolder.Bounds.Min, firstResultBlock.Bounds.Min)
require.Equal(t, blockForFirstFolder.Bounds.Max, firstResultBlock.Bounds.Max)
require.Equal(t, blockForFirstFolder.StartTimestamp, firstResultBlock.StartTimestamp)
require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp)
require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum)
Expand All @@ -315,8 +311,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.Equal(t, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600000-1695535200000-2", path)
require.Equal(t, blockForSecondFolder.TenantID, secondResultBlock.TenantID)
require.Equal(t, blockForSecondFolder.TableName, secondResultBlock.TableName)
require.Equal(t, blockForSecondFolder.MinFingerprint, secondResultBlock.MinFingerprint)
require.Equal(t, blockForSecondFolder.MaxFingerprint, secondResultBlock.MaxFingerprint)
require.Equal(t, blockForSecondFolder.Bounds.Min, secondResultBlock.Bounds.Min)
require.Equal(t, blockForSecondFolder.Bounds.Max, secondResultBlock.Bounds.Max)
require.Equal(t, blockForSecondFolder.StartTimestamp, secondResultBlock.StartTimestamp)
require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp)
require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum)
Expand Down Expand Up @@ -345,8 +341,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "second-period-19624",
MinFingerprint: 0xaaaa,
MaxFingerprint: 0xbbbb,
Bounds: v1.NewBounds(0xaaaa, 0xbbbb),
StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 24, 6, 0, 0),
Checksum: 2,
Expand All @@ -357,8 +352,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
Ref: Ref{
TenantID: "tenantA",
TableName: "first-period-19621",
MinFingerprint: 0xeeee,
MaxFingerprint: 0xffff,
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
Expand Down Expand Up @@ -398,8 +392,7 @@ func Test_createMetaRef(t *testing.T) {
Ref: Ref{
TenantID: "tenant1",
TableName: "table1",
MinFingerprint: 0xaaa,
MaxFingerprint: 0xbbb,
Bounds: v1.NewBounds(0xaaa, 0xbbb),
StartTimestamp: 1234567890,
EndTimestamp: 9876543210,
Checksum: 0xabcdef,
Expand Down Expand Up @@ -534,8 +527,7 @@ func createMetaEntity(
Ref: Ref{
TenantID: tenant,
TableName: tableName,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
Checksum: metaChecksum,
Expand All @@ -547,8 +539,7 @@ func createMetaEntity(
Ref: Ref{
TenantID: tenant,
Checksum: metaChecksum + 1,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
},
Expand All @@ -561,8 +552,7 @@ func createMetaEntity(
Ref: Ref{
TenantID: tenant,
Checksum: metaChecksum + 2,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keysp
Ref: Ref{
TenantID: "fake",
TableName: fmt.Sprintf("%s%d", schemaCfg.Configs[0].IndexTables.Prefix, 0),
MinFingerprint: uint64(keyspace.Min),
MaxFingerprint: uint64(keyspace.Max),
Bounds: keyspace,
StartTimestamp: ts,
EndTimestamp: ts,
},
Expand Down
Loading

0 comments on commit 5d6afa0

Please sign in to comment.