Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Delete outdated metas during planning #13363

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (

statusSuccess = "success"
statusFailure = "failure"

phasePlanning = "planning"
phaseBuilding = "building"
)

type Metrics struct {
Expand All @@ -33,8 +36,8 @@ type Metrics struct {
buildTime *prometheus.HistogramVec
buildLastSuccess prometheus.Gauge

blocksDeleted prometheus.Counter
metasDeleted prometheus.Counter
blocksDeleted *prometheus.CounterVec
metasDeleted *prometheus.CounterVec

tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
Expand Down Expand Up @@ -127,18 +130,18 @@ func NewMetrics(
Help: "Unix timestamp of the last successful build cycle.",
}),

blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
blocksDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "blocks_deleted_total",
Help: "Number of blocks deleted",
}),
metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
}, []string{"phase"}),
metasDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "metas_deleted_total",
Help: "Number of metas deleted",
}),
}, []string{"phase"}),

tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Expand Down
55 changes: 34 additions & 21 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to get metas: %w", err)
}

// In case the planner restarted before deleting outdated metas in the previous iteration,
// we delete them during the planning phase to avoid reprocessing them.
metas, err = p.deleteOutdatedMetas(ctx, table, tenant, metas, phasePlanning)
if err != nil {
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

Expand All @@ -350,9 +357,6 @@ func (p *Planner) computeTasks(
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}
if len(gaps) == 0 {
continue
}

for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
Expand Down Expand Up @@ -423,51 +427,60 @@ func (p *Planner) processTenantTaskResults(
}

combined := append(originalMetas, newMetas...)
outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log("msg", "no outdated metas found")
return tasksSucceed, nil
}

level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
if _, err := p.deleteOutdatedMetas(ctx, table, tenant, combined, phaseBuilding); err != nil {
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

return tasksSucceed, nil
}

func (p *Planner) deleteOutdatedMetasAndBlocks(
func (p *Planner) deleteOutdatedMetas(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The function name is slightly misleading. Maybe add a comment what the function does and what it returns.

ctx context.Context,
table config.DayTable,
tenant string,
metas []bloomshipper.Meta,
) error {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
phase string,
) ([]bloomshipper.Meta, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase)

upToDate, outdated := outdatedMetas(metas)
if len(outdated) == 0 {
level.Debug(logger).Log(
"msg", "no outdated metas found",
"upToDate", len(upToDate),
)
return upToDate, nil
}

level.Debug(logger).Log(
"msg", "found outdated metas",
"outdated", len(outdated),
"upToDate", len(upToDate),
)

client, err := p.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
return nil, errors.Wrap(err, "failed to get client")
}

var (
deletedMetas int
deletedBlocks int
)
defer func() {
p.metrics.metasDeleted.Add(float64(deletedMetas))
p.metrics.blocksDeleted.Add(float64(deletedBlocks))
p.metrics.metasDeleted.WithLabelValues(phase).Add(float64(deletedMetas))
p.metrics.blocksDeleted.WithLabelValues(phase).Add(float64(deletedBlocks))
}()

for _, meta := range metas {
for _, meta := range outdated {
for _, block := range meta.Blocks {
if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
return errors.Wrap(err, "failed to delete block")
return nil, errors.Wrap(err, "failed to delete block")
}
}

Expand All @@ -481,7 +494,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
return errors.Wrap(err, "failed to delete meta")
return nil, errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
Expand All @@ -494,7 +507,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
"blocks", deletedBlocks,
)

return nil
return upToDate, nil
}

func (p *Planner) tables(ts time.Time) *dayRangeIterator {
Expand Down
113 changes: 100 additions & 13 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func createPlanner(
HardLimit: flagext.Bytes(20 << 20),
TTL: time.Hour,
},
CacheListOps: false,
},
FSConfig: local.FSConfig{
Directory: t.TempDir(),
Expand Down Expand Up @@ -796,19 +797,7 @@ func Test_processTenantTaskResults(t *testing.T) {
},
)
require.NoError(t, err)

// TODO(salvacorts): Fix this
// For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB.
// As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the
// comparison fails. As a workaround to fix the issue, we will manually reset the TS of the sources to the
// fetched metas
for i := range metas {
for j := range metas[i].Sources {
sec := metas[i].Sources[j].TS.Unix()
nsec := metas[i].Sources[j].TS.Nanosecond()
metas[i].Sources[j].TS = time.Unix(sec, int64(nsec))
}
}
removeLocFromMetasSources(metas)

// Compare metas
require.Equal(t, len(tc.expectedMetas), len(metas))
Expand All @@ -817,6 +806,104 @@ func Test_processTenantTaskResults(t *testing.T) {
}
}

// For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB.
// As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the
// comparison fails. As a workaround to fix the issue, we will manually reset the TS of the sources to the
// fetched metas
func removeLocFromMetasSources(metas []bloomshipper.Meta) []bloomshipper.Meta {
for i := range metas {
for j := range metas[i].Sources {
sec := metas[i].Sources[j].TS.Unix()
nsec := metas[i].Sources[j].TS.Nanosecond()
metas[i].Sources[j].TS = time.Unix(sec, int64(nsec))
}
}

return metas
}

func Test_deleteOutdatedMetas(t *testing.T) {
for _, tc := range []struct {
name string
originalMetas []bloomshipper.Meta
expectedUpToDateMetas []bloomshipper.Meta
}{
{
name: "no metas",
},
{
name: "only up to date metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
},
{
name: "outdated metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}),
genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}),
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

bloomClient, err := planner.bloomStore.Client(testDay.ModelTime())
require.NoError(t, err)

// Create original metas and blocks
err = putMetas(bloomClient, tc.originalMetas)
require.NoError(t, err)

// Get all metas
metas, err := planner.bloomStore.FetchMetas(
context.Background(),
bloomshipper.MetaSearchParams{
TenantID: "fakeTenant",
Interval: bloomshipper.NewInterval(testTable.Bounds()),
Keyspace: v1.NewBounds(0, math.MaxUint64),
},
)
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.originalMetas, metas)

upToDate, err := planner.deleteOutdatedMetas(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning)
require.NoError(t, err)
require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate)

// Get all metas
metas, err = planner.bloomStore.FetchMetas(
context.Background(),
bloomshipper.MetaSearchParams{
TenantID: "fakeTenant",
Interval: bloomshipper.NewInterval(testTable.Bounds()),
Keyspace: v1.NewBounds(0, math.MaxUint64),
},
)
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.expectedUpToDateMetas, metas)
})
}
}

type fakeBuilder struct {
mx sync.Mutex // Protects tasks and currTaskIdx.
id string
Expand Down
14 changes: 12 additions & 2 deletions pkg/bloombuild/planner/versioned_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ func (t tsdbTokenRange) reassemble(from int) tsdbTokenRange {
return t[:len(t)-(reassembleTo-from)]
}

func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta {
func outdatedMetas(metas []bloomshipper.Meta) ([]bloomshipper.Meta, []bloomshipper.Meta) {
var outdated []bloomshipper.Meta
var upToDate []bloomshipper.Meta

// Sort metas descending by most recent source when checking
// for outdated metas (older metas are discarded if they don't change the range).
Expand Down Expand Up @@ -254,8 +255,17 @@ func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta {
tokenRange, added = tokenRange.Add(version, meta.Bounds)
if !added {
outdated = append(outdated, meta)
continue
}

upToDate = append(upToDate, meta)
}

return outdated
// We previously sorted the input metas by their TSDB source TS, therefore, they may not be sorted by FP anymore.
// We need to re-sort them by their FP to match the original order.
sort.Slice(upToDate, func(i, j int) bool {
return upToDate[i].Bounds.Less(upToDate[j].Bounds)
})

return upToDate, outdated
}
Loading
Loading