diff --git a/pkg/bloomcompactor/v2_meta.go b/pkg/bloomcompactor/v2_meta.go index 1be785c0934ac..adffb61dff5ed 100644 --- a/pkg/bloomcompactor/v2_meta.go +++ b/pkg/bloomcompactor/v2_meta.go @@ -62,7 +62,7 @@ type Meta struct { // is greater than or equal to the range of the actual data in the underlying blocks. OwnershipRange v1.FingerprintBounds - // Old blocks which can be deleted in the future. These should be from pervious compaction rounds. + // Old blocks which can be deleted in the future. These should be from previous compaction rounds. Tombstones []BlockRef // The specific TSDB files used to generate the block. @@ -119,17 +119,18 @@ func (m Meta) Checksum() (uint32, error) { } type TSDBStore interface { - ResolveTSDBs() ([]*tsdb.TSDBFile, error) + ResolveTSDBs() ([]*tsdb.SingleTenantTSDBIdentifier, error) + LoadTSDB(id tsdb.Identifier, bounds v1.FingerprintBounds) (v1.CloseableIterator[*v1.Series], error) } type MetaStore interface { + ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error) GetMetas([]MetaRef) ([]Meta, error) PutMeta(Meta) error - ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error) } type BlockStore interface { // TODO(owen-d): flesh out|integrate against bloomshipper.Client - GetBlocks([]BlockRef) ([]interface{}, error) + GetBlocks([]BlockRef) ([]*v1.Block, error) PutBlock(interface{}) error } diff --git a/pkg/bloomcompactor/v2controller.go b/pkg/bloomcompactor/v2controller.go index 3fbcd04cd93db..31f73740c1ff7 100644 --- a/pkg/bloomcompactor/v2controller.go +++ b/pkg/bloomcompactor/v2controller.go @@ -3,6 +3,7 @@ package bloomcompactor import ( "context" "fmt" + "sort" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -17,6 +18,9 @@ type SimpleBloomController struct { tsdbStore TSDBStore metaStore MetaStore blockStore BlockStore + chunkLoader ChunkLoader + rwFn func() (v1.BlockWriter, v1.BlockReader) + metrics *Metrics // TODO(owen-d): add metrics logger log.Logger @@ -27,6 +31,9 @@ func NewSimpleBloomController( 
tsdbStore TSDBStore, metaStore MetaStore, blockStore BlockStore, + chunkLoader ChunkLoader, + rwFn func() (v1.BlockWriter, v1.BlockReader), + metrics *Metrics, logger log.Logger, ) *SimpleBloomController { return &SimpleBloomController{ @@ -34,11 +41,14 @@ func NewSimpleBloomController( tsdbStore: tsdbStore, metaStore: metaStore, blockStore: blockStore, + chunkLoader: chunkLoader, + rwFn: rwFn, + metrics: metrics, logger: log.With(logger, "ownership", ownershipRange), } } -func (s *SimpleBloomController) do(_ context.Context) error { +func (s *SimpleBloomController) do(ctx context.Context) error { // 1. Resolve TSDBs tsdbs, err := s.tsdbStore.ResolveTSDBs() if err != nil { @@ -61,26 +71,30 @@ func (s *SimpleBloomController) do(_ context.Context) error { } ids := make([]tsdb.Identifier, 0, len(tsdbs)) - for _, idx := range tsdbs { - ids = append(ids, idx.Identifier) + for _, id := range tsdbs { + ids = append(ids, id) } // 4. Determine which TSDBs have gaps in the ownership range and need to // be processed. - work, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas) + tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas) if err != nil { level.Error(s.logger).Log("msg", "failed to find gaps", "err", err) return errors.Wrap(err, "failed to find gaps") } - if len(work) == 0 { + if len(tsdbsWithGaps) == 0 { level.Debug(s.logger).Log("msg", "blooms exist for all tsdbs") return nil } - // TODO(owen-d): finish - panic("not implemented") + work, err := blockPlansForGaps(tsdbsWithGaps, metas) + if err != nil { + level.Error(s.logger).Log("msg", "failed to create plan", "err", err) + return errors.Wrap(err, "failed to create plan") + } + // 5. Generate Blooms // Now that we have the gaps, we will generate a bloom block for each gap. 
// We can accelerate this by using existing blocks which may already contain // needed chunks in their blooms, for instance after a new TSDB version is generated @@ -89,8 +103,171 @@ func (s *SimpleBloomController) do(_ context.Context) error { // overlapping the ownership ranges we've identified as needing updates. // With these in hand, we can download the old blocks and use them to // accelerate bloom generation for the new blocks. + + var ( + blockCt int + tsdbCt = len(work) + ) + + for _, plan := range work { + + for _, gap := range plan.gaps { + // Fetch blocks that aren't up to date but are in the desired fingerprint range + // to try and accelerate bloom creation + seriesItr, preExistingBlocks, err := s.loadWorkForGap(plan.tsdb, gap) + if err != nil { + level.Error(s.logger).Log("msg", "failed to get series and blocks", "err", err) + return errors.Wrap(err, "failed to get series and blocks") + } + + gen := NewSimpleBloomGenerator( + v1.DefaultBlockOptions, + seriesItr, + s.chunkLoader, + preExistingBlocks, + s.rwFn, + s.metrics, + log.With(s.logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)), + ) + + _, newBlocks, err := gen.Generate(ctx) + if err != nil { + // TODO(owen-d): metrics + level.Error(s.logger).Log("msg", "failed to generate bloom", "err", err) + return errors.Wrap(err, "failed to generate bloom") + } + + // TODO(owen-d): dispatch this to a queue for writing, handling retries/backpressure, etc? 
+ for newBlocks.Next() { + blockCt++ + blk := newBlocks.At() + if err := s.blockStore.PutBlock(blk); err != nil { + level.Error(s.logger).Log("msg", "failed to write block", "err", err) + return errors.Wrap(err, "failed to write block") + } + } + + if err := newBlocks.Err(); err != nil { + // TODO(owen-d): metrics + level.Error(s.logger).Log("msg", "failed to generate bloom", "err", err) + return errors.Wrap(err, "failed to generate bloom") + } + + } + } + + level.Debug(s.logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt) + return nil + +} + +func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*v1.Block, error) { + // load a series iterator for the gap + seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load tsdb") + } + + blocks, err := s.blockStore.GetBlocks(gap.blocks) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to get blocks") + } + + return seriesItr, blocks, nil +} + +type gapWithBlocks struct { + bounds v1.FingerprintBounds + blocks []BlockRef +} + +// blockPlan is a plan for all the work needed to build a meta.json +// It includes: +// - the tsdb (source of truth) which contains all the series+chunks +// we need to ensure are indexed in bloom blocks +// - a list of gaps that are out of date and need to be checked+built +// - within each gap, a list of block refs which overlap the gap are included +// so we can use them to accelerate bloom generation. They likely contain many +// of the same chunks we need to ensure are indexed, just from previous tsdb iterations. +// This is a performance optimization to avoid expensive re-reindexing +type blockPlan struct { + tsdb tsdb.Identifier + gaps []gapWithBlocks +} + +// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. 
+// This allows us to expedite bloom generation by using existing blocks to fill in the gaps +// since many will contain the same chunks. +func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) { + plans := make([]blockPlan, 0, len(tsdbs)) + + for _, idx := range tsdbs { + plan := blockPlan{ + tsdb: idx.tsdb, + gaps: make([]gapWithBlocks, 0, len(idx.gaps)), + } + + for _, gap := range idx.gaps { + planGap := gapWithBlocks{ + bounds: gap, + } + + for _, meta := range metas { + + if meta.OwnershipRange.Intersection(gap) == nil { + // this meta doesn't overlap the gap, skip + continue + } + + for _, block := range meta.Blocks { + if block.OwnershipRange.Intersection(gap) == nil { + // this block doesn't overlap the gap, skip + continue + } + // this block overlaps the gap, add it to the plan + // for this gap + planGap.blocks = append(planGap.blocks, block) + } + } + + // ensure we sort blocks so deduping iterator works as expected + sort.Slice(planGap.blocks, func(i, j int) bool { + return planGap.blocks[i].OwnershipRange.Less(planGap.blocks[j].OwnershipRange) + }) + + peekingBlocks := v1.NewPeekingIter[BlockRef]( + v1.NewSliceIter[BlockRef]( + planGap.blocks, + ), + ) + // dedupe blocks which could be in multiple metas + itr := v1.NewDedupingIter[BlockRef, BlockRef]( + func(a, b BlockRef) bool { + return a == b + }, + v1.Identity[BlockRef], + func(a, _ BlockRef) BlockRef { + return a + }, + peekingBlocks, + ) + + deduped, err := v1.Collect[BlockRef](itr) + if err != nil { + return nil, errors.Wrap(err, "failed to dedupe blocks") + } + planGap.blocks = deduped + + plan.gaps = append(plan.gaps, planGap) + } + + plans = append(plans, plan) + } + + return plans, nil } +// Used to signal the gaps that need to be populated for a tsdb type tsdbGaps struct { tsdb tsdb.Identifier gaps []v1.FingerprintBounds diff --git a/pkg/bloomcompactor/v2controller_test.go b/pkg/bloomcompactor/v2controller_test.go index 0a99f26d3ce1f..9f3f56153af32 100644 --- 
a/pkg/bloomcompactor/v2controller_test.go +++ b/pkg/bloomcompactor/v2controller_test.go @@ -113,22 +113,24 @@ func Test_findGaps(t *testing.T) { } } -func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { - id := func(n int) tsdb.SingleTenantTSDBIdentifier { - return tsdb.SingleTenantTSDBIdentifier{ - TS: time.Unix(int64(n), 0), - } +func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { + return tsdb.SingleTenantTSDBIdentifier{ + TS: time.Unix(int64(n), 0), } +} - meta := func(min, max model.Fingerprint, sources ...int) Meta { - m := Meta{ - OwnershipRange: v1.NewBounds(min, max), - } - for _, source := range sources { - m.Sources = append(m.Sources, id(source)) - } - return m +func genMeta(min, max model.Fingerprint, sources []int, blocks []BlockRef) Meta { + m := Meta{ + OwnershipRange: v1.NewBounds(min, max), + Blocks: blocks, + } + for _, source := range sources { + m.Sources = append(m.Sources, tsdbID(source)) } + return m +} + +func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { for _, tc := range []struct { desc string @@ -142,21 +144,21 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "non-overlapping tsdbs and metas", err: true, ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.Identifier{id(0)}, + tsdbs: []tsdb.Identifier{tsdbID(0)}, metas: []Meta{ - meta(11, 20, 0), + genMeta(11, 20, []int{0}, nil), }, }, { desc: "single tsdb", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.Identifier{id(0)}, + tsdbs: []tsdb.Identifier{tsdbID(0)}, metas: []Meta{ - meta(4, 8, 0), + genMeta(4, 8, []int{0}, nil), }, exp: []tsdbGaps{ { - tsdb: id(0), + tsdb: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 3), v1.NewBounds(9, 10), @@ -167,20 +169,20 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { { desc: "multiple tsdbs with separate blocks", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.Identifier{id(0), id(1)}, + tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, metas: []Meta{ - meta(0, 5, 0), - meta(6, 10, 1), + genMeta(0, 5, []int{0}, 
nil), + genMeta(6, 10, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdb: id(0), + tsdb: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdb: id(1), + tsdb: tsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 5), }, @@ -190,20 +192,20 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { { desc: "multiple tsdbs with the same blocks", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.Identifier{id(0), id(1)}, + tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, metas: []Meta{ - meta(0, 5, 0, 1), - meta(6, 8, 1), + genMeta(0, 5, []int{0, 1}, nil), + genMeta(6, 8, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdb: id(0), + tsdb: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdb: id(1), + tsdb: tsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(9, 10), }, @@ -221,3 +223,194 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { }) } } + +func genBlockRef(min, max model.Fingerprint) BlockRef { + bounds := v1.NewBounds(min, max) + return BlockRef{ + OwnershipRange: bounds, + } +} + +func Test_blockPlansForGaps(t *testing.T) { + for _, tc := range []struct { + desc string + ownershipRange v1.FingerprintBounds + tsdbs []tsdb.Identifier + metas []Meta + err bool + exp []blockPlan + }{ + { + desc: "single overlapping meta+no overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0)}, + metas: []Meta{ + genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(11, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + }, + }, + }, + }, + }, + { + desc: "single overlapping meta+one overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0)}, + metas: []Meta{ + genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + blocks: []BlockRef{genBlockRef(9, 20)}, + }, + }, + }, + }, + }, + { + // the 
range which needs to be generated doesn't overlap with existing blocks + // from other tsdb versions since theres an up to date tsdb version block, + // but we can trim the range needing generation + desc: "trims up to date area", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0)}, + metas: []Meta{ + genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(9, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 8), + }, + }, + }, + }, + }, + { + desc: "uses old block for overlapping range", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0)}, + metas: []Meta{ + genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(5, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 8), + blocks: []BlockRef{genBlockRef(5, 20)}, + }, + }, + }, + }, + }, + { + desc: "multi case", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs + metas: []Meta{ + genMeta(0, 2, []int{0}, []BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }), // tsdb_0 + genMeta(6, 8, []int{0}, []BlockRef{genBlockRef(6, 8)}), // tsdb_0 + + genMeta(3, 5, []int{1}, []BlockRef{genBlockRef(3, 5)}), // tsdb_1 + genMeta(8, 10, []int{1}, []BlockRef{genBlockRef(8, 10)}), // tsdb_1 + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) + { + bounds: v1.NewBounds(3, 5), + blocks: []BlockRef{genBlockRef(3, 5)}, + }, + { + bounds: v1.NewBounds(9, 10), + blocks: []BlockRef{genBlockRef(8, 10)}, + }, + }, + }, + // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) + { + tsdb: tsdbID(1), 
+ gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 2), + blocks: []BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }, + }, + { + bounds: v1.NewBounds(6, 7), + blocks: []BlockRef{genBlockRef(6, 8)}, + }, + }, + }, + }, + }, + { + desc: "dedupes block refs", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.Identifier{tsdbID(0)}, + metas: []Meta{ + genMeta(9, 20, []int{1}, []BlockRef{ + genBlockRef(1, 4), + genBlockRef(9, 20), + }), // blocks for first diff tsdb + genMeta(5, 20, []int{2}, []BlockRef{ + genBlockRef(5, 10), + genBlockRef(9, 20), // same block references in prior meta (will be deduped) + }), // block for second diff tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []gapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + blocks: []BlockRef{ + genBlockRef(1, 4), + genBlockRef(5, 10), + genBlockRef(9, 20), + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested + // separately and it's used to generate input in our regular code path (easier to write tests this way). 
+ gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) + require.NoError(t, err) + + plans, err := blockPlansForGaps(gaps, tc.metas) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, plans) + + }) + } +} diff --git a/pkg/bloomcompactor/v2spec.go b/pkg/bloomcompactor/v2spec.go index 49e74a47188a7..e0d964e9e9724 100644 --- a/pkg/bloomcompactor/v2spec.go +++ b/pkg/bloomcompactor/v2spec.go @@ -103,7 +103,7 @@ func NewSimpleBloomGenerator( store: store, chunkLoader: chunkLoader, blocks: blocks, - logger: logger, + logger: log.With(logger, "component", "bloom_generator"), readWriterFn: readWriterFn, metrics: metrics, diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 97c2571948096..d2722ad8f1496 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -56,7 +56,7 @@ type Task struct { // the last error of the task // needs to be a pointer so multiple copies of the task can modify its value err *wrappedError - // the respones received from the block queriers + // the responses received from the block queriers responses []v1.Output // series of the original request diff --git a/pkg/storage/bloom/v1/bounds.go b/pkg/storage/bloom/v1/bounds.go index fc22866285f82..6aff8ae2c1706 100644 --- a/pkg/storage/bloom/v1/bounds.go +++ b/pkg/storage/bloom/v1/bounds.go @@ -37,6 +37,13 @@ func (b FingerprintBounds) String() string { return b.Min.String() + "-" + b.Max.String() } +func (b FingerprintBounds) Less(other FingerprintBounds) bool { + if b.Min != other.Min { + return b.Min < other.Min + } + return b.Max <= other.Max +} + // Cmp returns the fingerprint's position relative to the bounds func (b FingerprintBounds) Cmp(fp model.Fingerprint) BoundsCheck { if fp < b.Min { diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index fc4868bd0de6a..26b9a39cfd7bf 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ 
-15,6 +15,10 @@ import ( "github.com/grafana/loki/pkg/util/encoding" ) +var ( + DefaultBlockOptions = NewBlockOptions(4, 0) +) + type BlockOptions struct { // Schema determines the Schema of the block and cannot be changed Schema Schema @@ -521,7 +525,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { func(a, b *SeriesWithBloom) bool { return a.Series.Fingerprint == b.Series.Fingerprint }, - id[*SeriesWithBloom], + Identity[*SeriesWithBloom], func(a, b *SeriesWithBloom) *SeriesWithBloom { if len(a.Series.Chunks) > len(b.Series.Chunks) { return a diff --git a/pkg/storage/bloom/v1/dedupe.go b/pkg/storage/bloom/v1/dedupe.go index a322d8b4b2ef2..2e1a7cca42f36 100644 --- a/pkg/storage/bloom/v1/dedupe.go +++ b/pkg/storage/bloom/v1/dedupe.go @@ -12,7 +12,7 @@ type DedupeIter[A, B any] struct { } // general helper, in this case created for DedupeIter[T,T] -func id[A any](a A) A { return a } +func Identity[A any](a A) A { return a } func NewDedupingIter[A, B any]( eq func(A, B) bool, @@ -52,3 +52,20 @@ func (it *DedupeIter[A, B]) Err() error { func (it *DedupeIter[A, B]) At() B { return it.tmp } + +// Collect collects an iterator into a slice. 
It uses +// CollectInto with a new slice +func Collect[T any](itr Iterator[T]) ([]T, error) { + return CollectInto(itr, nil) +} + +// CollectInto collects the elements of an iterator into a provided slice +// which is returned +func CollectInto[T any](itr Iterator[T], into []T) ([]T, error) { + into = into[:0] + + for itr.Next() { + into = append(into, itr.At()) + } + return into, itr.Err() +} diff --git a/pkg/storage/bloom/v1/dedupe_test.go b/pkg/storage/bloom/v1/dedupe_test.go index 443d8e3e3750e..524e3d4a13a56 100644 --- a/pkg/storage/bloom/v1/dedupe_test.go +++ b/pkg/storage/bloom/v1/dedupe_test.go @@ -28,7 +28,7 @@ func TestMergeDedupeIter(t *testing.T) { } deduper := NewDedupingIter[*SeriesWithBloom, *SeriesWithBloom]( eq, - id[*SeriesWithBloom], + Identity[*SeriesWithBloom], merge, NewPeekingIter[*SeriesWithBloom](mbq), ) diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 1716feef35b03..31fcdc643936b 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -241,3 +241,8 @@ func PointerSlice[T any](xs []T) []*T { } return out } + +type CloseableIterator[T any] interface { + Iterator[T] + Close() error +}