Skip to content

Commit

Permalink
Remove bloomcompactor.DayTable in favour of config.DayTime (#11917)
Browse files Browse the repository at this point in the history
Both structs shared the same semantics.
This PR moves additional functionality from the `DayTable` to the `DayTime` struct.
To get the table name of a day (ordinal number of day since unix epoch), call `DayTime.Table()`.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 14, 2024
1 parent 25785e0 commit 1c43991
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 94 deletions.
15 changes: 7 additions & 8 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table DayTable
table config.DayTime
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table DayTable) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -214,10 +214,9 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := DayTable(model.TimeFromUnixNano(from))
throughDay := DayTable(model.TimeFromUnixNano(through))
fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)

}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
Expand Down Expand Up @@ -295,10 +294,10 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro
}

type dayRangeIterator struct {
min, max, cur DayTable
min, max, cur config.DayTime
}

func newDayRangeIterator(min, max DayTable) *dayRangeIterator {
func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
}

Expand All @@ -307,7 +306,7 @@ func (r *dayRangeIterator) Next() bool {
return r.cur.Before(r.max)
}

func (r *dayRangeIterator) At() DayTable {
func (r *dayRangeIterator) At() config.DayTime {
return r.cur
}

Expand Down
38 changes: 0 additions & 38 deletions pkg/bloomcompactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"fmt"
"time"

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)
Expand Down Expand Up @@ -70,37 +66,3 @@ type Limits interface {
BloomFalsePositiveRate(tenantID string) float64
BloomCompactorMaxBlockSize(tenantID string) int
}

// TODO(owen-d): Remove this type in favor of config.DayTime
type DayTable model.Time

func (d DayTable) String() string {
return fmt.Sprintf("%d", d.ModelTime().Time().UnixNano()/int64(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Inc() DayTable {
return DayTable(d.ModelTime().Add(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Dec() DayTable {
return DayTable(d.ModelTime().Add(-config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Before(other DayTable) bool {
return d.ModelTime().Before(model.Time(other))
}

func (d DayTable) After(other DayTable) bool {
return d.ModelTime().After(model.Time(other))
}

func (d DayTable) ModelTime() model.Time {
return model.Time(d)
}

func (d DayTable) Bounds() bloomshipper.Interval {
return bloomshipper.Interval{
Start: model.Time(d),
End: model.Time(d.Inc()),
}
}
13 changes: 5 additions & 8 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) {

func (s *SimpleBloomController) buildBlocks(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
Expand All @@ -78,15 +79,11 @@ func (s *SimpleBloomController) buildBlocks(
}

// 2. Fetch metas
bounds := table.Bounds()
metas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.Interval{
Start: bounds.Start,
End: bounds.End,
},
Interval: bloomshipper.NewInterval(table.Bounds()),
Keyspace: ownershipRange,
},
)
Expand Down Expand Up @@ -176,7 +173,7 @@ func (s *SimpleBloomController) buildBlocks(
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(tenant, table.String(), blk)
built, err := bloomshipper.BlockFrom(tenant, table.Table(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
return errors.Wrap(err, "failed to build block")
Expand Down Expand Up @@ -214,7 +211,7 @@ func (s *SimpleBloomController) buildBlocks(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand Down
34 changes: 17 additions & 17 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,13 +49,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.String(), true) // bypass cache for ease of testing
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Table(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, true) // bypass cache for ease of testing
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Table(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
Expand All @@ -80,14 +80,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenan

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.String(), tenant, withCompression)
data, err := b.storage.GetUserFile(ctx, table.Table(), tenant, withCompression)
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}
Expand Down Expand Up @@ -244,11 +244,11 @@ func NewTSDBStores(
return res, nil
}

func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- {
period := s.schemaCfg.Configs[i]

if !table.Before(DayTable(period.From.Time)) {
if !table.Before(period.From) {
// we have the desired period config

if s.stores[i] != nil {
Expand All @@ -260,19 +260,19 @@ func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
return nil, errors.Errorf(
"store for period is not of TSDB type (%s) while looking up store for (%v)",
period.IndexType,
table.ModelTime().Time(),
table,
)
}

}

return nil, fmt.Errorf(
"There is no store matching no matching period found for table (%v) -- too early",
table.ModelTime().Time(),
"there is no store matching no matching period found for table (%v) -- too early",
table,
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -281,7 +281,7 @@ func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]stri
return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -292,7 +292,7 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant st

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
tasksCh := make(chan Task, len(tasks))
for _, task := range tasks {
task := task
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series))
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
Expand Down
7 changes: 4 additions & 3 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
)

const (
Expand Down Expand Up @@ -69,7 +70,7 @@ type Task struct {
ctx context.Context

// TODO(chaudum): Investigate how to remove that.
day model.Time
table config.DayTime
}

// NewTask returns a new Task that can be enqueued to the task queue.
Expand All @@ -89,7 +90,7 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithBounds, filter
filters: filters,
series: refs.series,
bounds: refs.bounds,
day: refs.day,
table: refs.table,
ctx: ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(refs.series)),
Expand Down Expand Up @@ -129,7 +130,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
filters: t.filters,
series: series,
bounds: t.bounds,
day: t.day,
table: t.table,
ctx: t.ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(series)),
Expand Down
10 changes: 5 additions & 5 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand All @@ -35,10 +35,9 @@ type processor struct {
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -52,8 +51,9 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
interval := bloomshipper.NewInterval(day.Bounds())
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util/constants"
)
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand Down Expand Up @@ -121,7 +122,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re

type seriesWithBounds struct {
bounds model.Interval
day model.Time
table config.DayTime
series []*logproto.GroupedChunkRefs
}

Expand Down Expand Up @@ -173,7 +174,7 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds {
Start: minTs,
End: maxTs,
},
day: day,
table: config.NewDayTime(day),
series: res,
})
}
Expand Down
Loading

0 comments on commit 1c43991

Please sign in to comment.