From 3466cdc0cfbd77ec47f7a07ad670843c5c6ba5ca Mon Sep 17 00:00:00 2001 From: Salva Corts <salva.corts@grafana.com> Date: Fri, 16 Feb 2024 15:08:47 +0100 Subject: [PATCH 1/4] Use TSDB index prefix on blooms directory --- pkg/bloomcompactor/bloomcompactor.go | 23 +++++++++++++++++------ pkg/bloomcompactor/controller.go | 24 ++++++++++++++---------- pkg/bloomcompactor/tsdb.go | 27 +++++++++++++++------------ pkg/storage/config/schema_config.go | 4 ++-- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index cc752c2224a63..920161caf258a 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -180,11 +180,12 @@ func runWithRetries( type tenantTable struct { tenant string table config.DayTime + period config.PeriodConfig ownershipRange v1.FingerprintBounds } -func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) { - tenants, err := c.tsdbStore.UsersForPeriod(ctx, table) +func (c *Compactor) tenants(ctx context.Context, table config.DayTime, period config.PeriodConfig) (v1.Iterator[string], error) { + tenants, err := c.tsdbStore.UsersForPeriod(ctx, table, period) if err != nil { return nil, errors.Wrap(err, "getting tenants") } @@ -248,9 +249,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { tables := c.tables(time.Now()) for tables.Next() && tables.Err() == nil && ctx.Err() == nil { - table := tables.At() - tenants, err := c.tenants(ctx, table) + + period, err := c.schemaCfg.SchemaForTime(table.ModelTime()) + if err != nil { + return errors.Wrap(err, "getting schema for time") + } + + tenants, err := c.tenants(ctx, table, period) if err != nil { return errors.Wrap(err, "getting tenants") } @@ -269,7 +275,12 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { c.metrics.tenantsOwned.Inc() select { - case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}: + case ch <- tenantTable{ + tenant: tenant, + table: table, + period: period, + ownershipRange: ownershipRange, + }: case <-ctx.Done(): return ctx.Err() } @@ -327,7 +338,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error { level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange) - return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange) + return c.controller.compactTenant(ctx, tt.table, tt.period, tt.tenant, tt.ownershipRange) } type dayRangeIterator struct { diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index cee0e6f058201..e05e3ae8818c4 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -67,14 +67,15 @@ Compaction works as follows, split across many functions for clarity: func (s *SimpleBloomController) compactTenant( ctx context.Context, table config.DayTime, + period config.PeriodConfig, tenant string, ownershipRange v1.FingerprintBounds, ) error { - logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table) + logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.AddrWithPreffix(&period)) client, err := s.bloomStore.Client(table.ModelTime()) if err != nil { - level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr()) + level.Error(logger).Log("msg", "failed to get client", "err", err) return errors.Wrap(err, "failed to get client") } @@ -93,13 +94,13 @@ func (s *SimpleBloomController) compactTenant( } // build compaction plans - work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) + work, err := s.findOutdatedGaps(ctx, tenant, table, period, ownershipRange, metas, logger) if err != nil { return errors.Wrap(err, "failed to find outdated gaps") } // build new blocks - built, err := s.buildGaps(ctx, tenant, table, client, work, logger) + built, err := s.buildGaps(ctx, tenant, table, period, client, work, logger) if err != nil { return errors.Wrap(err, "failed to build gaps") } @@ -176,12 +177,13 @@ func (s *SimpleBloomController) findOutdatedGaps( ctx context.Context, tenant string, table config.DayTime, + period config.PeriodConfig, ownershipRange v1.FingerprintBounds, metas []bloomshipper.Meta, logger log.Logger, ) ([]blockPlan, error) { // Resolve TSDBs - tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, tenant) + tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, period, tenant) if err != nil { level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) return nil, errors.Wrap(err, "failed to resolve tsdbs") @@ -216,12 +218,13 @@ func (s *SimpleBloomController) findOutdatedGaps( func (s *SimpleBloomController) loadWorkForGap( ctx context.Context, table config.DayTime, + period config.PeriodConfig, tenant string, id tsdb.Identifier, gap gapWithBlocks, ) (v1.CloseableIterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) { // load a series iterator for the gap - seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds) + seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, period, tenant, id, gap.bounds) if err != nil { return nil, nil, errors.Wrap(err, "failed to load tsdb") } @@ -242,6 +245,7 @@ func (s *SimpleBloomController) buildGaps( ctx context.Context, tenant string, table config.DayTime, + period config.PeriodConfig, client bloomshipper.Client, work []blockPlan, logger log.Logger, @@ -276,7 +280,7 @@ func (s *SimpleBloomController) buildGaps( MetaRef: bloomshipper.MetaRef{ Ref: bloomshipper.Ref{ TenantID: tenant, - TableName: table.Addr(), + TableName: table.AddrWithPreffix(&period), Bounds: gap.bounds, }, }, @@ -285,7 +289,7 @@ func (s *SimpleBloomController) buildGaps( // Fetch blocks that aren't up to date but are in the desired fingerprint range // to try and accelerate bloom creation - seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap) + seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, period, tenant, plan.tsdb, gap) if err != nil { level.Error(logger).Log("msg", "failed to get series and blocks", "err", err) return nil, errors.Wrap(err, "failed to get series and blocks") @@ -318,7 +322,7 @@ func (s *SimpleBloomController) buildGaps( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(tenant, table.Addr(), blk) + built, err := bloomshipper.BlockFrom(tenant, table.AddrWithPreffix(&period), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) blocksIter.Close() @@ -346,7 +350,7 @@ func (s *SimpleBloomController) buildGaps( blocksIter.Close() // Write the new meta - ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks) + ref, err := bloomshipper.MetaRefFrom(tenant, table.AddrWithPreffix(&period), gap.bounds, meta.Sources, meta.Blocks) if err != nil { level.Error(logger).Log("msg", "failed to checksum meta", "err", err) return nil, errors.Wrap(err, "failed to checksum meta") diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index d19e185a9275b..d4b7af4c5d5c3 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -26,11 +26,12 @@ const ( ) type TSDBStore interface { - UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) - ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) + UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) + ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) LoadTSDB( ctx context.Context, table config.DayTime, + period config.PeriodConfig, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -49,13 +50,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore { } } -func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { - _, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing +func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) { + _, users, err := b.storage.ListFiles(ctx, table.AddrWithPreffix(&period), true) // bypass cache for ease of testing return users, err } -func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { - indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing +func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + indices, err := b.storage.ListUserFiles(ctx, table.AddrWithPreffix(&period), tenant, true) // bypass cache for ease of testing if err != nil { return nil, errors.Wrap(err, "failed to list user files") } @@ -81,13 +82,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, func (b *BloomTSDBStore) LoadTSDB( ctx context.Context, table config.DayTime, + period config.PeriodConfig, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, ) (v1.CloseableIterator[*v1.Series], error) { withCompression := id.Name() + gzipExtension - data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression) + data, err := b.storage.GetUserFile(ctx, table.AddrWithPreffix(&period), tenant, withCompression) if err != nil { return nil, errors.Wrap(err, "failed to get file") } @@ -272,27 +274,28 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { ) } -func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { +func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) { store, err := s.storeForPeriod(table) if err != nil { return nil, err } - return store.UsersForPeriod(ctx, table) + return store.UsersForPeriod(ctx, table, period) } -func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { +func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { store, err := s.storeForPeriod(table) if err != nil { return nil, err } - return store.ResolveTSDBs(ctx, table, tenant) + return store.ResolveTSDBs(ctx, table, period, tenant) } func (s *TSDBStores) LoadTSDB( ctx context.Context, table config.DayTime, + period config.PeriodConfig, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -302,5 +305,5 @@ func (s *TSDBStores) LoadTSDB( return nil, err } - return store.LoadTSDB(ctx, table, tenant, id, bounds) + return store.LoadTSDB(ctx, table, period, tenant, id, bounds) } diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index b7c92c62c3d94..fcedec4c1ee4b 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -201,7 +201,7 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa } func (cfg *PeriodConfig) GetFullTableName(t model.Time) string { - return NewDayTime(t).TableWithPrefix(cfg) + return NewDayTime(t).AddrWithPreffix(cfg) } func NewDayTime(d model.Time) DayTime { @@ -244,7 +244,7 @@ func (d DayTime) Addr() string { d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) } -func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string { +func (d DayTime) AddrWithPreffix(cfg *PeriodConfig) string { return fmt.Sprintf("%s%d", cfg.IndexTables.Prefix, d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) From df2f0687b9790620647ed0bb5d099b7e9a40c20a Mon Sep 17 00:00:00 2001 From: Salva Corts <salva.corts@grafana.com> Date: Fri, 16 Feb 2024 16:36:19 +0100 Subject: [PATCH 2/4] use DayTable --- pkg/bloomcompactor/bloomcompactor.go | 41 ++++++++++++++++------------ pkg/bloomcompactor/controller.go | 30 +++++++++----------- pkg/bloomcompactor/tsdb.go | 39 ++++++++++++-------------- pkg/bloomgateway/util_test.go | 2 +- pkg/storage/config/schema_config.go | 37 +++++++++++++------------ 5 files changed, 76 insertions(+), 73 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 920161caf258a..bc4a7f005be14 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -179,13 +179,12 @@ func runWithRetries( type tenantTable struct { tenant string - table config.DayTime - period config.PeriodConfig + table config.DayTable ownershipRange v1.FingerprintBounds } -func (c *Compactor) tenants(ctx context.Context, table config.DayTime, period config.PeriodConfig) (v1.Iterator[string], error) { - tenants, err := c.tsdbStore.UsersForPeriod(ctx, table, period) +func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iterator[string], error) { + tenants, err := c.tsdbStore.UsersForPeriod(ctx, table) if err != nil { return nil, errors.Wrap(err, "getting tenants") } @@ -242,7 +241,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator { fromDay := config.NewDayTime(model.TimeFromUnixNano(from)) throughDay := config.NewDayTime(model.TimeFromUnixNano(through)) - return newDayRangeIterator(fromDay, throughDay) + return newDayRangeIterator(fromDay, throughDay, c.schemaCfg) } func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { @@ -251,12 +250,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { for tables.Next() && tables.Err() == nil && ctx.Err() == nil { table := tables.At() - period, err := c.schemaCfg.SchemaForTime(table.ModelTime()) - if err != nil { - return errors.Wrap(err, "getting schema for time") - } - - tenants, err := c.tenants(ctx, table, period) + tenants, err := c.tenants(ctx, table) if err != nil { return errors.Wrap(err, "getting tenants") } @@ -278,7 +272,6 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { case ch <- tenantTable{ tenant: tenant, table: table, - period: period, ownershipRange: ownershipRange, }: case <-ctx.Done(): @@ -338,24 +331,38 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error { level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange) - return c.controller.compactTenant(ctx, tt.table, tt.period, tt.tenant, tt.ownershipRange) + return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange) } type dayRangeIterator struct { min, max, cur config.DayTime + curPeriod config.PeriodConfig + schemaCfg config.SchemaConfig + err error } -func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator { +func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator { return &dayRangeIterator{min: min, max: max, cur: min.Dec()} } func (r *dayRangeIterator) Next() bool { r.cur = r.cur.Inc() - return r.cur.Before(r.max) + if !r.cur.Before(r.max) { + return false + } + + period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime()) + if err != nil { + r.err = errors.Wrapf(err, "getting schema for time (%s)", r.cur) + return false + } + r.curPeriod = period + + return true } -func (r *dayRangeIterator) At() config.DayTime { - return r.cur +func (r *dayRangeIterator) At() config.DayTable { + return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix) } func (r *dayRangeIterator) Err() error { diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index e05e3ae8818c4..ef41ec2d8efbb 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -66,12 +66,11 @@ Compaction works as follows, split across many functions for clarity: */ func (s *SimpleBloomController) compactTenant( ctx context.Context, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, tenant string, ownershipRange v1.FingerprintBounds, ) error { - logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.AddrWithPreffix(&period)) + logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr()) client, err := s.bloomStore.Client(table.ModelTime()) if err != nil { @@ -94,13 +93,13 @@ func (s *SimpleBloomController) compactTenant( } // build compaction plans - work, err := s.findOutdatedGaps(ctx, tenant, table, period, ownershipRange, metas, logger) + work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) if err != nil { return errors.Wrap(err, "failed to find outdated gaps") } // build new blocks - built, err := s.buildGaps(ctx, tenant, table, period, client, work, logger) + built, err := s.buildGaps(ctx, tenant, table, client, work, logger) if err != nil { return errors.Wrap(err, "failed to build gaps") } @@ -176,14 +175,13 @@ func (s *SimpleBloomController) compactTenant( func (s *SimpleBloomController) findOutdatedGaps( ctx context.Context, tenant string, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, ownershipRange v1.FingerprintBounds, metas []bloomshipper.Meta, logger log.Logger, ) ([]blockPlan, error) { // Resolve TSDBs - tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, period, tenant) + tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, tenant) if err != nil { level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) return nil, errors.Wrap(err, "failed to resolve tsdbs") @@ -217,14 +215,13 @@ func (s *SimpleBloomController) findOutdatedGaps( func (s *SimpleBloomController) loadWorkForGap( ctx context.Context, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, tenant string, id tsdb.Identifier, gap gapWithBlocks, ) (v1.CloseableIterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) { // load a series iterator for the gap - seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, period, tenant, id, gap.bounds) + seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds) if err != nil { return nil, nil, errors.Wrap(err, "failed to load tsdb") } @@ -244,8 +241,7 @@ func (s *SimpleBloomController) loadWorkForGap( func (s *SimpleBloomController) buildGaps( ctx context.Context, tenant string, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, client bloomshipper.Client, work []blockPlan, logger log.Logger, @@ -280,7 +276,7 @@ func (s *SimpleBloomController) buildGaps( MetaRef: bloomshipper.MetaRef{ Ref: bloomshipper.Ref{ TenantID: tenant, - TableName: table.AddrWithPreffix(&period), + TableName: table.Addr(), Bounds: gap.bounds, }, }, @@ -289,7 +285,7 @@ func (s *SimpleBloomController) buildGaps( // Fetch blocks that aren't up to date but are in the desired fingerprint range // to try and accelerate bloom creation - seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, period, tenant, plan.tsdb, gap) + seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap) if err != nil { level.Error(logger).Log("msg", "failed to get series and blocks", "err", err) return nil, errors.Wrap(err, "failed to get series and blocks") @@ -322,7 +318,7 @@ func (s *SimpleBloomController) buildGaps( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(tenant, table.AddrWithPreffix(&period), blk) + built, err := bloomshipper.BlockFrom(tenant, table.Addr(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) blocksIter.Close() @@ -350,7 +346,7 @@ func (s *SimpleBloomController) buildGaps( blocksIter.Close() // Write the new meta - ref, err := bloomshipper.MetaRefFrom(tenant, table.AddrWithPreffix(&period), gap.bounds, meta.Sources, meta.Blocks) + ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks) if err != nil { level.Error(logger).Log("msg", "failed to checksum meta", "err", err) return nil, errors.Wrap(err, "failed to checksum meta") diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index d4b7af4c5d5c3..6159ce02a804a 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -26,12 +26,11 @@ const ( ) type TSDBStore interface { - UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) - ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) + UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) + ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) LoadTSDB( ctx context.Context, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -50,13 +49,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore { } } -func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) { - _, users, err := b.storage.ListFiles(ctx, table.AddrWithPreffix(&period), true) // bypass cache for ease of testing +func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { + _, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing return users, err } -func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { - indices, err := b.storage.ListUserFiles(ctx, table.AddrWithPreffix(&period), tenant, true) // bypass cache for ease of testing +func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing if err != nil { return nil, errors.Wrap(err, "failed to list user files") } @@ -81,15 +80,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, func (b *BloomTSDBStore) LoadTSDB( ctx context.Context, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, ) (v1.CloseableIterator[*v1.Series], error) { withCompression := id.Name() + gzipExtension - data, err := b.storage.GetUserFile(ctx, table.AddrWithPreffix(&period), tenant, withCompression) + data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression) if err != nil { return nil, errors.Wrap(err, "failed to get file") } @@ -274,36 +272,35 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { ) } -func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) { - store, err := s.storeForPeriod(table) +func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } - return store.UsersForPeriod(ctx, table, period) + return store.UsersForPeriod(ctx, table) } -func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { - store, err := s.storeForPeriod(table) +func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } - return store.ResolveTSDBs(ctx, table, period, tenant) + return store.ResolveTSDBs(ctx, table, tenant) } func (s *TSDBStores) LoadTSDB( ctx context.Context, - table config.DayTime, - period config.PeriodConfig, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, ) (v1.CloseableIterator[*v1.Series], error) { - store, err := s.storeForPeriod(table) + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } - return store.LoadTSDB(ctx, table, period, tenant, id, bounds) + return store.LoadTSDB(ctx, table, tenant, id, bounds) } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 9b5ce6e897bb9..281feba4b29a5 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, } ref := bloomshipper.Ref{ TenantID: tenant, - TableName: config.NewDayTime(truncateDay(from)).Addr(), + TableName: config.NewDayTable(config.NewDayTime(truncateDay(from)), "").Addr(), Bounds: v1.NewBounds(fromFp, throughFp), StartTimestamp: from, EndTimestamp: through, diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index fcedec4c1ee4b..b916ef19d3fe4 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -200,10 +200,6 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa } } -func (cfg *PeriodConfig) GetFullTableName(t model.Time) string { - return NewDayTime(t).AddrWithPreffix(cfg) -} - func NewDayTime(d model.Time) DayTime { return DayTime{d} } @@ -237,19 +233,6 @@ func (d DayTime) String() string { return d.Time.Time().UTC().Format("2006-01-02") } -// Addr returns the unix day offset as a string, which is used -// as the address for the index table in storage. -func (d DayTime) Addr() string { - return fmt.Sprintf("%d", - d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) -} - -func (d DayTime) AddrWithPreffix(cfg *PeriodConfig) string { - return fmt.Sprintf("%s%d", - cfg.IndexTables.Prefix, - d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) -} - func (d DayTime) Inc() DayTime { return DayTime{d.Add(ObjectStorageIndexRequiredPeriod)} } @@ -274,6 +257,26 @@ func (d DayTime) Bounds() (model.Time, model.Time) { return d.Time, d.Inc().Time } +type DayTable struct { + DayTime + Prefix string +} + +func NewDayTable(d DayTime, prefix string) DayTable { + return DayTable{ + DayTime: d, + Prefix: prefix, + } +} + +// Addr returns the prefix (if any) and the unix day offset as a string, which is used +// as the address for the index table in storage. +func (d DayTable) Addr() string { + return fmt.Sprintf("%s%d", + d.Prefix, + d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` From 972cd39a488a824c4b177f00392a469deab967d1 Mon Sep 17 00:00:00 2001 From: Salva Corts <salva.corts@grafana.com> Date: Fri, 16 Feb 2024 16:41:26 +0100 Subject: [PATCH 3/4] fix --- pkg/bloomcompactor/bloomcompactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index bc4a7f005be14..3bb1c815e8295 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -342,7 +342,7 @@ type dayRangeIterator struct { } func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator { - return &dayRangeIterator{min: min, max: max, cur: min.Dec()} + return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg} } func (r *dayRangeIterator) Next() bool { From 2dda18c5178145eb65a34c851c155cc14dde3ea5 Mon Sep 17 00:00:00 2001 From: Salva Corts <salva.corts@grafana.com> Date: Fri, 16 Feb 2024 16:50:56 +0100 Subject: [PATCH 4/4] string method override --- pkg/storage/config/schema_config.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index b916ef19d3fe4..968ca87e609b7 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -262,6 +262,10 @@ type DayTable struct { Prefix string } +func (d DayTable) String() string { + return d.Addr() +} + func NewDayTable(d DayTime, prefix string) DayTable { return DayTable{ DayTime: d,