From 4e5140d8ab0320cef93ea9ddb2fcff19956e2dc6 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 1 Jul 2024 13:16:31 +0200 Subject: [PATCH 1/4] Delete outdated metas during planning --- pkg/bloombuild/planner/metrics.go | 15 ++-- pkg/bloombuild/planner/planner.go | 45 ++++++----- pkg/bloombuild/planner/planner_test.go | 81 +++++++++++++++++++ pkg/bloombuild/planner/versioned_range.go | 8 +- .../planner/versioned_range_test.go | 44 +++++++--- 5 files changed, 154 insertions(+), 39 deletions(-) diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index 3f0fe684ab24b..77ae68687b35a 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -15,6 +15,9 @@ const ( statusSuccess = "success" statusFailure = "failure" + + phasePlanning = "planning" + phaseBuilding = "building" ) type Metrics struct { @@ -33,8 +36,8 @@ type Metrics struct { buildTime *prometheus.HistogramVec buildLastSuccess prometheus.Gauge - blocksDeleted prometheus.Counter - metasDeleted prometheus.Counter + blocksDeleted *prometheus.CounterVec + metasDeleted *prometheus.CounterVec tenantsDiscovered prometheus.Counter tenantTasksPlanned *prometheus.GaugeVec @@ -127,18 +130,18 @@ func NewMetrics( Help: "Unix timestamp of the last successful build cycle.", }), - blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + blocksDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "blocks_deleted_total", Help: "Number of blocks deleted", - }), - metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + }, []string{"phase"}), + metasDeleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "metas_deleted_total", Help: "Number of metas deleted", - }), + }, []string{"phase"}), tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index ccbd462aaabe0..45727f4daa565 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -338,6 +338,13 @@ func (p *Planner) computeTasks( return nil, nil, fmt.Errorf("failed to get metas: %w", err) } + // In case the planner restarted before deleting outdated metas in the previous iteration, + // we delete them during the planning phase to avoid reprocessing them. + metas, err = p.deleteOutdatedMetas(ctx, table, tenant, metas, phasePlanning) + if err != nil { + return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err) + } + for _, ownershipRange := range ownershipRanges { logger := log.With(logger, "ownership", ownershipRange.String()) @@ -423,32 +430,34 @@ func (p *Planner) processTenantTaskResults( } combined := append(originalMetas, newMetas...) - outdated := outdatedMetas(combined) - if len(outdated) == 0 { - level.Debug(logger).Log("msg", "no outdated metas found") - return tasksSucceed, nil - } - - level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) - if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil { + if _, err := p.deleteOutdatedMetas(ctx, table, tenant, combined, phaseBuilding); err != nil { return 0, fmt.Errorf("failed to delete outdated metas: %w", err) } return tasksSucceed, nil } -func (p *Planner) deleteOutdatedMetasAndBlocks( +func (p *Planner) deleteOutdatedMetas( ctx context.Context, table config.DayTable, tenant string, metas []bloomshipper.Meta, -) error { - logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant) + phase string, +) ([]bloomshipper.Meta, error) { + logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase) + + upToDate, outdated := outdatedMetas(metas) + if len(outdated) == 0 { + level.Debug(logger).Log("msg", "no outdated metas found") + return upToDate, nil + } + + level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) client, err := p.bloomStore.Client(table.ModelTime()) if err != nil { level.Error(logger).Log("msg", "failed to get client", "err", err) - return errors.Wrap(err, "failed to get client") + return nil, errors.Wrap(err, "failed to get client") } var ( @@ -456,18 +465,18 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( deletedBlocks int ) defer func() { - p.metrics.metasDeleted.Add(float64(deletedMetas)) - p.metrics.blocksDeleted.Add(float64(deletedBlocks)) + p.metrics.metasDeleted.WithLabelValues(phase).Add(float64(deletedMetas)) + p.metrics.blocksDeleted.WithLabelValues(phase).Add(float64(deletedBlocks)) }() - for _, meta := range metas { + for _, meta := range outdated { for _, block := range meta.Blocks { if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil { if client.IsObjectNotFoundErr(err) { level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) } else { level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String()) - return errors.Wrap(err, "failed to delete block") + return nil, errors.Wrap(err, "failed to delete block") } } @@ -481,7 +490,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String()) } else { level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String()) - return errors.Wrap(err, "failed to delete meta") + return nil, errors.Wrap(err, "failed to delete meta") } } deletedMetas++ @@ -494,7 +503,7 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( "blocks", deletedBlocks, ) - return nil + return upToDate, nil } func (p *Planner) tables(ts time.Time) *dayRangeIterator { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index a20ce4e9a98a3..4b39c11bcc550 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -403,6 +403,7 @@ func createPlanner( HardLimit: flagext.Bytes(20 << 20), TTL: time.Hour, }, + CacheListOps: false, }, FSConfig: local.FSConfig{ Directory: t.TempDir(), @@ -817,6 +818,86 @@ func Test_processTenantTaskResults(t *testing.T) { } } +func Test_deleteOutdatedMetas(t *testing.T) { + for _, tc := range []struct { + name string + originalMetas []bloomshipper.Meta + expectedUpToDateMetas []bloomshipper.Meta + }{ + { + name: "no metas", + }, + { + name: "only up to date metas", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + expectedUpToDateMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + }, + }, + { + name: "outdated metas", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), + genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}), + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + expectedUpToDateMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + cfg := Config{ + PlanningInterval: 1 * time.Hour, + MaxQueuedTasksPerTenant: 10000, + } + planner := createPlanner(t, cfg, &fakeLimits{}, logger) + + bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) + require.NoError(t, err) + + // Create original metas and blocks + err = putMetas(bloomClient, tc.originalMetas) + require.NoError(t, err) + + // Get all metas + metas, err := planner.bloomStore.FetchMetas( + context.Background(), + bloomshipper.MetaSearchParams{ + TenantID: "fakeTenant", + Interval: bloomshipper.NewInterval(testTable.Bounds()), + Keyspace: v1.NewBounds(0, math.MaxUint64), + }, + ) + require.NoError(t, err) + require.ElementsMatch(t, tc.originalMetas, metas) + + upToDate, err := planner.deleteOutdatedMetas(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning) + require.NoError(t, err) + require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate) + + // Get all metas + metas, err = planner.bloomStore.FetchMetas( + context.Background(), + bloomshipper.MetaSearchParams{ + TenantID: "fakeTenant", + Interval: bloomshipper.NewInterval(testTable.Bounds()), + Keyspace: v1.NewBounds(0, math.MaxUint64), + }, + ) + require.NoError(t, err) + require.ElementsMatch(t, tc.expectedUpToDateMetas, metas) + }) + } +} + type fakeBuilder struct { mx sync.Mutex // Protects tasks and currTaskIdx. id string diff --git a/pkg/bloombuild/planner/versioned_range.go b/pkg/bloombuild/planner/versioned_range.go index 578b5d7ef83a6..1bde63ba55fff 100644 --- a/pkg/bloombuild/planner/versioned_range.go +++ b/pkg/bloombuild/planner/versioned_range.go @@ -210,8 +210,9 @@ func (t tsdbTokenRange) reassemble(from int) tsdbTokenRange { return t[:len(t)-(reassembleTo-from)] } -func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta { +func outdatedMetas(metas []bloomshipper.Meta) ([]bloomshipper.Meta, []bloomshipper.Meta) { var outdated []bloomshipper.Meta + var upToDate []bloomshipper.Meta // Sort metas descending by most recent source when checking // for outdated metas (older metas are discarded if they don't change the range). @@ -254,8 +255,11 @@ func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta { tokenRange, added = tokenRange.Add(version, meta.Bounds) if !added { outdated = append(outdated, meta) + continue } + + upToDate = append(upToDate, meta) } - return outdated + return upToDate, outdated } diff --git a/pkg/bloombuild/planner/versioned_range_test.go b/pkg/bloombuild/planner/versioned_range_test.go index e58f143842f1c..3eb2df160c36b 100644 --- a/pkg/bloombuild/planner/versioned_range_test.go +++ b/pkg/bloombuild/planner/versioned_range_test.go @@ -252,21 +252,25 @@ func Test_OutdatedMetas(t *testing.T) { } for _, tc := range []struct { - desc string - metas []bloomshipper.Meta - exp []bloomshipper.Meta + desc string + metas []bloomshipper.Meta + expOutdated []bloomshipper.Meta + expUpToDate []bloomshipper.Meta }{ { - desc: "no metas", - metas: nil, - exp: nil, + desc: "no metas", + metas: nil, + expOutdated: nil, }, { desc: "single meta", metas: []bloomshipper.Meta{ gen(v1.NewBounds(0, 10), 0), }, - exp: nil, + expOutdated: nil, + expUpToDate: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 0), + }, }, { desc: "single outdated meta", @@ -274,9 +278,12 @@ func Test_OutdatedMetas(t *testing.T) { gen(v1.NewBounds(0, 10), 0), gen(v1.NewBounds(0, 10), 1), }, - exp: []bloomshipper.Meta{ + expOutdated: []bloomshipper.Meta{ gen(v1.NewBounds(0, 10), 0), }, + expUpToDate: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 1), + }, }, { desc: "single outdated via partitions", @@ -285,10 +292,13 @@ func Test_OutdatedMetas(t *testing.T) { gen(v1.NewBounds(6, 10), 0), gen(v1.NewBounds(0, 10), 1), }, - exp: []bloomshipper.Meta{ + expOutdated: []bloomshipper.Meta{ gen(v1.NewBounds(6, 10), 0), gen(v1.NewBounds(0, 5), 0), }, + expUpToDate: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 1), + }, }, { desc: "same tsdb versions", @@ -297,10 +307,13 @@ func Test_OutdatedMetas(t *testing.T) { gen(v1.NewBounds(6, 10), 0), gen(v1.NewBounds(0, 10), 1), }, - exp: []bloomshipper.Meta{ + expOutdated: []bloomshipper.Meta{ gen(v1.NewBounds(6, 10), 0), gen(v1.NewBounds(0, 5), 0), }, + expUpToDate: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 1), + }, }, { desc: "multi version ordering", @@ -309,14 +322,19 @@ func Test_OutdatedMetas(t *testing.T) { gen(v1.NewBounds(0, 10), 1), // only part of the range is outdated, must keep gen(v1.NewBounds(8, 10), 2), }, - exp: []bloomshipper.Meta{ + expOutdated: []bloomshipper.Meta{ gen(v1.NewBounds(0, 5), 0), }, + expUpToDate: []bloomshipper.Meta{ + gen(v1.NewBounds(0, 10), 1), + gen(v1.NewBounds(8, 10), 2), + }, }, } { t.Run(tc.desc, func(t *testing.T) { - outdated := outdatedMetas(tc.metas) - require.Equal(t, tc.exp, outdated) + upToDate, outdated := outdatedMetas(tc.metas) + require.ElementsMatch(t, tc.expOutdated, outdated) + require.ElementsMatch(t, tc.expUpToDate, upToDate) }) } } From cda3f7f51b6645fa1899b558683cc4b80cbc30e8 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 1 Jul 2024 13:28:07 +0200 Subject: [PATCH 2/4] fix test --- pkg/bloombuild/planner/planner_test.go | 32 +++++++++++++++----------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 4b39c11bcc550..5c7454bd7e6f7 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -797,19 +797,7 @@ func Test_processTenantTaskResults(t *testing.T) { }, ) require.NoError(t, err) - - // TODO(salvacorts): Fix this - // For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB. - // As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the - // comparison fails. As a workaround to fix the issue, we will manually reset the TS of the sources to the - // fetched metas - for i := range metas { - for j := range metas[i].Sources { - sec := metas[i].Sources[j].TS.Unix() - nsec := metas[i].Sources[j].TS.Nanosecond() - metas[i].Sources[j].TS = time.Unix(sec, int64(nsec)) - } - } + removeLocFromMetasSources(metas) // Compare metas require.Equal(t, len(tc.expectedMetas), len(metas)) @@ -818,6 +806,22 @@ func Test_processTenantTaskResults(t *testing.T) { } } +// For some reason, when the tests are run in the CI, we do not encode the `loc` of model.Time for each TSDB. +// As a result, when we fetch them, the loc is empty whereas in the original metas, it is not. Therefore the +// comparison fails. As a workaround to fix the issue, we will manually reset the TS of the sources to the +// fetched metas +func removeLocFromMetasSources(metas []bloomshipper.Meta) []bloomshipper.Meta { + for i := range metas { + for j := range metas[i].Sources { + sec := metas[i].Sources[j].TS.Unix() + nsec := metas[i].Sources[j].TS.Nanosecond() + metas[i].Sources[j].TS = time.Unix(sec, int64(nsec)) + } + } + + return metas +} + func Test_deleteOutdatedMetas(t *testing.T) { for _, tc := range []struct { name string @@ -877,6 +881,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { }, ) require.NoError(t, err) + removeLocFromMetasSources(metas) require.ElementsMatch(t, tc.originalMetas, metas) upToDate, err := planner.deleteOutdatedMetas(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning) @@ -893,6 +898,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { }, ) require.NoError(t, err) + removeLocFromMetasSources(metas) require.ElementsMatch(t, tc.expectedUpToDateMetas, metas) }) } From 2816a3029287393437bfdd25331d63f632c045d3 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 1 Jul 2024 16:07:23 +0200 Subject: [PATCH 3/4] fix test --- pkg/bloombuild/planner/planner.go | 14 +++++++++----- pkg/bloombuild/planner/versioned_range.go | 6 ++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 45727f4daa565..429f1b7d7112f 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -357,9 +357,6 @@ func (p *Planner) computeTasks( level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) continue } - if len(gaps) == 0 { - continue - } for _, gap := range gaps { tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) @@ -448,11 +445,18 @@ func (p *Planner) deleteOutdatedMetas( upToDate, outdated := outdatedMetas(metas) if len(outdated) == 0 { - level.Debug(logger).Log("msg", "no outdated metas found") + level.Debug(logger).Log( + "msg", "no outdated metas found", + "upToDate", len(upToDate), + ) return upToDate, nil } - level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) + level.Debug(logger).Log( + "msg", "found outdated metas", + "outdated", len(outdated), + "upToDate", len(upToDate), + ) client, err := p.bloomStore.Client(table.ModelTime()) if err != nil { diff --git a/pkg/bloombuild/planner/versioned_range.go b/pkg/bloombuild/planner/versioned_range.go index 1bde63ba55fff..eefddffa4abeb 100644 --- a/pkg/bloombuild/planner/versioned_range.go +++ b/pkg/bloombuild/planner/versioned_range.go @@ -261,5 +261,11 @@ func outdatedMetas(metas []bloomshipper.Meta) ([]bloomshipper.Meta, []bloomshipp upToDate = append(upToDate, meta) } + // We previously sorted the input metas by their TSDB source TS, therefore, they may not be sorted by FP anymore. + // We need to re-sort them by their FP to match the original order. + sort.Slice(upToDate, func(i, j int) bool { + return upToDate[i].Bounds.Less(upToDate[j].Bounds) + }) + return upToDate, outdated } From 2d30a324f59ec28dee6a4667a6988fdf71b3d785 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 3 Jul 2024 11:39:06 +0200 Subject: [PATCH 4/4] CR feedback --- pkg/bloombuild/planner/planner.go | 8 +++++--- pkg/bloombuild/planner/planner_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 429f1b7d7112f..5526c612f4912 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -340,7 +340,7 @@ func (p *Planner) computeTasks( // In case the planner restarted before deleting outdated metas in the previous iteration, // we delete them during the planning phase to avoid reprocessing them. - metas, err = p.deleteOutdatedMetas(ctx, table, tenant, metas, phasePlanning) + metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, metas, phasePlanning) if err != nil { return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err) } @@ -427,14 +427,16 @@ func (p *Planner) processTenantTaskResults( } combined := append(originalMetas, newMetas...) - if _, err := p.deleteOutdatedMetas(ctx, table, tenant, combined, phaseBuilding); err != nil { + if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, combined, phaseBuilding); err != nil { return 0, fmt.Errorf("failed to delete outdated metas: %w", err) } return tasksSucceed, nil } -func (p *Planner) deleteOutdatedMetas( +// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store. +// It returns the up-to-date metas from the `metas` argument. +func (p *Planner) deleteOutdatedMetasAndBlocks( ctx context.Context, table config.DayTable, tenant string, diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 5c7454bd7e6f7..0e119cc1af229 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -884,7 +884,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { removeLocFromMetasSources(metas) require.ElementsMatch(t, tc.originalMetas, metas) - upToDate, err := planner.deleteOutdatedMetas(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning) + upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning) require.NoError(t, err) require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate)