Blooms/integration fixes #11979

Merged · merged 14 commits · Feb 19, 2024

pkg/bloomcompactor/batch.go (14 changes: 7 additions & 7 deletions)
@@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool {
// check if there are more overlapping groups to load
if !i.overlapping.Next() {
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}
if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
}

if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
return false
}

@@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool {
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
for filtered.Next() && filtered.Err() == nil {
for filtered.Next() {
bq := loader.At()
if _, ok := i.loaded[bq]; !ok {
i.loaded[bq] = struct{}{}
@@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool {
iters = append(iters, iter)
}

if loader.Err() != nil {
i.err = loader.Err()
if err := filtered.Err(); err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}

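A note on the batch.go change above: the loop condition no longer checks the error on every iteration, and the post-loop check now asks the outermost iterator (filtered) rather than the inner loader. A minimal sketch of this drain-then-check pattern, using a simplified hypothetical iterator interface rather than Loki's actual v1 package:

```go
package main

import "fmt"

// Iterator is a cut-down stand-in for the v1 iterator interface
// (hypothetical; not Loki's actual types).
type Iterator[T any] interface {
	Next() bool // advance; returns false on exhaustion or error
	At() T      // current element
	Err() error // sticky error, inspected once after the loop
}

type sliceIter[T any] struct {
	items []T
	i     int
	err   error
}

func (s *sliceIter[T]) Next() bool {
	if s.err != nil || s.i >= len(s.items) {
		return false
	}
	s.i++
	return true
}

func (s *sliceIter[T]) At() T      { return s.items[s.i-1] }
func (s *sliceIter[T]) Err() error { return s.err }

// collect drains the iterator first, then surfaces the sticky error exactly
// once, mirroring the `for filtered.Next() { ... }` shape in the diff.
func collect[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for it.Next() {
		out = append(out, it.At())
	}
	if err := it.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	got, err := collect[int](&sliceIter[int]{items: []int{1, 2, 3}})
	fmt.Println(got, err) // [1 2 3] <nil>
}
```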
pkg/bloomcompactor/bloomcompactor.go (15 changes: 13 additions & 2 deletions)

@@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error

// runs a single round of compaction for all relevant tenants and tables
func (c *Compactor) runOne(ctx context.Context) error {
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var wg sync.WaitGroup
ch := make(chan tenantTable)
@@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error {
err := c.loadWork(ctx, ch)

wg.Wait()
return multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(workersErr, err, ctx.Err()).Err()
if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
}
return err
}
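The runOne change above folds the worker errors, the loadWork error, and ctx.Err() into one value and logs it once. A sketch of that aggregation, assuming the grafana/dskit multierror package this code appears to use:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/grafana/dskit/multierror"
)

func runOnce(ctx context.Context) error {
	var workersErr error // in the real code, written by workers before wg.Wait()
	loadErr := errors.New("loadWork: example failure")

	// New(...).Err() skips nil entries and returns nil when nothing remains,
	// so the happy path still yields a nil error.
	err := multierror.New(workersErr, loadErr, ctx.Err()).Err()
	if err != nil {
		fmt.Println("compaction iteration failed:", err)
	}
	return err
}

func main() {
	_ = runOnce(context.Background())
}
```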

func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
@@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}

@@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()

level.Debug(c.logger).Log("msg", "loading work for table", "table", table)

tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
@@ -262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
@@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
}

if err := tenants.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tenants", "err", err)
return errors.Wrap(err, "iterating tenants")
}

}

if err := tables.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tables", "err", err)
return errors.Wrap(err, "iterating tables")
}

@@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
}

func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String())
return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
}

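Several log calls in this PR switch from passing ownershipRange to ownershipRange.String(). A sketch of the effect with go-kit/log, using a bounds type loosely shaped like v1.FingerprintBounds (hypothetical here); calling String() explicitly pins the rendered form instead of relying on the encoder's fallback formatting for arbitrary structs:

```go
package main

import (
	"fmt"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

type bounds struct{ Min, Max uint64 }

// String renders the range in a fixed-width hex form so values align
// across log lines and are easy to grep.
func (b bounds) String() string { return fmt.Sprintf("%016x-%016x", b.Min, b.Max) }

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)
	r := bounds{Min: 0, Max: 0xffff}
	level.Info(logger).Log("msg", "compacting", "ownership", r.String())
	// ownership=0000000000000000-000000000000ffff
}
```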
pkg/bloomcompactor/controller.go (136 changes: 100 additions & 36 deletions)

@@ -70,7 +70,7 @@ func (s *SimpleBloomController) compactTenant(
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())
logger := log.With(s.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String())

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
@@ -92,6 +92,15 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to get metas")
}

level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas))

// fetch all metas overlapping all metas overlapping our ownership range so we can safely
Suggested change (review comment, Contributor):
- // fetch all metas overlapping all metas overlapping our ownership range so we can safely
+ // fetch all metas overlapping our ownership range so we can safely

// check which metas can be deleted even if they only partially overlap out ownership range
Suggested change (review comment, Contributor):
- // check which metas can be deleted even if they only partially overlap out ownership range
+ // check which metas can be deleted even if they only partially overlap our ownership range

superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
return errors.Wrap(err, "failed to fetch superset")
}

// build compaction plans
work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
@@ -104,6 +113,51 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to build gaps")
}

// combine built and superset metas
// in preparation for removing outdated ones
combined := append(superset, built...)

outdated := outdatedMetas(combined)
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
for _, meta := range outdated {
for _, block := range meta.Blocks {
err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
return errors.Wrap(err, "failed to delete block")
}
}
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
Review comment (Contributor): I think it would be nice to have a metric to track how many blocks/metas are deleted; a sketch of such counters follows after this function.

}

err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
return errors.Wrap(err, "failed to delete meta")
}
}
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
}

level.Debug(logger).Log("msg", "finished compaction")
return nil
}
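Picking up the review suggestion above, a sketch of what deletion counters could look like; the metric names and wiring are hypothetical, not something this PR adds:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

type deletionMetrics struct {
	blocksDeleted prometheus.Counter
	metasDeleted  prometheus.Counter
}

func newDeletionMetrics(r prometheus.Registerer) *deletionMetrics {
	m := &deletionMetrics{
		blocksDeleted: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Subsystem: "bloomcompactor",
			Name:      "blocks_deleted_total", // hypothetical name
			Help:      "Number of outdated bloom blocks deleted.",
		}),
		metasDeleted: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Subsystem: "bloomcompactor",
			Name:      "metas_deleted_total", // hypothetical name
			Help:      "Number of outdated bloom metas deleted.",
		}),
	}
	r.MustRegister(m.blocksDeleted, m.metasDeleted)
	return m
}

func main() {
	_ = newDeletionMetrics(prometheus.DefaultRegisterer)
	// The compactTenant loop would then call m.blocksDeleted.Inc() and
	// m.metasDeleted.Inc() after each successful delete.
}
```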

// fetchSuperSet fetches all metas which overlap the ownership range of the first set of metas we've resolved
func (s *SimpleBloomController) fetchSuperSet(
ctx context.Context,
tenant string,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]bloomshipper.Meta, error) {
// in order to delete outdated metas which only partially fall within the ownership range,
// we need to fetch all metas in the entire bound range of the first set of metas we've resolved
/*
@@ -121,55 +175,49 @@ func (s *SimpleBloomController) compactTenant(
union := superset.Union(meta.Bounds)
if len(union) > 1 {
level.Error(logger).Log("msg", "meta bounds union is not a single range", "union", union)
return errors.New("meta bounds union is not a single range")
return nil, errors.New("meta bounds union is not a single range")
}
superset = union[0]
}
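The fold above collapses every meta's bounds into one superset range and bails out when the union is discontiguous. The core of that union operation, sketched with a simplified bounds type (hypothetical; v1.FingerprintBounds carries more machinery):

```go
package main

import "fmt"

type fpBounds struct{ Min, Max uint64 } // inclusive fingerprint range

// union merges two ranges; it returns two ranges when they neither overlap
// nor touch, which is the "not a single range" error case above.
// (Overflow at Max == math.MaxUint64 is ignored for brevity.)
func union(a, b fpBounds) []fpBounds {
	if a.Min > b.Min {
		a, b = b, a
	}
	if b.Min <= a.Max+1 { // overlapping or adjacent
		if b.Max > a.Max {
			a.Max = b.Max
		}
		return []fpBounds{a}
	}
	return []fpBounds{a, b}
}

func main() {
	fmt.Println(union(fpBounds{0, 10}, fpBounds{5, 20}))  // [{0 20}]
	fmt.Println(union(fpBounds{0, 10}, fpBounds{12, 20})) // [{0 10} {12 20}]
}
```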

metas, err = s.bloomStore.FetchMetas(
within := superset.Within(ownershipRange)
level.Debug(logger).Log(
"msg", "looking for superset metas",
"superset", superset.String(),
"superset_within", within,
)

if within {
// we don't need to fetch any more metas
// NB(owen-d): here we copy metas into the output. This is slightly inefficient, but
// helps prevent mutability bugs by returning the same slice as the input.
results := make([]bloomshipper.Meta, len(metas))
copy(results, metas)
return results, nil
}
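The NB comment above trades a small copy for safety. In isolation, the point is that appending to a returned slice must never clobber the caller's backing array; a sketch, not the PR's exact helper:

```go
package main

import "fmt"

// copyOf returns a fresh slice so appends on the result cannot alias the
// caller's backing array (hypothetical helper).
func copyOf[T any](in []T) []T {
	out := make([]T, len(in))
	copy(out, in)
	return out
}

func main() {
	a := make([]int, 2, 4) // spare capacity is what makes aliasing bugs possible
	b := copyOf(a)
	_ = append(b, 99) // does not write into a's backing array
	fmt.Println(a, b) // [0 0] [0 0]
}
```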

supersetMetas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.NewInterval(table.Bounds()),
Keyspace: superset,
},
)

if err != nil {
level.Error(logger).Log("msg", "failed to get meta superset range", "err", err, "superset", superset)
return errors.Wrap(err, "failed to get meta superset range")
return nil, errors.Wrap(err, "failed to get meta superset range")
}

// combine built and pre-existing metas
// in preparation for removing outdated metas
metas = append(metas, built...)

outdated := outdatedMetas(metas)
for _, meta := range outdated {
for _, block := range meta.Blocks {
if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block)
continue
}

level.Error(logger).Log("msg", "failed to delete blocks", "err", err)
return errors.Wrap(err, "failed to delete blocks")
}
}

if err := client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef)
} else {
level.Error(logger).Log("msg", "failed to delete metas", "err", err)
return errors.Wrap(err, "failed to delete metas")
}
}
}

level.Debug(logger).Log("msg", "finished compaction")
return nil
level.Debug(logger).Log(
"msg", "found superset metas",
"metas", len(metas),
"fresh_metas", len(supersetMetas),
"delta", len(supersetMetas)-len(metas),
)

return supersetMetas, nil
}

func (s *SimpleBloomController) findOutdatedGaps(
@@ -271,6 +319,7 @@ func (s *SimpleBloomController) buildGaps(

for i := range plan.gaps {
gap := plan.gaps[i]
logger := log.With(logger, "gap", gap.bounds.String(), "tsdb", plan.tsdb.Name())

meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Expand Down Expand Up @@ -304,9 +353,11 @@ func (s *SimpleBloomController) buildGaps(
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
logger,
)

level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.blocks))

newBlocks := gen.Generate(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
@@ -333,6 +384,16 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()
return nil, errors.Wrap(err, "failed to write block")
}
s.metrics.blocksCreated.Inc()

totalGapKeyspace := (gap.bounds.Max - gap.bounds.Min)
progress := (built.Bounds.Max - gap.bounds.Min)
pct := float64(progress) / float64(totalGapKeyspace) * 100
level.Debug(logger).Log(
"msg", "uploaded block",
"block", built.BlockRef.String(),
"progress_pct", fmt.Sprintf("%.2f", pct),
)

meta.Blocks = append(meta.Blocks, built.BlockRef)
}
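The percentage math above, isolated: progress through the gap's keyspace after each uploaded block. A sketch assuming Max >= Min and that built.Bounds.Max falls inside the gap (not shared code):

```go
package main

import "fmt"

func progressPct(gapMin, gapMax, builtMax uint64) float64 {
	total := gapMax - gapMin
	if total == 0 {
		return 100 // degenerate single-fingerprint gap
	}
	return float64(builtMax-gapMin) / float64(total) * 100
}

func main() {
	fmt.Printf("%.2f\n", progressPct(0, 1<<16, 1<<14)) // 25.00
}
```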
@@ -346,6 +407,7 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()

// Write the new meta
// TODO(owen-d): put total size in log, total time in metrics+log
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
@@ -357,8 +419,10 @@ func (s *SimpleBloomController) buildGaps(
level.Error(logger).Log("msg", "failed to write meta", "err", err)
return nil, errors.Wrap(err, "failed to write meta")
}
created = append(created, meta)
s.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta", "meta", meta.MetaRef.String())

created = append(created, meta)
totalSeries += uint64(seriesItrWithCounter.Count())
}
}