Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TSDB index prefix on blooms directory #11977

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table config.DayTime
table config.DayTable
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -241,15 +241,15 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
tables := c.tables(time.Now())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {

table := tables.At()

tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
Expand All @@ -269,7 +269,11 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -332,19 +336,33 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro

type dayRangeIterator struct {
min, max, cur config.DayTime
curPeriod config.PeriodConfig
schemaCfg config.SchemaConfig
err error
}

func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg}
}

func (r *dayRangeIterator) Next() bool {
r.cur = r.cur.Inc()
return r.cur.Before(r.max)
if !r.cur.Before(r.max) {
return false
}

period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime())
if err != nil {
r.err = errors.Wrapf(err, "getting schema for time (%s)", r.cur)
return false
}
r.curPeriod = period

return true
}

func (r *dayRangeIterator) At() config.DayTime {
return r.cur
func (r *dayRangeIterator) At() config.DayTable {
return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix)
}

func (r *dayRangeIterator) Err() error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ Compaction works as follows, split across many functions for clarity:
*/
func (s *SimpleBloomController) compactTenant(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change the signature to accept the struct, not a pointer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to log the full table name, the day is probably more convenient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say seeing the full table name is clearer wrt which TSDB table are we working with (and where will we write to)

logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr())
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *SimpleBloomController) compactTenant(
func (s *SimpleBloomController) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (s *SimpleBloomController) findOutdatedGaps(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand All @@ -241,7 +241,7 @@ func (s *SimpleBloomController) loadWorkForGap(
func (s *SimpleBloomController) buildGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
client bloomshipper.Client,
work []blockPlan,
logger log.Logger,
Expand Down
24 changes: 12 additions & 12 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,12 +49,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
Expand All @@ -80,7 +80,7 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime,

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down Expand Up @@ -272,17 +272,17 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}

return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand All @@ -292,12 +292,12 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, ten

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
store, err := s.storeForPeriod(table)
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
}
ref := bloomshipper.Ref{
TenantID: tenant,
TableName: config.NewDayTime(truncateDay(from)).Addr(),
TableName: config.NewDayTable(config.NewDayTime(truncateDay(from)), "").Addr(),
Bounds: v1.NewBounds(fromFp, throughFp),
StartTimestamp: from,
EndTimestamp: through,
Expand Down
41 changes: 24 additions & 17 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa
}
}

func (cfg *PeriodConfig) GetFullTableName(t model.Time) string {
return NewDayTime(t).TableWithPrefix(cfg)
}

func NewDayTime(d model.Time) DayTime {
return DayTime{d}
}
Expand Down Expand Up @@ -237,19 +233,6 @@ func (d DayTime) String() string {
return d.Time.Time().UTC().Format("2006-01-02")
}

// Addr returns the unix day offset as a string, which is used
// as the address for the index table in storage.
func (d DayTime) Addr() string {
return fmt.Sprintf("%d",
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string {
return fmt.Sprintf("%s%d",
cfg.IndexTables.Prefix,
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

func (d DayTime) Inc() DayTime {
return DayTime{d.Add(ObjectStorageIndexRequiredPeriod)}
}
Expand All @@ -274,6 +257,30 @@ func (d DayTime) Bounds() (model.Time, model.Time) {
return d.Time, d.Inc().Time
}

type DayTable struct {
DayTime
Prefix string
}

func (d DayTable) String() string {
return d.Addr()
}

func NewDayTable(d DayTime, prefix string) DayTable {
return DayTable{
DayTime: d,
Prefix: prefix,
}
}

// Addr returns the prefix (if any) and the unix day offset as a string, which is used
// as the address for the index table in storage.
func (d DayTable) Addr() string {
return fmt.Sprintf("%s%d",
d.Prefix,
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
Configs []PeriodConfig `yaml:"configs"`
Expand Down
Loading