diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index 0386604bfae5a..cf28fab615aae 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -21,6 +21,7 @@ type Block struct {
 	reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)?
 
 	initialized bool
+	dataRange   SeriesHeader
 }
 
 func NewBlock(reader BlockReader) *Block {
@@ -41,6 +42,13 @@ func (b *Block) LoadHeaders() error {
 			return errors.Wrap(err, "decoding index")
 		}
 
+		// TODO(owen-d): better pattern
+		xs := make([]SeriesHeader, 0, len(b.index.pageHeaders))
+		for _, h := range b.index.pageHeaders {
+			xs = append(xs, h.SeriesHeader)
+		}
+		b.dataRange = aggregateHeaders(xs)
+
 		blooms, err := b.reader.Blooms()
 		if err != nil {
 			return errors.Wrap(err, "getting blooms reader")
diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go
index 30885d9b873e0..317d1e598414a 100644
--- a/pkg/storage/bloom/v1/block_writer.go
+++ b/pkg/storage/bloom/v1/block_writer.go
@@ -22,11 +22,6 @@ type BlockWriter interface {
 	Size() (int, error) // byte size of accumulated index & blooms
 }
 
-type BlockReader interface {
-	Index() (io.ReadSeeker, error)
-	Blooms() (io.ReadSeeker, error)
-}
-
 // in memory impl
 type MemoryBlockWriter struct {
 	index, blooms *bytes.Buffer
@@ -116,71 +111,3 @@ func (b *DirectoryBlockWriter) Size() (int, error) {
 	}
 	return size, nil
 }
-
-// In memory reader
-type ByteReader struct {
-	index, blooms *bytes.Buffer
-}
-
-func NewByteReader(index, blooms *bytes.Buffer) *ByteReader {
-	return &ByteReader{index: index, blooms: blooms}
-}
-
-func (r *ByteReader) Index() (io.ReadSeeker, error) {
-	return bytes.NewReader(r.index.Bytes()), nil
-}
-
-func (r *ByteReader) Blooms() (io.ReadSeeker, error) {
-	return bytes.NewReader(r.blooms.Bytes()), nil
-}
-
-// File reader
-type DirectoryBlockReader struct {
-	dir           string
-	blooms, index *os.File
-
-	initialized bool
-}
-
-func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
-	return &DirectoryBlockReader{
-		dir:         dir,
-		initialized: false,
-	}
-}
-
-func (r *DirectoryBlockReader) Init() error {
-	if !r.initialized {
-		var err error
-		r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
-		if err != nil {
-			return errors.Wrap(err, "opening series file")
-		}
-
-		r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
-		if err != nil {
-			return errors.Wrap(err, "opening bloom file")
-		}
-
-		r.initialized = true
-	}
-	return nil
-}
-
-func (r *DirectoryBlockReader) Index() (io.ReadSeeker, error) {
-	if !r.initialized {
-		if err := r.Init(); err != nil {
-			return nil, err
-		}
-	}
-	return r.index, nil
-}
-
-func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) {
-	if !r.initialized {
-		if err := r.Init(); err != nil {
-			return nil, err
-		}
-	}
-	return r.blooms, nil
-}
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index ce3d85c84c4dd..76ff08f3860e8 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -5,11 +5,13 @@ import (
 	"fmt"
 	"hash"
 	"io"
+	"sort"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/pkg/util/encoding"
 )
 
@@ -62,19 +64,10 @@ type SeriesWithBloom struct {
 
 func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) error {
 	for itr.Next() {
-		series := itr.At()
-
-		offset, err := b.blooms.Append(series)
-		if err != nil {
"writing bloom for series %v", series.Series.Fingerprint) + if err := b.AddSeries(itr.At()); err != nil { + return err } - if err := b.index.Append(SeriesWithOffset{ - Offset: offset, - Series: *series.Series, - }); err != nil { - return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) - } } if err := itr.Err(); err != nil { @@ -90,6 +83,22 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) error { return nil } +func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error { + offset, err := b.blooms.Append(series) + if err != nil { + return errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint) + } + + if err := b.index.Append(SeriesWithOffset{ + Offset: offset, + Series: *series.Series, + }); err != nil { + return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) + } + + return nil +} + type BloomBlockBuilder struct { opts BlockOptions writer io.WriteCloser @@ -429,3 +438,134 @@ func (b *IndexBuilder) Close() error { } return errors.Wrap(b.writer.Close(), "closing series writer") } + +// SortBlocksIntoOverlappingGroups sorts a list of blocks into a sorted list of lists, +// where each list contains blocks that overlap with each other. +// TODO(owen-d): implement as an iterator so we don't have to load all blocks at once +// NB: unused now, but likely useful when we want to optimize compaction. I wrote this expecting to need it now +// but it feels unsavory to remove it +func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) { + sort.Slice(xs, func(i, j int) bool { + a, b := xs[i].index, xs[j].index + return a.pageHeaders[0].FromFp <= b.pageHeaders[0].FromFp + }) + + var curGroup []*Block + for _, x := range xs { + switch { + case len(curGroup) == 0: + curGroup = append(curGroup, x) + case curGroup[len(curGroup)-1].dataRange.OverlapFingerprintRange(x.dataRange): + curGroup = append(curGroup, x) + default: + groups = append(groups, curGroup) + curGroup = []*Block{x} + } + } + + if len(curGroup) > 0 { + groups = append(groups, curGroup) + } + return groups +} + +// Simplistic implementation of a merge builder that builds a single block +// from a list of blocks and a store of series. +type MergeBuilder struct { + // existing blocks + blocks []*Block + // store + store Iterator[*Series] + // Add chunks to a bloom + populate func(*Series, *Bloom) error +} + +func NewMergeBuilder(blocks []*Block, store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder { + return &MergeBuilder{ + blocks: blocks, + store: store, + populate: populate, + } +} + +// NB: this will build one block. Ideally we would build multiple blocks once a target size threshold is met +// but this gives us a good starting point. 
+func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
+	var (
+		xs           = make([]PeekingIterator[*SeriesWithBloom], 0, len(mb.blocks))
+		nextInBlocks *SeriesWithBloom
+	)
+
+	for _, block := range mb.blocks {
+		xs = append(xs, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(block)))
+	}
+
+	// Turn the list of blocks into a single iterator that returns the next series
+	mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewMergeBlockQuerier(xs...))
+	// two overlapping blocks can conceivably have the same series, so we need to dedupe,
+	// preferring the one with the most chunks already indexed since we'll have
+	// to add fewer chunks to the bloom
+	deduped := NewDedupingIter[*SeriesWithBloom](
+		func(a, b *SeriesWithBloom) bool {
+			return a.Series.Fingerprint == b.Series.Fingerprint
+		},
+		func(a, b *SeriesWithBloom) *SeriesWithBloom {
+			if len(a.Series.Chunks) > len(b.Series.Chunks) {
+				return a
+			}
+			return b
+		},
+		mergedBlocks,
+	)
+
+	for mb.store.Next() {
+		nextInStore := mb.store.At()
+
+		// advance the merged blocks iterator until we find a series that is
+		// greater than or equal to the next series in the store.
+		// TODO(owen-d): expensive, but Seek is not implemented for this itr.
+		// It's also more efficient to build an iterator over the Series file in the index
+		// without the blooms until we find a bloom we actually need to unpack from the blooms file.
+		for nextInBlocks == nil || nextInBlocks.Series.Fingerprint < mb.store.At().Fingerprint {
+			if !deduped.Next() {
+				// we've exhausted all the blocks
+				nextInBlocks = nil
+				break
+			}
+			nextInBlocks = deduped.At()
+		}
+
+		cur := nextInBlocks
+		chunksToAdd := nextInStore.Chunks
+		// The next series from the store doesn't exist in the blocks, so we add it
+		// in its entirety
+		if nextInBlocks == nil || nextInBlocks.Series.Fingerprint > nextInStore.Fingerprint {
+			cur = &SeriesWithBloom{
+				Series: nextInStore,
+				Bloom: &Bloom{
+					ScalableBloomFilter: *filter.NewScalableBloomFilter(1024, 0.01, 0.8),
+				},
+			}
+		} else {
+			// if the series already exists in the block, we only need to add the new chunks
+			chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks)
+		}
+
+		if len(chunksToAdd) > 0 {
+			if err := mb.populate(
+				&Series{
+					Fingerprint: nextInStore.Fingerprint,
+					Chunks:      chunksToAdd,
+				},
+				cur.Bloom,
+			); err != nil {
+				return errors.Wrapf(err, "populating bloom for series with fingerprint: %v", nextInStore.Fingerprint)
+			}
+		}
+
+		if err := builder.AddSeries(*cur); err != nil {
+			return errors.Wrap(err, "adding series to block")
+		}
+	}
+	return nil
+}
diff --git a/pkg/storage/bloom/v1/dedupe.go b/pkg/storage/bloom/v1/dedupe.go
index 500f26489c640..759de7e686637 100644
--- a/pkg/storage/bloom/v1/dedupe.go
+++ b/pkg/storage/bloom/v1/dedupe.go
@@ -1,9 +1,9 @@
 package v1
 
-// MergeDedupeIter is a deduplicating iterator that merges adjacent elements
+// DedupeIter is a deduplicating iterator that merges adjacent elements
 // It's intended to be used when merging multiple blocks,
 // each of which may contain the same fingerprints
-type MergeDedupeIter[T any] struct {
+type DedupeIter[T any] struct {
 	eq    func(T, T) bool
 	merge func(T, T) T
 	itr   PeekingIterator[T]
@@ -11,19 +11,19 @@ type MergeDedupeIter[T any] struct {
 	tmp []T
 }
 
-func NewMergeDedupingIter[T any](
+func NewDedupingIter[T any](
 	eq func(T, T) bool,
 	merge func(T, T) T,
 	itr PeekingIterator[T],
-) *MergeDedupeIter[T] {
-	return &MergeDedupeIter[T]{
+) *DedupeIter[T] {
+	return &DedupeIter[T]{
 		eq:    eq,
 		merge: merge,
 		itr:   itr,
 	}
 }
 
-func (it *MergeDedupeIter[T]) Next() bool {
+func (it *DedupeIter[T]) Next() bool {
 	it.tmp = it.tmp[:0]
 	if !it.itr.Next() {
 		return false
 	}
@@ -40,17 +40,16 @@ func (it *MergeDedupeIter[T]) Next() bool {
 	}
 
 	// merge all the elements in tmp
-	for len(it.tmp) > 1 {
-		it.tmp[len(it.tmp)-2] = it.merge(it.tmp[len(it.tmp)-2], it.tmp[len(it.tmp)-1])
-		it.tmp = it.tmp[:len(it.tmp)-1]
+	for i := len(it.tmp) - 1; i > 0; i-- {
+		it.tmp[i-1] = it.merge(it.tmp[i-1], it.tmp[i])
 	}
 	return true
 }
 
-func (it *MergeDedupeIter[T]) Err() error {
+func (it *DedupeIter[T]) Err() error {
 	return it.itr.Err()
 }
 
-func (it *MergeDedupeIter[T]) At() T {
+func (it *DedupeIter[T]) At() T {
 	return it.tmp[0]
 }
diff --git a/pkg/storage/bloom/v1/dedupe_test.go b/pkg/storage/bloom/v1/dedupe_test.go
index b7aefc75db9ea..08e0ea2f85a19 100644
--- a/pkg/storage/bloom/v1/dedupe_test.go
+++ b/pkg/storage/bloom/v1/dedupe_test.go
@@ -26,7 +26,7 @@ func TestMergeDedupeIter(t *testing.T) {
 	merge := func(a, _ *SeriesWithBloom) *SeriesWithBloom {
 		return a
 	}
-	deduper := NewMergeDedupingIter[*SeriesWithBloom](
+	deduper := NewDedupingIter[*SeriesWithBloom](
 		eq,
 		merge,
 		NewPeekingIter[*SeriesWithBloom](mbq),
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index f0a6257d53170..98e170b183e7c 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -198,6 +198,10 @@ type SeriesHeader struct {
 	FromTs, ThroughTs model.Time
 }
 
+func (h SeriesHeader) OverlapFingerprintRange(other SeriesHeader) bool {
+	return h.ThroughFp >= other.FromFp && h.FromFp <= other.ThroughFp
+}
+
 // build one aggregated header for the entire block
 func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
 	if len(xs) == 0 {
@@ -333,7 +337,7 @@ func (d *SeriesPageDecoder) Err() error {
 
 type Series struct {
 	Fingerprint model.Fingerprint
-	Chunks      []ChunkRef
+	Chunks      ChunkRefs
 }
 
 type SeriesWithOffset struct {
diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go
new file mode 100644
index 0000000000000..e4de9609b9082
--- /dev/null
+++ b/pkg/storage/bloom/v1/reader.go
@@ -0,0 +1,83 @@
+package v1
+
+import (
+	"bytes"
+	"io"
+	"os"
+	"path/filepath"
+
+	"github.com/pkg/errors"
+)
+
+type BlockReader interface {
+	Index() (io.ReadSeeker, error)
+	Blooms() (io.ReadSeeker, error)
+}
+
+// In memory reader
+type ByteReader struct {
+	index, blooms *bytes.Buffer
+}
+
+func NewByteReader(index, blooms *bytes.Buffer) *ByteReader {
+	return &ByteReader{index: index, blooms: blooms}
+}
+
+func (r *ByteReader) Index() (io.ReadSeeker, error) {
+	return bytes.NewReader(r.index.Bytes()), nil
+}
+
+func (r *ByteReader) Blooms() (io.ReadSeeker, error) {
+	return bytes.NewReader(r.blooms.Bytes()), nil
+}
+
+// File reader
+type DirectoryBlockReader struct {
+	dir           string
+	blooms, index *os.File
+
+	initialized bool
+}
+
+func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
+	return &DirectoryBlockReader{
+		dir:         dir,
+		initialized: false,
+	}
+}
+
+func (r *DirectoryBlockReader) Init() error {
+	if !r.initialized {
+		var err error
+		r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
+		if err != nil {
+			return errors.Wrap(err, "opening series file")
+		}
+
+		r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
+		if err != nil {
+			return errors.Wrap(err, "opening bloom file")
+		}
+
+		r.initialized = true
+	}
+	return nil
+}
+
+func (r *DirectoryBlockReader) Index() (io.ReadSeeker, error) {
+	if !r.initialized {
+		if err := r.Init(); err != nil {
+			return nil, err
+		}
+	}
+	return r.index, nil
+}
+
+func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) {
+	if !r.initialized {
+		if err := r.Init(); err != nil {
+			return nil, err
+		}
+	}
+	return r.blooms, nil
+}
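
Reviewer sketch (not part of the patch): how the new `MergeBuilder` is intended to be wired up during compaction. `NewMergeBuilder` and `Build` are taken from this diff; the `BlockBuilder` argument and the `addChunkToBloom` helper are illustrative assumptions only.

```go
// Sketch, assuming a *BlockBuilder has already been constructed via builder.go.
// addChunkToBloom is hypothetical; populate only receives the chunks not yet
// covered by an existing bloom (the result of Chunks.Unless in Build above).
func compactIntoOneBlock(oldBlocks []*Block, store Iterator[*Series], builder *BlockBuilder) error {
	populate := func(s *Series, bloom *Bloom) error {
		for _, chk := range s.Chunks {
			if err := addChunkToBloom(s.Fingerprint, chk, bloom); err != nil { // hypothetical helper
				return err
			}
		}
		return nil
	}
	return NewMergeBuilder(oldBlocks, store, populate).Build(builder)
}
```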
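The rewritten loop in `DedupeIter.Next` folds `tmp` right-to-left, leaving `tmp[0]` as `merge(x0, merge(x1, ... merge(xn-1, xn)))`, the same result as the old shrink-the-slice loop but without repeated re-slicing. A toy sketch of the run-collapsing behavior; `NewSliceIter` is an assumption here (only `NewPeekingIter` appears in this diff):

```go
// Collapse runs of equal ints in a sorted stream, summing each run.
// e.g. sumRuns([]int{1, 1, 2, 3, 3, 3}) -> [2, 2, 9]
func sumRuns(xs []int) []int {
	it := NewDedupingIter[int](
		func(a, b int) bool { return a == b },      // adjacent elements considered duplicates
		func(a, b int) int { return a + b },        // merge a run by summing it
		NewPeekingIter[int](NewSliceIter[int](xs)), // NewSliceIter assumed to exist
	)
	var out []int
	for it.Next() {
		out = append(out, it.At()) // At() returns the fully merged head of the run
	}
	return out
}
```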
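`OverlapFingerprintRange` is the usual closed-interval overlap test (two ranges overlap iff neither ends before the other begins), which `SortBlocksIntoOverlappingGroups` relies on after sorting blocks by starting fingerprint. A quick check with made-up fingerprint values:

```go
// Fingerprint values are arbitrary; endpoints are inclusive, so touching ranges overlap.
func overlapExamples() {
	a := SeriesHeader{FromFp: 0, ThroughFp: 100}
	b := SeriesHeader{FromFp: 100, ThroughFp: 200}
	c := SeriesHeader{FromFp: 201, ThroughFp: 300}
	fmt.Println(a.OverlapFingerprintRange(b)) // true:  100 >= 100 && 0 <= 200
	fmt.Println(a.OverlapFingerprintRange(c)) // false: a ends (100) before c begins (201)
}
```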