Skip to content

Commit

Permalink
(chore) Bloom shipper: Extend Interval struct with utility functions (
Browse files Browse the repository at this point in the history
grafana#11841)

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and rhnasc committed Apr 12, 2024
1 parent 2d038b0 commit 9da52e7
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 70 deletions.
5 changes: 1 addition & 4 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ type processor struct {

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.Interval{
Start: ts,
End: ts.Add(Day),
}
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ func (w *worker) running(_ context.Context) error {
}

// interval is [Start, End)
interval := bloomshipper.Interval{
Start: day, // inclusive
End: day.Add(Day), // non-inclusive
}

interval := bloomshipper.NewInterval(day, day.Add(Day))
logger := log.With(w.logger, "day", day.Time(), "tenant", tasks[0].Tenant)
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error {
enc.PutBE64(uint64(b.Min))
enc.PutBE64(uint64(b.Max))
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing OwnershipRange")
return errors.Wrap(err, "writing FingerprintBounds")
}

func (b FingerprintBounds) String() string {
Expand All @@ -54,6 +54,7 @@ func (b FingerprintBounds) Cmp(fp model.Fingerprint) BoundsCheck {
return Overlap
}

// Overlaps reports whether b and target share at least part of their
// fingerprint range. Both edges of target are classified with b.Cmp: there is
// no overlap when target.Min compares as After, or when target.Max compares
// as Before.
func (b FingerprintBounds) Overlaps(target FingerprintBounds) bool {
	if b.Cmp(target.Min) == After {
		return false
	}
	return b.Cmp(target.Max) != Before
}
Expand All @@ -63,7 +64,7 @@ func (b FingerprintBounds) Slice(min, max model.Fingerprint) *FingerprintBounds
return b.Intersection(FingerprintBounds{Min: min, Max: max})
}

// Returns whether the fingerprint is fully within the target bounds
// Within returns whether the fingerprint is fully within the target bounds
func (b FingerprintBounds) Within(target FingerprintBounds) bool {
	// b must not start below target; an equal lower edge still counts as inside.
	if b.Min < target.Min {
		return false
	}
	// b must not end above target; an equal upper edge still counts as inside.
	return b.Max <= target.Max
}
Expand Down
28 changes: 13 additions & 15 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,20 @@ func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
return v1.Overlap
}

// Bounds returns the fingerprint range of the ref, built with v1.NewBounds
// from its MinFingerprint and MaxFingerprint fields.
func (r Ref) Bounds() v1.FingerprintBounds {
	lower := model.Fingerprint(r.MinFingerprint)
	upper := model.Fingerprint(r.MaxFingerprint)
	return v1.NewBounds(lower, upper)
}

// Interval returns the time range of the ref as an Interval spanning
// StartTimestamp to EndTimestamp.
//
// NOTE(review): Interval documents its End as non-inclusive — confirm that
// EndTimestamp carries the same meaning for refs.
func (r Ref) Interval() Interval {
	return Interval{
		Start: r.StartTimestamp,
		End:   r.EndTimestamp,
	}
}

// BlockRef embeds Ref and adds two path fields for a block.
// NOTE(review): IndexPath and BlockPath presumably locate the block's index
// and data in storage — confirm against the code that populates them.
type BlockRef struct {
Ref
IndexPath string
BlockPath string
}

func (b *BlockRef) Bounds() v1.FingerprintBounds {
return v1.NewBounds(model.Fingerprint(b.MinFingerprint), model.Fingerprint(b.MaxFingerprint))
}

type MetaRef struct {
Ref
FilePath string
Expand Down Expand Up @@ -282,19 +286,13 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
}, nil
}

func tablesForRange(periodConfig config.PeriodConfig, from, to model.Time) []string {
interval := periodConfig.IndexTables.Period
step := int64(interval.Seconds())
lower := from.Unix() / step
upper := to.Unix() / step
func tablesForRange(periodConfig config.PeriodConfig, interval Interval) []string {
step := int64(periodConfig.IndexTables.Period.Seconds())
lower := interval.Start.Unix() / step
upper := interval.End.Unix() / step
tables := make([]string, 0, 1+upper-lower)
prefix := periodConfig.IndexTables.Prefix
for i := lower; i <= upper; i++ {
tables = append(tables, joinTableName(prefix, i))
tables = append(tables, fmt.Sprintf("%s%d", periodConfig.IndexTables.Prefix, i))
}
return tables
}

func joinTableName(prefix string, tableNumber int64) string {
return fmt.Sprintf("%s%d", prefix, tableNumber)
}
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func Test_BloomClient_FetchMetas(t *testing.T) {

searchParams := MetaSearchParams{
TenantID: "tenantA",

Keyspace: v1.NewBounds(50, 150),
Interval: Interval{Start: fixedDay.Add(-6 * day), End: fixedDay.Add(-1*day - 1*time.Hour)},
Interval: NewInterval(fixedDay.Add(-6*day), fixedDay.Add(-1*day-1*time.Hour)),
}

fetched, err := store.FetchMetas(context.Background(), searchParams)
Expand Down
59 changes: 59 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bloomshipper

import (
"fmt"
"hash"

"github.com/pkg/errors"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/util/encoding"
)

// Interval defines a time range with a start and an end time.
// The range is half-open: the start is inclusive, the end is non-inclusive,
// i.e. [Start, End).
type Interval struct {
	// Start is the first instant that belongs to the interval.
	Start model.Time
	// End is the first instant that no longer belongs to the interval.
	End model.Time
}

// NewInterval builds the Interval [start, end).
// The arguments are stored as given; no check is made that start precedes end.
func NewInterval(start, end model.Time) Interval {
	var iv Interval
	iv.Start, iv.End = start, end
	return iv
}

// Hash writes the interval into h: Start followed by End, each encoded with
// Encbuf.PutBE64 (64-bit big-endian, going by the helper's name).
// It returns nil on success, or the error from h.Write wrapped with context.
func (i Interval) Hash(h hash.Hash32) error {
	var buf encoding.Encbuf
	buf.PutBE64(uint64(i.Start))
	buf.PutBE64(uint64(i.End))

	if _, err := h.Write(buf.Get()); err != nil {
		return errors.Wrap(err, "writing Interval")
	}
	return nil
}

// String renders the interval as "<start>-<end>", where each bound is printed
// as its integer timestamp value, zero-padded to 13 digits.
func (i Interval) String() string {
	// 13 digits are enough until Sat Nov 20 2286 17:46:39 UTC
	start, end := int64(i.Start), int64(i.End)
	return fmt.Sprintf("%013d-%013d", start, end)
}

// Repr returns a human-readable representation of the interval in the form
// "[start, end)", with both bounds converted to UTC wall-clock times.
func (i Interval) Repr() string {
	from := i.Start.Time().UTC()
	through := i.End.Time().UTC()
	return fmt.Sprintf("[%s, %s)", from, through)
}

// Cmp returns the position of a time relative to the interval:
// v1.Before when ts lies before Start, v1.After when ts lies at or after End
// (End is non-inclusive), and v1.Overlap otherwise.
func (i Interval) Cmp(ts model.Time) v1.BoundsCheck {
	switch {
	case ts.Before(i.Start):
		return v1.Before
	case ts.After(i.End) || ts.Equal(i.End):
		// End itself is already outside of the interval.
		return v1.After
	default:
		return v1.Overlap
	}
}

// Overlaps returns whether the interval overlaps (partially) with the target
// interval. Both boundaries of target are classified as points in time via
// Cmp: the result is false when target.Start falls at or after i.End, or when
// target.End falls before i.Start.
//
// NOTE(review): target.End is non-inclusive by the Interval contract, yet it
// is compared like an ordinary point, so a target ending exactly at i.Start
// (e.g. [1, 10) against [10, 20)) is reported as overlapping. Confirm whether
// this leniency is intended before tightening it.
func (i Interval) Overlaps(target Interval) bool {
	startsTooLate := i.Cmp(target.Start) == v1.After
	endsTooEarly := i.Cmp(target.End) == v1.Before
	return !startsTooLate && !endsTooEarly
}

// Within returns whether the interval is fully within the target interval.
// Identical boundaries count as within.
func (i Interval) Within(target Interval) bool {
	if i.Start < target.Start {
		return false
	}
	return i.End <= target.End
}
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package bloomshipper

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func Test_Interval_String(t *testing.T) {
start := model.Time(0)
end := model.TimeFromUnix(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).Unix())
interval := NewInterval(start, end)
assert.Equal(t, "0000000000000-1704067200000", interval.String())
assert.Equal(t, "[1970-01-01 00:00:00 +0000 UTC, 2024-01-01 00:00:00 +0000 UTC)", interval.Repr())
}

func Test_Interval_Cmp(t *testing.T) {
interval := NewInterval(10, 20)
assert.Equal(t, v1.Before, interval.Cmp(0))
assert.Equal(t, v1.Overlap, interval.Cmp(10))
assert.Equal(t, v1.Overlap, interval.Cmp(15))
assert.Equal(t, v1.After, interval.Cmp(20)) // End is not inclusive
assert.Equal(t, v1.After, interval.Cmp(21))
}

func Test_Interval_Overlap(t *testing.T) {
interval := NewInterval(10, 20)
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 15}))
assert.True(t, interval.Overlaps(Interval{Start: 15, End: 25}))
assert.True(t, interval.Overlaps(Interval{Start: 10, End: 20}))
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 25}))
assert.False(t, interval.Overlaps(Interval{Start: 1, End: 9}))
assert.False(t, interval.Overlaps(Interval{Start: 20, End: 30})) // End is not inclusive
assert.False(t, interval.Overlaps(Interval{Start: 25, End: 30}))
}

func Test_Interval_Within(t *testing.T) {
target := NewInterval(10, 20)
assert.False(t, NewInterval(1, 9).Within(target))
assert.False(t, NewInterval(21, 30).Within(target))
assert.True(t, NewInterval(10, 20).Within(target))
assert.True(t, NewInterval(14, 15).Within(target))
assert.False(t, NewInterval(5, 15).Within(target))
assert.False(t, NewInterval(15, 25).Within(target))
assert.False(t, NewInterval(5, 25).Within(target))
}
24 changes: 3 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,12 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

type Interval struct {
Start, End model.Time
}

func (i Interval) String() string {
return fmt.Sprintf("[%s, %s)", i.Start.Time(), i.End.Time())
}

func (i Interval) Cmp(other model.Time) v1.BoundsCheck {
if other.Before(i.Start) {
return v1.Before
} else if other.After(i.End) || other.Equal(i.End) {
return v1.After
}
return v1.Overlap
}

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
v1.FingerprintBounds
Expand Down Expand Up @@ -203,14 +185,14 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintB
// isOutsideRange tests if a given BlockRef b is outside of search boundaries
// defined by min/max timestamp and min/max fingerprint.
// Fingerprint ranges must be sorted in ascending order.
func isOutsideRange(b BlockRef, interval Interval, keyspaces []v1.FingerprintBounds) bool {
func isOutsideRange(b BlockRef, interval Interval, bounds []v1.FingerprintBounds) bool {
// check time interval
if interval.Cmp(b.EndTimestamp) == v1.Before || interval.Cmp(b.StartTimestamp) == v1.After {
if !interval.Overlaps(b.Interval()) {
return true
}

// check fingerprint ranges
for _, keyspace := range keyspaces {
for _, keyspace := range bounds {
if keyspace.Within(b.Bounds()) || keyspace.Overlaps(b.Bounds()) {
return false
}
Expand Down
36 changes: 16 additions & 20 deletions pkg/storage/stores/shipper/bloomshipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func interval(start, end model.Time) Interval {
return Interval{Start: start, End: end}
}

func Test_Shipper_findBlocks(t *testing.T) {
t.Run("expected block that are specified in tombstones to be filtered out", func(t *testing.T) {
metas := []Meta{
Expand Down Expand Up @@ -46,10 +42,10 @@ func Test_Shipper_findBlocks(t *testing.T) {

ts := model.Now()

interval := Interval{
Start: ts.Add(-2 * time.Hour),
End: ts.Add(-1 * time.Hour),
}
interval := NewInterval(
ts.Add(-2*time.Hour),
ts.Add(-1*time.Hour),
)
blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}})

expectedBlockRefs := []BlockRef{
Expand Down Expand Up @@ -103,7 +99,7 @@ func Test_Shipper_findBlocks(t *testing.T) {
for name, data := range tests {
t.Run(name, func(t *testing.T) {
ref := createBlockRef("fake-block", data.minFingerprint, data.maxFingerprint, data.startTimestamp, data.endTimestamp)
blocks := BlocksForMetas([]Meta{{Blocks: []BlockRef{ref}}}, interval(300, 400), []v1.FingerprintBounds{{Min: 100, Max: 200}})
blocks := BlocksForMetas([]Meta{{Blocks: []BlockRef{ref}}}, NewInterval(300, 400), []v1.FingerprintBounds{{Min: 100, Max: 200}})
if data.filtered {
require.Empty(t, blocks)
return
Expand All @@ -120,67 +116,67 @@ func TestIsOutsideRange(t *testing.T) {

t.Run("is outside if startTs > through", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(0, 900), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(0, 900), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if startTs == through ", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(900, 1000), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(900, 1000), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if endTs < from", func(t *testing.T) {
b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(2100, 3000), []v1.FingerprintBounds{})
isOutside := isOutsideRange(b, NewInterval(2100, 3000), []v1.FingerprintBounds{})
require.True(t, isOutside)
})

t.Run("is outside if endFp < first fingerprint", func(t *testing.T) {
b := createBlockRef("block", 0, 90, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 100, Max: 199}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 100, Max: 199}})
require.True(t, isOutside)
})

t.Run("is outside if startFp > last fingerprint", func(t *testing.T) {
b := createBlockRef("block", 200, math.MaxUint64, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 49}, {Min: 100, Max: 149}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 49}, {Min: 100, Max: 149}})
require.True(t, isOutside)
})

t.Run("is outside if within gaps in fingerprints", func(t *testing.T) {
b := createBlockRef("block", 100, 199, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.True(t, isOutside)
})

t.Run("is not outside if within fingerprints 1", func(t *testing.T) {
b := createBlockRef("block", 10, 90, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if within fingerprints 2", func(t *testing.T) {
b := createBlockRef("block", 210, 290, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if spans across multiple fingerprint ranges", func(t *testing.T) {
b := createBlockRef("block", 50, 250, startTs, endTs)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
require.False(t, isOutside)
})

t.Run("is not outside if fingerprint range and time range are larger than block", func(t *testing.T) {
b := createBlockRef("block", math.MaxUint64/3, math.MaxUint64/3*2, startTs, endTs)
isOutside := isOutsideRange(b, interval(0, 3000), []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}})
isOutside := isOutsideRange(b, NewInterval(0, 3000), []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}})
require.False(t, isOutside)
})

t.Run("is not outside if block fingerprint range is bigger that search keyspace", func(t *testing.T) {
b := createBlockRef("block", 0x0000, 0xffff, model.Earliest, model.Latest)
isOutside := isOutsideRange(b, interval(startTs, endTs), []v1.FingerprintBounds{{Min: 0x0100, Max: 0xff00}})
isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0x0100, Max: 0xff00}})
require.False(t, isOutside)
})
}
Expand Down
Loading

0 comments on commit 9da52e7

Please sign in to comment.