diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 5685851aef512..135daa88a73ed 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -82,7 +82,7 @@ outer: for blockIter.Next() { bq := blockIter.At() for i, block := range data { - if block.blockRef.Bounds().Equal(bq.FingerprintBounds) { + if block.blockRef.Bounds.Equal(bq.FingerprintBounds) { err := p.processBlock(ctx, bq.BlockQuerier, block.tasks) if err != nil { return err diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index 6b43e688a4cc5..7ef8e067bac38 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -50,7 +50,7 @@ func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef) for _, ref := range refs { for _, bq := range s.querieres { - if ref.Bounds().Equal(bq.FingerprintBounds) { + if ref.Bounds.Equal(bq.FingerprintBounds) { result = append(result, bq) } } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 969f0ddacd7b6..a705d1965780f 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -71,8 +71,7 @@ func TestTruncateDay(t *testing.T) { func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef { return bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ - MinFingerprint: minFp, - MaxFingerprint: maxFp, + Bounds: v1.NewBounds(model.Fingerprint(minFp), model.Fingerprint(maxFp)), }, } } @@ -339,8 +338,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, ref := bloomshipper.Ref{ TenantID: tenant, TableName: "table_0", - MinFingerprint: uint64(fromFp), - MaxFingerprint: uint64(throughFp), + Bounds: v1.NewBounds(fromFp, throughFp), StartTimestamp: from, EndTimestamp: through, } @@ -390,9 +388,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms for i := range s.bqs { blocks = append(blocks, bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ - MinFingerprint: uint64(s.bqs[i].Min), - MaxFingerprint: uint64(s.bqs[i].Max), - TenantID: tenant, + Bounds: v1.NewBounds(s.bqs[i].Min, s.bqs[i].Max), + TenantID: tenant, }, }) } @@ -457,8 +454,7 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp Ref: bloomshipper.Ref{ TenantID: tenant, TableName: "", - MinFingerprint: uint64(data[i].Min), - MaxFingerprint: uint64(data[i].Max), + Bounds: v1.NewBounds(data[i].Min, data[i].Max), StartTimestamp: 0, EndTimestamp: 0, Checksum: 0, diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 5c6b8a76dbb22..46229538c73d6 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -246,7 +246,7 @@ func (w *worker) stopping(err error) error { func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error { for _, b := range boundedRefs { - if b.blockRef.Bounds().Equal(bounds) { + if b.blockRef.Bounds.Equal(bounds) { return w.processBlock(bq, b.tasks) } } diff --git a/pkg/storage/bloom/v1/bounds.go b/pkg/storage/bloom/v1/bounds.go index a41d70a89d867..a9521d7c60152 100644 --- a/pkg/storage/bloom/v1/bounds.go +++ b/pkg/storage/bloom/v1/bounds.go @@ -1,7 +1,10 @@ package v1 import ( + "fmt" "hash" + "strconv" + "strings" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -17,6 +20,34 @@ const ( After ) +func ParseFingerprint(s string) (model.Fingerprint, error) { + fp, err := strconv.ParseUint(s, 16, 64) + if err != nil { + return 0, fmt.Errorf("error parsing fingerprint %s : %w", s, err) + } + return model.Fingerprint(fp), nil +} + +// ParseBoundsFromAddr parses a fingerprint bounds from a string +func ParseBoundsFromAddr(s string) (FingerprintBounds, error) { + parts := strings.Split(s, "-") + return ParseBoundsFromParts(parts[0], parts[1]) +} + +// ParseBoundsFromParts parses a fingerprint bounds already separated strings +func ParseBoundsFromParts(a, b string) (FingerprintBounds, error) { + minFingerprint, err := ParseFingerprint(a) + if err != nil { + return FingerprintBounds{}, fmt.Errorf("error parsing minFingerprint %s : %w", a, err) + } + maxFingerprint, err := ParseFingerprint(b) + if err != nil { + return FingerprintBounds{}, fmt.Errorf("error parsing maxFingerprint %s : %w", b, err) + } + + return NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), nil +} + type FingerprintBounds struct { Min, Max model.Fingerprint } @@ -33,8 +64,16 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error { return errors.Wrap(err, "writing OwnershipRange") } +// Addr returns the string representation of the fingerprint bounds for use in +// content addressable storage. +// TODO(owen-d): incorporate this into the schema so we can change it, +// similar to `{,Parse}ExternalKey` +func (b FingerprintBounds) Addr() string { + return fmt.Sprintf("%x-%x", uint64(b.Min), uint64(b.Max)) +} + func (b FingerprintBounds) String() string { - return b.Min.String() + "-" + b.Max.String() + return b.Addr() } func (b FingerprintBounds) Less(other FingerprintBounds) bool { diff --git a/pkg/storage/bloom/v1/bounds_test.go b/pkg/storage/bloom/v1/bounds_test.go index 3a80f6e6b849a..e99c1bcb1524e 100644 --- a/pkg/storage/bloom/v1/bounds_test.go +++ b/pkg/storage/bloom/v1/bounds_test.go @@ -3,12 +3,31 @@ package v1 import ( "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) -func Test_FingerprintBounds_String(t *testing.T) { - bounds := NewBounds(1, 2) - assert.Equal(t, "0000000000000001-0000000000000002", bounds.String()) +func Test_ParseFingerprint(t *testing.T) { + fp, err := ParseFingerprint("7d0") + assert.NoError(t, err) + assert.Equal(t, model.Fingerprint(2000), fp) +} + +func Test_FingerprintBounds_Addr(t *testing.T) { + bounds := NewBounds(10, 2000) + assert.Equal(t, "a-7d0", bounds.Addr()) +} + +func Test_ParseBoundsFromAddr(t *testing.T) { + bounds, err := ParseBoundsFromAddr("a-7d0") + assert.NoError(t, err) + assert.Equal(t, NewBounds(10, 2000), bounds) +} + +func Test_ParseBoundsFromParts(t *testing.T) { + bounds, err := ParseBoundsFromParts("a", "7d0") + assert.NoError(t, err) + assert.Equal(t, NewBounds(10, 2000), bounds) } func Test_FingerprintBounds_Cmp(t *testing.T) { diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 835ee13686d37..f3d721b5a8885 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -28,21 +28,16 @@ const ( ) type Ref struct { - TenantID string - TableName string - MinFingerprint, MaxFingerprint uint64 - StartTimestamp, EndTimestamp model.Time - Checksum uint32 + TenantID string + TableName string + Bounds v1.FingerprintBounds + StartTimestamp, EndTimestamp model.Time + Checksum uint32 } // Cmp returns the fingerprint's position relative to the bounds func (r Ref) Cmp(fp uint64) v1.BoundsCheck { - if fp < r.MinFingerprint { - return v1.Before - } else if fp > r.MaxFingerprint { - return v1.After - } - return v1.Overlap + return r.Bounds.Cmp(model.Fingerprint(fp)) } type BlockRef struct { @@ -51,10 +46,6 @@ type BlockRef struct { BlockPath string } -func (b *BlockRef) Bounds() v1.FingerprintBounds { - return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint)) -} - type MetaRef struct { Ref FilePath string @@ -131,13 +122,13 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error { } func externalBlockKey(ref BlockRef) string { - blockParentFolder := fmt.Sprintf("%x-%x", ref.MinFingerprint, ref.MaxFingerprint) + blockParentFolder := ref.Bounds.Addr() filename := fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum) return path.Join(rootFolder, ref.TableName, ref.TenantID, bloomsFolder, blockParentFolder, filename) } func externalMetaKey(ref MetaRef) string { - filename := fmt.Sprintf("%x-%x-%d-%d-%x", ref.MinFingerprint, ref.MaxFingerprint, ref.StartTimestamp, ref.EndTimestamp, ref.Checksum) + filename := fmt.Sprintf("%s-%d-%d-%x", ref.Bounds.Addr(), ref.StartTimestamp, ref.EndTimestamp, ref.Checksum) return path.Join(rootFolder, ref.TableName, ref.TenantID, metasFolder, filename) } @@ -247,15 +238,11 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef if len(parts) != 5 { return MetaRef{}, fmt.Errorf("%s filename parts count must be 5 but was %d: [%s]", objectKey, len(parts), strings.Join(parts, ", ")) } - - minFingerprint, err := strconv.ParseUint(parts[0], 16, 64) + bounds, err := v1.ParseBoundsFromParts(parts[0], parts[1]) if err != nil { - return MetaRef{}, fmt.Errorf("error parsing minFingerprint %s : %w", parts[0], err) - } - maxFingerprint, err := strconv.ParseUint(parts[1], 16, 64) - if err != nil { - return MetaRef{}, fmt.Errorf("error parsing maxFingerprint %s : %w", parts[1], err) + return MetaRef{}, fmt.Errorf("error parsing bounds %s : %w", parts[0], err) } + startTimestamp, err := strconv.ParseInt(parts[2], 10, 64) if err != nil { return MetaRef{}, fmt.Errorf("error parsing startTimestamp %s : %w", parts[2], err) @@ -272,8 +259,7 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef Ref: Ref{ TenantID: tenantID, TableName: tableName, - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + Bounds: bounds, StartTimestamp: model.Time(startTimestamp), EndTimestamp: model.Time(endTimestamp), Checksum: uint32(checksum), diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 30aac5c901e08..1450c0215d93e 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -219,8 +219,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "first-period-19621", - MinFingerprint: 0xeeee, - MaxFingerprint: 0xffff, + Bounds: v1.NewBounds(0xeeee, 0xffff), StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, @@ -231,8 +230,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "second-period-19624", - MinFingerprint: 0xaaaa, - MaxFingerprint: 0xbbbb, + Bounds: v1.NewBounds(0xaaaa, 0xbbbb), StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, @@ -261,8 +259,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "first-period-19621", - MinFingerprint: 0xeeee, - MaxFingerprint: 0xffff, + Bounds: v1.NewBounds(0xeeee, 0xffff), StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, @@ -278,8 +275,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "second-period-19624", - MinFingerprint: 0xaaaa, - MaxFingerprint: 0xbbbb, + Bounds: v1.NewBounds(0xaaaa, 0xbbbb), StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, @@ -297,8 +293,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) { require.Equal(t, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400000-1695276000000-1", path) require.Equal(t, blockForFirstFolder.TenantID, firstResultBlock.TenantID) require.Equal(t, blockForFirstFolder.TableName, firstResultBlock.TableName) - require.Equal(t, blockForFirstFolder.MinFingerprint, firstResultBlock.MinFingerprint) - require.Equal(t, blockForFirstFolder.MaxFingerprint, firstResultBlock.MaxFingerprint) + require.Equal(t, blockForFirstFolder.Bounds.Min, firstResultBlock.Bounds.Min) + require.Equal(t, blockForFirstFolder.Bounds.Max, firstResultBlock.Bounds.Max) require.Equal(t, blockForFirstFolder.StartTimestamp, firstResultBlock.StartTimestamp) require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp) require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum) @@ -315,8 +311,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) { require.Equal(t, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600000-1695535200000-2", path) require.Equal(t, blockForSecondFolder.TenantID, secondResultBlock.TenantID) require.Equal(t, blockForSecondFolder.TableName, secondResultBlock.TableName) - require.Equal(t, blockForSecondFolder.MinFingerprint, secondResultBlock.MinFingerprint) - require.Equal(t, blockForSecondFolder.MaxFingerprint, secondResultBlock.MaxFingerprint) + require.Equal(t, blockForSecondFolder.Bounds.Min, secondResultBlock.Bounds.Min) + require.Equal(t, blockForSecondFolder.Bounds.Max, secondResultBlock.Bounds.Max) require.Equal(t, blockForSecondFolder.StartTimestamp, secondResultBlock.StartTimestamp) require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp) require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum) @@ -345,8 +341,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "second-period-19624", - MinFingerprint: 0xaaaa, - MaxFingerprint: 0xbbbb, + Bounds: v1.NewBounds(0xaaaa, 0xbbbb), StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), Checksum: 2, @@ -357,8 +352,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) { Ref: Ref{ TenantID: "tenantA", TableName: "first-period-19621", - MinFingerprint: 0xeeee, - MaxFingerprint: 0xffff, + Bounds: v1.NewBounds(0xeeee, 0xffff), StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), Checksum: 1, @@ -398,8 +392,7 @@ func Test_createMetaRef(t *testing.T) { Ref: Ref{ TenantID: "tenant1", TableName: "table1", - MinFingerprint: 0xaaa, - MaxFingerprint: 0xbbb, + Bounds: v1.NewBounds(0xaaa, 0xbbb), StartTimestamp: 1234567890, EndTimestamp: 9876543210, Checksum: 0xabcdef, @@ -534,8 +527,7 @@ func createMetaEntity( Ref: Ref{ TenantID: tenant, TableName: tableName, - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), StartTimestamp: startTimestamp, EndTimestamp: endTimestamp, Checksum: metaChecksum, @@ -547,8 +539,7 @@ func createMetaEntity( Ref: Ref{ TenantID: tenant, Checksum: metaChecksum + 1, - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), StartTimestamp: startTimestamp, EndTimestamp: endTimestamp, }, @@ -561,8 +552,7 @@ func createMetaEntity( Ref: Ref{ TenantID: tenant, Checksum: metaChecksum + 2, - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), StartTimestamp: startTimestamp, EndTimestamp: endTimestamp, }, diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index 85117a718f629..3bb3e1348f1bd 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -28,8 +28,7 @@ func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keysp Ref: Ref{ TenantID: "fake", TableName: fmt.Sprintf("%s%d", schemaCfg.Configs[0].IndexTables.Prefix, 0), - MinFingerprint: uint64(keyspace.Min), - MaxFingerprint: uint64(keyspace.Max), + Bounds: keyspace, StartTimestamp: ts, EndTimestamp: ts, }, diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index 54c2185fae56d..223d13a895f56 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "math" + "sort" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "golang.org/x/exp/slices" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" @@ -123,7 +123,7 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error { _ = b.Close() }(block) - err := callback(block.closableBlockQuerier.BlockQuerier, block.Bounds()) + err := callback(block.closableBlockQuerier.BlockQuerier, block.Bounds) if err != nil { return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err) } @@ -186,15 +186,8 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintB blockRefs = append(blockRefs, ref) } - slices.SortStableFunc(blockRefs, func(a, b BlockRef) int { - if a.MinFingerprint < b.MinFingerprint { - return -1 - } - if a.MinFingerprint > b.MinFingerprint { - return 1 - } - - return 0 + sort.Slice(blockRefs, func(i, j int) bool { + return blockRefs[i].Bounds.Less(blockRefs[j].Bounds) }) return blockRefs @@ -211,9 +204,10 @@ func isOutsideRange(b BlockRef, interval Interval, keyspaces []v1.FingerprintBou // check fingerprint ranges for _, keyspace := range keyspaces { - if keyspace.Within(b.Bounds()) || keyspace.Overlaps(b.Bounds()) { + if keyspace.Overlaps(b.Bounds) { return false } + } return true diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index d2311f808e26f..d3d6405865ede 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -199,8 +199,7 @@ func createBlockRef( Ref: Ref{ TenantID: "fake", TableName: fmt.Sprintf("%d", day), - MinFingerprint: minFingerprint, - MaxFingerprint: maxFingerprint, + Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), StartTimestamp: startTimestamp, EndTimestamp: endTimestamp, Checksum: 0, diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 0c2f2d5405515..7c53380bc353e 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" @@ -54,10 +55,20 @@ func (b *bloomStoreEntry) ResolveMetas(ctx context.Context, params MetaSearchPar if err != nil { return nil, nil, err } - if metaRef.MaxFingerprint < uint64(params.Keyspace.Min) || uint64(params.Keyspace.Max) < metaRef.MinFingerprint || + + // LIST calls return keys in lexicographic order. + // Since fingerprints are the first part of the path, + // we can stop iterating once we find an item greater + // than the keyspace we're looking for + if params.Keyspace.Cmp(model.Fingerprint(metaRef.Bounds.Min)) == v1.After { + break + } + + if !params.Keyspace.Overlaps(metaRef.Bounds) || metaRef.EndTimestamp.Before(params.Interval.Start) || metaRef.StartTimestamp.After(params.Interval.End) { continue } + refs = append(refs, metaRef) } }